package org.apache.flink.runtime.state.heap;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.IterableStateSnapshot;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/StateTable.class */
public abstract class StateTable<K, N, S> implements StateSnapshotRestore, Iterable<StateEntry<K, N, S>> {
    protected final InternalKeyContext<K> keyContext;
    protected RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo;
    protected final TypeSerializer<K> keySerializer;
    protected final KeyGroupRange keyGroupRange;
    protected final StateMap<K, N, S>[] keyGroupedStateMaps;

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/StateTable$StateEntryIterator.class */
    class StateEntryIterator implements InternalKvState.StateIncrementalVisitor<K, N, S> {
        final int recommendedMaxNumberOfReturnedRecords;
        int keyGroupIndex = 0;
        InternalKvState.StateIncrementalVisitor<K, N, S> stateIncrementalVisitor;

        StateEntryIterator(int i) {
            this.recommendedMaxNumberOfReturnedRecords = i;
            next();
        }

        private void next() {
            while (this.keyGroupIndex < StateTable.this.keyGroupedStateMaps.length) {
                StateMap<K, N, S>[] stateMapArr = StateTable.this.keyGroupedStateMaps;
                int i = this.keyGroupIndex;
                this.keyGroupIndex = i + 1;
                InternalKvState.StateIncrementalVisitor<K, N, S> stateIncrementalVisitor = stateMapArr[i].getStateIncrementalVisitor(this.recommendedMaxNumberOfReturnedRecords);
                if (stateIncrementalVisitor.hasNext()) {
                    this.stateIncrementalVisitor = stateIncrementalVisitor;
                    return;
                }
            }
        }

        @Override // org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor
        public boolean hasNext() {
            InternalKvState.StateIncrementalVisitor<K, N, S> stateIncrementalVisitor;
            do {
                if (this.stateIncrementalVisitor != null && this.stateIncrementalVisitor.hasNext()) {
                    return true;
                }
                if (this.keyGroupIndex == StateTable.this.keyGroupedStateMaps.length) {
                    return false;
                }
                StateMap<K, N, S>[] stateMapArr = StateTable.this.keyGroupedStateMaps;
                int i = this.keyGroupIndex;
                this.keyGroupIndex = i + 1;
                stateIncrementalVisitor = stateMapArr[i].getStateIncrementalVisitor(this.recommendedMaxNumberOfReturnedRecords);
            } while (!stateIncrementalVisitor.hasNext());
            this.stateIncrementalVisitor = stateIncrementalVisitor;
            return true;
        }

        @Override // org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor
        public Collection<StateEntry<K, N, S>> nextEntries() {
            if (hasNext()) {
                return this.stateIncrementalVisitor.nextEntries();
            }
            return null;
        }

        @Override // org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor
        public void remove(StateEntry<K, N, S> stateEntry) {
            StateTable.this.keyGroupedStateMaps[this.keyGroupIndex - 1].remove(stateEntry.getKey(), stateEntry.getNamespace());
        }

        @Override // org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor
        public void update(StateEntry<K, N, S> stateEntry, S s) {
            StateTable.this.keyGroupedStateMaps[this.keyGroupIndex - 1].put(stateEntry.getKey(), stateEntry.getNamespace(), s);
        }
    }

    public StateTable(InternalKeyContext<K> internalKeyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> registeredKeyValueStateBackendMetaInfo, TypeSerializer<K> typeSerializer) {
        this.keyContext = (InternalKeyContext) Preconditions.checkNotNull(internalKeyContext);
        this.metaInfo = (RegisteredKeyValueStateBackendMetaInfo) Preconditions.checkNotNull(registeredKeyValueStateBackendMetaInfo);
        this.keySerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.keyGroupRange = internalKeyContext.getKeyGroupRange();
        this.keyGroupedStateMaps = new StateMap[internalKeyContext.getKeyGroupRange().getNumberOfKeyGroups()];
        for (int i = 0; i < this.keyGroupedStateMaps.length; i++) {
            this.keyGroupedStateMaps[i] = createStateMap();
        }
    }

    protected abstract StateMap<K, N, S> createStateMap();

    @Override // org.apache.flink.runtime.state.StateSnapshotRestore
    @Nonnull
    public abstract IterableStateSnapshot<K, N, S> stateSnapshot();

    public boolean isEmpty() {
        return size() == 0;
    }

    public int size() {
        int i = 0;
        for (StateMap<K, N, S> stateMap : this.keyGroupedStateMaps) {
            i += stateMap.size();
        }
        return i;
    }

