package org.apache.flink.runtime.state.v2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.v2.internal.InternalReducingState;

/* loaded from: input_file:org/apache/flink/runtime/state/v2/AbstractReducingState.class */
/**
 * Reducing state that keeps a single value per namespace and folds every added element into it
 * with the {@link ReduceFunction} taken from the {@link ReducingStateDescriptor}.
 *
 * <p>Every operation is offered in an asynchronous flavour (returning a {@link StateFuture}) and a
 * synchronous flavour. Both are translated into {@link StateRequestType#REDUCING_GET} /
 * {@link StateRequestType#REDUCING_ADD} requests that are passed to the request-handling methods
 * inherited from {@link AbstractKeyedState}.
 *
 * @param <K> key type, forwarded to {@link AbstractKeyedState}
 * @param <N> namespace type
 * @param <V> type of the values that are added and of the reduced result
 */
public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V> implements InternalReducingState<K, N, V> {
    /** Function used to combine the stored value with a newly added or merged value. */
    protected final ReduceFunction<V> reduceFunction;

    /**
     * Creates the state and caches the descriptor's reduce function.
     *
     * @param stateRequestHandler handler forwarded to the super class
     * @param reducingStateDescriptor descriptor providing the {@link ReduceFunction}
     */
    public AbstractReducingState(StateRequestHandler stateRequestHandler, ReducingStateDescriptor<V> reducingStateDescriptor) {
        super(stateRequestHandler, reducingStateDescriptor);
        this.reduceFunction = reducingStateDescriptor.getReduceFunction();
    }

    /**
     * Asynchronously reads the current value.
     *
     * @return the future returned by {@link #asyncGetInternal()}
     */
    @Override
    public StateFuture<V> asyncGet() {
        return asyncGetInternal();
    }

    /**
     * Asynchronously folds {@code v} into the stored value: the current value is read first and,
     * once available, either {@code v} itself (nothing stored yet) or
     * {@code reduce(current, v)} is written back.
     *
     * @param v the value to add
     * @return a future for the write that follows the read
     */
    @Override
    public StateFuture<Void> asyncAdd(V v) {
        return asyncGetInternal().thenCompose(current -> {
            return asyncUpdateInternal(current == null ? v : this.reduceFunction.reduce(current, v));
        });
    }

    /**
     * Synchronously reads the current value.
     *
     * @return the result of {@link #getInternal()}
     */
    @Override
    public V get() {
        return getInternal();
    }

    /**
     * Synchronously folds {@code v} into the stored value.
     *
     * @param v the value to add
     * @throws RuntimeException wrapping any exception thrown by the reduce function
     */
    @Override
    public void add(V v) {
        V reduced;
        V current = getInternal();
        if (current == null) {
            // Nothing stored yet: the added value becomes the stored value as-is.
            reduced = v;
        } else {
            try {
                reduced = this.reduceFunction.reduce(current, v);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        updateInternal(reduced);
    }

    /**
     * Asynchronously merges the values of the source namespaces into the target namespace.
     *
     * <p>Phase 1 issues one read per non-null source namespace followed by one read for the
     * target. Phase 2 runs once all reads have completed: every source that held a value gets a
     * {@code null} update and its value is folded into an accumulator; if any source contributed,
     * the accumulator (further reduced with the target's value when present) is written to the
     * target. The target is left untouched when no source held a value.
     *
     * <p>{@code null} entries in {@code collection} are ignored in both phases. Phase 2 assumes
     * the combined results are delivered in the order the reads were issued.
     *
     * @param n the target namespace
     * @param collection the source namespaces; {@code null} or empty completes immediately
     * @return a future that completes when all phase 2 updates have completed
     */
    @Override
    public StateFuture<Void> asyncMergeNamespaces(N n, Collection<N> collection) {
        if (collection == null || collection.isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }
        // Phase 1: read every non-null source, then the target (always the last read).
        Collection<StateFuture<V>> reads = new ArrayList<>(collection.size() + 1);
        for (N source : collection) {
            if (source != null) {
                setCurrentNamespace(source);
                reads.add(asyncGetInternal());
            }
        }
        setCurrentNamespace(n);
        reads.add(asyncGetInternal());
        // Phase 2: fold the source values together and write the result to the target.
        return StateFutureUtils.combineAll(reads).thenCompose(values -> {
            Collection<StateFuture<Void>> updates = new ArrayList<>(collection.size() + 1);
            V merged = null;
            Iterator<V> valueIterator = values.iterator();
            for (N source : collection) {
                if (source == null) {
                    // No read was issued for a null source in phase 1, so it has no result to
                    // consume; advancing the iterator here would misalign values and namespaces.
                    continue;
                }
                V sourceValue = valueIterator.next();
                if (sourceValue != null) {
                    setCurrentNamespace(source);
                    // The source's value moves to the target, so a null update is sent for it.
                    updates.add(asyncUpdateInternal(null));
                    merged = merged != null ? this.reduceFunction.reduce(merged, sourceValue) : sourceValue;
                }
            }
            // The remaining result belongs to the target namespace read.
            V targetValue = valueIterator.next();
            if (merged != null) {
                if (targetValue != null) {
                    merged = this.reduceFunction.reduce(merged, targetValue);
                }
                setCurrentNamespace(n);
                updates.add(asyncUpdateInternal(merged));
            }
            return StateFutureUtils.combineAll(updates).thenAccept(ignored -> {
            });
        });
    }

    /**
     * Synchronously merges the values of the source namespaces into the target namespace.
     *
     * <p>Each non-null source that holds a value gets a {@code null} update and its value is
     * folded into an accumulator. If any source contributed, the accumulator (further reduced with
     * the target's value when present) is written to the target.
     *
     * @param n the target namespace
     * @param collection the source namespaces; {@code null} or empty is a no-op
     * @throws RuntimeException wrapping any exception raised while merging
     */
    @Override
    public void mergeNamespaces(N n, Collection<N> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        try {
            V merged = null;
            for (N source : collection) {
                if (source != null) {
                    setCurrentNamespace(source);
                    V sourceValue = getInternal();
                    if (sourceValue != null) {
                        // The source's value moves to the target, so a null update is sent for it.
                        updateInternal(null);
                        merged = merged != null ? this.reduceFunction.reduce(merged, sourceValue) : sourceValue;
                    }
                }
            }
            if (merged != null) {
                setCurrentNamespace(n);
                V targetValue = getInternal();
                if (targetValue != null) {
                    merged = this.reduceFunction.reduce(merged, targetValue);
                }
                updateInternal(merged);
            }
        } catch (Exception e) {
            throw new RuntimeException("merge namespace fail.", e);
        }
    }

    /**
     * Issues an asynchronous {@link StateRequestType#REDUCING_GET} request without a payload.
     *
     * @return the future produced by the request
     */
    @Override
    public StateFuture<V> asyncGetInternal() {
        return (StateFuture<V>) handleRequest(StateRequestType.REDUCING_GET, null);
    }

    /**
     * Issues an asynchronous {@link StateRequestType#REDUCING_ADD} request carrying {@code v}.
     *
     * @param v the payload of the request; callers in this class also pass {@code null}
     * @return the future produced by the request
     */
    @Override
    public StateFuture<Void> asyncUpdateInternal(V v) {
        return handleRequest(StateRequestType.REDUCING_ADD, v);
    }

    /**
     * Issues a synchronous {@link StateRequestType#REDUCING_GET} request without a payload.
     *
     * @return the result of the request
     */
    @Override
    public V getInternal() {
        return (V) handleRequestSync(StateRequestType.REDUCING_GET, null);
    }

    /**
     * Issues a synchronous {@link StateRequestType#REDUCING_ADD} request carrying {@code v}.
     *
     * @param v the payload of the request; callers in this class also pass {@code null}
     */
    @Override
    public void updateInternal(V v) {
        handleRequestSync(StateRequestType.REDUCING_ADD, v);
    }
}
