package org.apache.flink.runtime.state.v2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState;

/* loaded from: input_file:org/apache/flink/runtime/state/v2/AbstractAggregatingState.class */
/**
 * Keyed aggregating state whose reads and writes are issued as state requests through the
 * handler supplied at construction time.
 *
 * <p>The stored value is the accumulator ({@code ACC}). Inputs ({@code IN}) are folded into it
 * with the {@link AggregateFunction} taken from the state descriptor, and results ({@code OUT})
 * are derived from it with {@link AggregateFunction#getResult(Object)}. Every operation is
 * offered in an asynchronous ({@link StateFuture}-returning) and a synchronous flavour.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <IN> type of the values added to the state
 * @param <ACC> type of the accumulator that is actually stored
 * @param <OUT> type of the value handed back to the user
 */
public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedState<K, N, ACC> implements InternalAggregatingState<K, N, IN, ACC, OUT> {

    /** Aggregation logic obtained from the state descriptor. */
    protected final AggregateFunction<IN, ACC, OUT> aggregateFunction;

    /**
     * Creates the state and caches the descriptor's aggregate function.
     *
     * @param stateRequestHandler handler that is passed on to the superclass
     * @param aggregatingStateDescriptor descriptor providing the aggregate function
     */
    public AbstractAggregatingState(StateRequestHandler stateRequestHandler, AggregatingStateDescriptor<IN, ACC, OUT> aggregatingStateDescriptor) {
        super(stateRequestHandler, aggregatingStateDescriptor);
        this.aggregateFunction = aggregatingStateDescriptor.getAggregateFunction();
    }

    /**
     * Asynchronously reads the accumulator and converts it into the user-facing result.
     *
     * @return a future holding {@code null} when no accumulator is stored, otherwise the value
     *     of {@code aggregateFunction.getResult(accumulator)}
     */
    public StateFuture<OUT> asyncGet() {
        return asyncGetInternal()
                .thenApply(acc -> acc == null ? null : this.aggregateFunction.getResult(acc));
    }

    /**
     * Asynchronously folds {@code in} into the accumulator and writes the outcome back.
     *
     * <p>When no accumulator is stored yet, a fresh one is created via {@code
     * aggregateFunction.createAccumulator()} first.
     *
     * @param in the value to add
     * @return a future that completes once the follow-up update request has completed
     */
    public StateFuture<Void> asyncAdd(IN in) {
        return asyncGetInternal()
                .thenCompose(
                        acc -> {
                            ACC base = acc == null ? this.aggregateFunction.createAccumulator() : acc;
                            return asyncUpdateInternal(this.aggregateFunction.add(in, base));
                        });
    }

    /**
     * Issues an asynchronous {@code AGGREGATING_GET} request without payload.
     *
     * @return a future holding the raw accumulator
     */
    @Override
    public StateFuture<ACC> asyncGetInternal() {
        return handleRequest(StateRequestType.AGGREGATING_GET, null);
    }

    /**
     * Issues an asynchronous {@code AGGREGATING_ADD} request carrying {@code acc} as payload.
     *
     * @param acc the accumulator to send; the merge methods of this class also pass {@code null}
     *     (presumably to clear the entry - confirm against the request handler)
     * @return a future for the completion of the request
     */
    @Override
    public StateFuture<Void> asyncUpdateInternal(ACC acc) {
        return handleRequest(StateRequestType.AGGREGATING_ADD, acc);
    }

    /**
     * Synchronously reads the accumulator and converts it into the user-facing result.
     *
     * @return {@code null} when no accumulator is stored, otherwise {@code
     *     aggregateFunction.getResult(accumulator)}
     */
    public OUT get() {
        ACC acc = getInternal();
        if (acc == null) {
            return null;
        }
        return this.aggregateFunction.getResult(acc);
    }

    /**
     * Synchronously folds {@code in} into the accumulator and writes the outcome back.
     *
     * @param in the value to add
     * @throws RuntimeException wrapping any exception raised while creating the accumulator,
     *     aggregating, or writing the new accumulator
     */
    public void add(IN in) {
        // The read happens outside the try block, so its failures are not wrapped.
        ACC acc = getInternal();
        try {
            ACC base = acc == null ? this.aggregateFunction.createAccumulator() : acc;
            updateInternal(this.aggregateFunction.add(in, base));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asynchronously merges the accumulators of all source namespaces into the target namespace.
     *
     * <p>Phase 1 submits one read per non-null source namespace plus one for the target. Phase 2
     * runs once all reads have completed: every source that held an accumulator receives an
     * update with a {@code null} payload, the source accumulators are combined with {@code
     * aggregateFunction.merge}, then merged with the target's accumulator (if any) and written
     * to the target. When no source held an accumulator, the target is left untouched.
     *
     * <p>The current namespace is switched repeatedly and is not restored afterwards.
     *
     * @param n the target namespace
     * @param collection the source namespaces; {@code null} entries are ignored
     * @return a future that completes when all issued updates have completed; an already
     *     completed future when {@code collection} is {@code null} or empty
     */
    @Override
    public StateFuture<Void> asyncMergeNamespaces(N n, Collection<N> collection) {
        if (collection == null || collection.isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }

        // Phase 1: read every non-null source, then the target (always the last entry).
        Collection<StateFuture<ACC>> reads = new ArrayList<>(collection.size() + 1);
        for (N source : collection) {
            if (source != null) {
                setCurrentNamespace(source);
                reads.add(asyncGetInternal());
            }
        }
        setCurrentNamespace(n);
        reads.add(asyncGetInternal());

        // Phase 2: results are paired with namespaces by position, which assumes combineAll
        // yields them in submission order.
        return StateFutureUtils.combineAll(reads)
                .thenCompose(
                        accumulators -> {
                            Collection<StateFuture<Void>> writes = new ArrayList<>(collection.size() + 1);
                            Iterator<ACC> accIterator = accumulators.iterator();
                            ACC merged = null;
                            for (N source : collection) {
                                if (source == null) {
                                    // No read was submitted for null sources in phase 1, so no
                                    // result may be consumed here either; otherwise all later
                                    // results shift and the iterator is exhausted too early.
                                    continue;
                                }
                                ACC sourceAcc = accIterator.next();
                                if (sourceAcc != null) {
                                    setCurrentNamespace(source);
                                    writes.add(asyncUpdateInternal(null));
                                    merged = merged == null ? sourceAcc : this.aggregateFunction.merge(merged, sourceAcc);
                                }
                            }
                            ACC targetAcc = accIterator.next();
                            if (merged != null) {
                                if (targetAcc != null) {
                                    merged = this.aggregateFunction.merge(merged, targetAcc);
                                }
                                setCurrentNamespace(n);
                                writes.add(asyncUpdateInternal(merged));
                            }
                            return StateFutureUtils.combineAll(writes).thenAccept(ignored -> {
                            });
                        });
    }

    /**
     * Synchronously merges the accumulators of all source namespaces into the target namespace.
     *
     * <p>Each non-null source is read; when it holds an accumulator, an {@code AGGREGATING_ADD}
     * request with a {@code null} payload is issued for it and the accumulator is combined with
     * the ones collected so far. If anything was collected, it is merged with the target's
     * accumulator (if any) and written to the target.
     *
     * <p>The current namespace is switched repeatedly and is not restored afterwards.
     *
     * @param n the target namespace
     * @param collection the source namespaces; {@code null} entries are ignored, and a {@code
     *     null} or empty collection makes this method a no-op
     * @throws RuntimeException wrapping any exception raised while reading, merging or writing
     */
    @Override
    public void mergeNamespaces(N n, Collection<N> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        try {
            ACC merged = null;
            for (N source : collection) {
                if (source == null) {
                    continue;
                }
                setCurrentNamespace(source);
                ACC sourceAcc = handleRequestSync(StateRequestType.AGGREGATING_GET, null);
                if (sourceAcc == null) {
                    continue;
                }
                // Null payload for the source namespace before folding its value in.
                handleRequestSync(StateRequestType.AGGREGATING_ADD, null);
                merged = merged == null ? sourceAcc : this.aggregateFunction.merge(merged, sourceAcc);
            }
            // Touch the target only when at least one source contributed an accumulator.
            if (merged != null) {
                setCurrentNamespace(n);
                ACC targetAcc = handleRequestSync(StateRequestType.AGGREGATING_GET, null);
                if (targetAcc != null) {
                    merged = this.aggregateFunction.merge(merged, targetAcc);
                }
                handleRequestSync(StateRequestType.AGGREGATING_ADD, merged);
            }
        } catch (Exception e) {
            throw new RuntimeException("merge namespace fail.", e);
        }
    }

    /**
     * Issues a synchronous {@code AGGREGATING_GET} request without payload.
     *
     * @return the raw accumulator returned by the request
     */
    @Override
    public ACC getInternal() {
        return handleRequestSync(StateRequestType.AGGREGATING_GET, null);
    }

    /**
     * Issues a synchronous {@code AGGREGATING_ADD} request carrying {@code acc} as payload.
     *
     * @param acc the accumulator to send
     */
    @Override
    public void updateInternal(ACC acc) {
        handleRequestSync(StateRequestType.AGGREGATING_ADD, acc);
    }
}