    public S get(N n) {
        return get(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), n);
    }

    public boolean containsKey(N n) {
        return containsKey(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), n);
    }

    public void put(N n, S s) {
        put(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), n, s);
    }

    public void remove(N n) {
        remove(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), n);
    }

    public S removeAndGetOld(N n) {
        return removeAndGetOld(this.keyContext.getCurrentKey(), this.keyContext.getCurrentKeyGroupIndex(), n);
    }

    public <T> void transform(N n, T t, StateTransformationFunction<S, T> stateTransformationFunction) throws Exception {
        K currentKey = this.keyContext.getCurrentKey();
        checkKeyNamespacePreconditions(currentKey, n);
        getMapForKeyGroup(this.keyContext.getCurrentKeyGroupIndex()).transform(currentKey, n, t, stateTransformationFunction);
    }

    public S get(K k, N n) {
        return get(k, KeyGroupRangeAssignment.assignToKeyGroup(k, this.keyContext.getNumberOfKeyGroups()), n);
    }

    public Stream<K> getKeys(N n) {
        return Arrays.stream(this.keyGroupedStateMaps).flatMap(stateMap -> {
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(stateMap.iterator(), 0), false);
        }).filter(stateEntry -> {
            return stateEntry.getNamespace().equals(n);
        }).map((v0) -> {
            return v0.getKey();
        });
    }

    public Stream<Tuple2<K, N>> getKeysAndNamespaces() {
        return Arrays.stream(this.keyGroupedStateMaps).flatMap(stateMap -> {
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(stateMap.iterator(), 0), false);
        }).map(stateEntry -> {
            return Tuple2.of(stateEntry.getKey(), stateEntry.getNamespace());
        });
    }

    public InternalKvState.StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int i) {
        return new StateEntryIterator(i);
    }

    private S get(K k, int i, N n) {
        checkKeyNamespacePreconditions(k, n);
        return getMapForKeyGroup(i).get(k, n);
    }

    private boolean containsKey(K k, int i, N n) {
        checkKeyNamespacePreconditions(k, n);
        return getMapForKeyGroup(i).containsKey(k, n);
    }

    private void checkKeyNamespacePreconditions(K k, N n) {
        Preconditions.checkNotNull(k, "No key set. This method should not be called outside of a keyed context.");
        Preconditions.checkNotNull(n, "Provided namespace is null.");
    }

    private void remove(K k, int i, N n) {
        checkKeyNamespacePreconditions(k, n);
        getMapForKeyGroup(i).remove(k, n);
    }

    private S removeAndGetOld(K k, int i, N n) {
        checkKeyNamespacePreconditions(k, n);
        return getMapForKeyGroup(i).removeAndGetOld(k, n);
    }

    @VisibleForTesting
    public StateMap<K, N, S>[] getState() {
        return this.keyGroupedStateMaps;
    }

    public int getKeyGroupOffset() {
        return this.keyGroupRange.getStartKeyGroup();
    }

    @VisibleForTesting
    public StateMap<K, N, S> getMapForKeyGroup(int i) {
        int indexToOffset = indexToOffset(i);
        if (indexToOffset < 0 || indexToOffset >= this.keyGroupedStateMaps.length) {
            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(i, this.keyGroupRange);
        }
        return this.keyGroupedStateMaps[indexToOffset];
    }

    private int indexToOffset(int i) {
        return i - getKeyGroupOffset();
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    public TypeSerializer<S> getStateSerializer() {
        return this.metaInfo.getStateSerializer();
    }

    public TypeSerializer<N> getNamespaceSerializer() {
        return this.metaInfo.getNamespaceSerializer();
    }

    public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
        return this.metaInfo;
    }

    public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> registeredKeyValueStateBackendMetaInfo) {
        this.metaInfo = registeredKeyValueStateBackendMetaInfo;
    }

    public void put(K k, int i, N n, S s) {
        checkKeyNamespacePreconditions(k, n);
        getMapForKeyGroup(i).put(k, n, s);
    }

    @Override // java.lang.Iterable
    public Iterator<StateEntry<K, N, S>> iterator() {
        return Arrays.stream(this.keyGroupedStateMaps).filter((v0) -> {
            return Objects.nonNull(v0);
        }).flatMap(stateMap -> {
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(stateMap.iterator(), 0), false);
        }).iterator();
    }

    @VisibleForTesting
    public int sizeOfNamespace(Object obj) {
        int i = 0;
        for (StateMap<K, N, S> stateMap : this.keyGroupedStateMaps) {
            i += stateMap.sizeOfNamespace(obj);
        }
        return i;
    }

    @Override // org.apache.flink.runtime.state.StateSnapshotRestore
    @Nonnull
    public StateSnapshotKeyGroupReader keyGroupReader(int i) {
        return StateTableByKeyGroupReaders.readerForVersion(this, i);
    }
}
