package org.apache.flink.state.forst;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.RunnableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.HeapPriorityQueuesManager;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/forst/ForStKeyedStateBackend.class */
public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(ForStKeyedStateBackend.class);
    private final ExecutionConfig executionConfig;
    private final int keyGroupPrefixBytes;
    private final KeyGroupRange keyGroupRange;
    protected final TtlTimeProvider ttlTimeProvider;
    protected final TypeSerializer<K> keySerializer;
    private final Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;
    private final Supplier<DataOutputSerializer> valueSerializerView;
    private final Supplier<DataInputDeserializer> valueDeserializerView;
    private final UUID backendUID;
    private final ForStResourceContainer optionsContainer;
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    private final ColumnFamilyHandle defaultColumnFamily;
    private final ForStSnapshotStrategyBase<K, ?> snapshotStrategy;
    private final PriorityQueueSetFactory priorityQueueFactory;
    private final HeapPriorityQueuesManager heapPriorityQueuesManager;
    private final CloseableRegistry cancelStreamRegistry;
    private final ForStNativeMetricMonitor nativeMetricMonitor;
    protected final RocksDB db;
    private StateRequestHandler stateRequestHandler;
    private final LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation;
    private final ForStDBTtlCompactFiltersManager ttlCompactFiltersManager;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private boolean disposed = false;
    private final HashMap<String, InternalKeyedState<K, ?, ?>> keyValueStatesByName = new HashMap<>();

    @GuardedBy("lock")
    private final Set<StateExecutor> managedStateExecutors = new HashSet(1);

    /* renamed from: org.apache.flink.state.forst.ForStKeyedStateBackend$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/state/forst/ForStKeyedStateBackend$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$common$state$v2$StateDescriptor$Type = new int[StateDescriptor.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$api$common$state$v2$StateDescriptor$Type[StateDescriptor.Type.VALUE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$v2$StateDescriptor$Type[StateDescriptor.Type.LIST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$v2$StateDescriptor$Type[StateDescriptor.Type.MAP.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$v2$StateDescriptor$Type[StateDescriptor.Type.REDUCING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$v2$StateDescriptor$Type[StateDescriptor.Type.AGGREGATING.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    public ForStKeyedStateBackend(UUID uuid, ExecutionConfig executionConfig, ForStResourceContainer forStResourceContainer, int i, TypeSerializer<K> typeSerializer, Supplier<SerializedCompositeKeyBuilder<K>> supplier, Supplier<DataOutputSerializer> supplier2, Supplier<DataInputDeserializer> supplier3, RocksDB rocksDB, LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> linkedHashMap, Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> map, Function<String, ColumnFamilyOptions> function, ColumnFamilyHandle columnFamilyHandle, ForStSnapshotStrategyBase<K, ?> forStSnapshotStrategyBase, PriorityQueueSetFactory priorityQueueSetFactory, CloseableRegistry closeableRegistry, ForStNativeMetricMonitor forStNativeMetricMonitor, InternalKeyContext<K> internalKeyContext, TtlTimeProvider ttlTimeProvider, ForStDBTtlCompactFiltersManager forStDBTtlCompactFiltersManager) {
        this.backendUID = uuid;
        this.executionConfig = executionConfig;
        this.optionsContainer = (ForStResourceContainer) Preconditions.checkNotNull(forStResourceContainer);
        this.keyGroupPrefixBytes = i;
        this.keyGroupRange = internalKeyContext.getKeyGroupRange();
        this.keySerializer = typeSerializer;
        this.serializedKeyBuilder = supplier;
        this.valueSerializerView = supplier2;
        this.valueDeserializerView = supplier3;
        this.db = rocksDB;
        this.kvStateInformation = linkedHashMap;
        this.columnFamilyOptionsFactory = function;
        this.defaultColumnFamily = columnFamilyHandle;
        this.snapshotStrategy = forStSnapshotStrategyBase;
        this.cancelStreamRegistry = closeableRegistry;
        this.nativeMetricMonitor = forStNativeMetricMonitor;
        this.ttlTimeProvider = ttlTimeProvider;
        this.ttlCompactFiltersManager = forStDBTtlCompactFiltersManager;
        this.priorityQueueFactory = priorityQueueSetFactory;
        if (priorityQueueSetFactory instanceof HeapPriorityQueueSetFactory) {
            this.heapPriorityQueuesManager = new HeapPriorityQueuesManager(map, (HeapPriorityQueueSetFactory) priorityQueueSetFactory, internalKeyContext.getKeyGroupRange(), internalKeyContext.getNumberOfKeyGroups());
        } else {
            this.heapPriorityQueuesManager = null;
        }
    }

    public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
        this.stateRequestHandler = stateRequestHandler;
    }

    public <N, S extends State, SV> S getOrCreateKeyedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<SV> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(typeSerializer, "Namespace serializer");
        InternalKeyedState<K, ?, ?> internalKeyedState = this.keyValueStatesByName.get(stateDescriptor.getStateId());
        if (internalKeyedState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            }
            internalKeyedState = createState(n, typeSerializer, stateDescriptor);
            this.keyValueStatesByName.put(stateDescriptor.getStateId(), internalKeyedState);
        }
        return internalKeyedState;
    }

    @Nonnull
    protected <N, S extends State, SV> S createState(@Nonnull N n, @Nonnull TypeSerializer<N> typeSerializer, @Nonnull StateDescriptor<SV> stateDescriptor) throws Exception {
        return (S) TtlStateFactory.createStateAndWrapWithTtlIfEnabled(n, typeSerializer, stateDescriptor, this, this.ttlTimeProvider);
    }

    @Nonnull
    public <N, S extends InternalKeyedState, SV> S createStateInternal(@Nonnull N n, @Nonnull TypeSerializer<N> typeSerializer, @Nonnull StateDescriptor<SV> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(this.stateRequestHandler, "A non-null stateRequestHandler must be setup before createState");
        ColumnFamilyHandle columnFamilyHandle = (ColumnFamilyHandle) tryRegisterKvStateInformation(stateDescriptor, typeSerializer).f0;
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$api$common$state$v2$StateDescriptor$Type[stateDescriptor.getType().ordinal()]) {
            case 1:
                Supplier<SerializedCompositeKeyBuilder<K>> supplier = this.serializedKeyBuilder;
                Objects.requireNonNull(typeSerializer);
                return new ForStValueState(this.stateRequestHandler, columnFamilyHandle, (ValueStateDescriptor) stateDescriptor, supplier, n, typeSerializer::duplicate, this.valueSerializerView, this.valueDeserializerView);
            case 2:
                Supplier<SerializedCompositeKeyBuilder<K>> supplier2 = this.serializedKeyBuilder;
                Objects.requireNonNull(typeSerializer);
                return new ForStListState(this.stateRequestHandler, columnFamilyHandle, (ListStateDescriptor) stateDescriptor, supplier2, n, typeSerializer::duplicate, this.valueSerializerView, this.valueDeserializerView);
            case 3:
                Supplier supplier3 = DataInputDeserializer::new;
                StateRequestHandler stateRequestHandler = this.stateRequestHandler;
                Supplier<SerializedCompositeKeyBuilder<K>> supplier4 = this.serializedKeyBuilder;
                Objects.requireNonNull(typeSerializer);
                return ForStMapState.create(stateDescriptor, stateRequestHandler, columnFamilyHandle, supplier4, n, typeSerializer::duplicate, this.valueSerializerView, supplier3, this.valueDeserializerView, this.keyGroupPrefixBytes);
            case ForStStateDataTransfer.DEFAULT_THREAD_NUM /* 4 */:
                Supplier<SerializedCompositeKeyBuilder<K>> supplier5 = this.serializedKeyBuilder;
                Objects.requireNonNull(typeSerializer);
                return new ForStReducingState(this.stateRequestHandler, columnFamilyHandle, (ReducingStateDescriptor) stateDescriptor, supplier5, n, typeSerializer::duplicate, this.valueSerializerView, this.valueDeserializerView);
            case 5:
                StateRequestHandler stateRequestHandler2 = this.stateRequestHandler;
                Supplier<SerializedCompositeKeyBuilder<K>> supplier6 = this.serializedKeyBuilder;
                Objects.requireNonNull(typeSerializer);
                return new ForStAggregatingState((AggregatingStateDescriptor) stateDescriptor, stateRequestHandler2, columnFamilyHandle, supplier6, n, typeSerializer::duplicate, this.valueSerializerView, this.valueDeserializerView);
            default:
                throw new UnsupportedOperationException(String.format("Unsupported state type: %s", stateDescriptor.getType()));
        }
    }

    private <N, SV> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> tryRegisterKvStateInformation(StateDescriptor<SV> stateDescriptor, TypeSerializer<N> typeSerializer) throws Exception {
        RegisteredStateMetaInfoBase registeredKeyValueStateBackendMetaInfo;
        ForStOperationUtils.ForStKvStateInfo createStateInfo;
        ForStOperationUtils.ForStKvStateInfo forStKvStateInfo = this.kvStateInformation.get(stateDescriptor.getStateId());
        TypeSerializer<SV> serializer = stateDescriptor.getSerializer();
        if (forStKvStateInfo != null) {
            registeredKeyValueStateBackendMetaInfo = updateRestoredStateMetaInfo(Tuple2.of(forStKvStateInfo.columnFamilyHandle, forStKvStateInfo.metaInfo), stateDescriptor, serializer, typeSerializer);
            createStateInfo = new ForStOperationUtils.ForStKvStateInfo(forStKvStateInfo.columnFamilyHandle, registeredKeyValueStateBackendMetaInfo);
            this.kvStateInformation.put(stateDescriptor.getStateId(), createStateInfo);
        } else {
            registeredKeyValueStateBackendMetaInfo = new RegisteredKeyValueStateBackendMetaInfo(stateDescriptor.getStateId(), stateDescriptor.getType(), typeSerializer, serializer, StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform());
            createStateInfo = ForStOperationUtils.createStateInfo(registeredKeyValueStateBackendMetaInfo, this.db, this.columnFamilyOptionsFactory, this.ttlCompactFiltersManager, this.optionsContainer.getWriteBufferManagerCapacity(), this.cancelStreamRegistry);
            ForStOperationUtils.registerKvStateInformation(this.kvStateInformation, this.nativeMetricMonitor, stateDescriptor.getStateId(), createStateInfo);
        }
        return Tuple2.of(createStateInfo.columnFamilyHandle, registeredKeyValueStateBackendMetaInfo);
    }

    private <N, SV> RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo(Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> tuple2, StateDescriptor<SV> stateDescriptor, TypeSerializer<SV> typeSerializer, TypeSerializer<N> typeSerializer2) throws Exception {
        RegisteredKeyValueStateBackendMetaInfo<N, SV> registeredKeyValueStateBackendMetaInfo = (RegisteredKeyValueStateBackendMetaInfo) tuple2.f1;
        registeredKeyValueStateBackendMetaInfo.checkStateMetaInfo(stateDescriptor);
        TypeSerializer stateSerializer = registeredKeyValueStateBackendMetaInfo.getStateSerializer();
        TypeSerializerSchemaCompatibility updateStateSerializer = registeredKeyValueStateBackendMetaInfo.updateStateSerializer(typeSerializer);
        if (!typeSerializer.equals(stateSerializer) && updateStateSerializer.isCompatibleAfterMigration()) {
            throw new UnsupportedOperationException("State migration not support yet.");
        }
        if (updateStateSerializer.isIncompatible()) {
            throw new StateMigrationException("The new state serializer (" + typeSerializer + ") must not be incompatible with the old state serializer (" + stateSerializer + ").");
        }
        return registeredKeyValueStateBackendMetaInfo;
    }

    @Nonnull
    public StateExecutor createStateExecutor() {
        ForStStateExecutor forStStateExecutor;
        synchronized (this.lock) {
            if (this.disposed) {
                throw new FlinkRuntimeException("Attempt to create StateExecutor after ForStKeyedStateBackend is disposed.");
            }
            forStStateExecutor = new ForStStateExecutor(this.optionsContainer.isCoordinatorInline(), this.optionsContainer.isWriteInline(), this.optionsContainer.getReadIoParallelism(), this.optionsContainer.getWriteIoParallelism(), this.db, this.optionsContainer.getWriteOptions());
            this.managedStateExecutors.add(forStStateExecutor);
        }
        return forStStateExecutor;
    }

    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        return new SnapshotStrategyRunner(this.snapshotStrategy.getDescription(), this.snapshotStrategy, this.cancelStreamRegistry, SnapshotExecutionType.ASYNCHRONOUS).snapshot(j, j2, checkpointStreamFactory, checkpointOptions);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.snapshotStrategy != null) {
            this.snapshotStrategy.notifyCheckpointComplete(j);
        }
    }

    public void notifyCheckpointAborted(long j) throws Exception {
        if (this.snapshotStrategy != null) {
            this.snapshotStrategy.notifyCheckpointAborted(j);
        }
    }

    public void notifyCheckpointSubsumed(long j) throws Exception {
        LOG.info("Backend:{} checkpoint: {} subsumed.", this.backendUID, Long.valueOf(j));
    }

    public void dispose() {
        synchronized (this.lock) {
            if (this.disposed) {
                return;
            }
            Iterator<StateExecutor> it = this.managedStateExecutors.iterator();
            while (it.hasNext()) {
                it.next().shutdown();
            }
            if (this.db != null) {
                if (this.nativeMetricMonitor != null) {
                    this.nativeMetricMonitor.close();
                }
                IOUtils.closeQuietly(this.defaultColumnFamily);
                IOUtils.closeQuietly(this.db);
                LOG.info("Closed ForSt State Backend. Cleaning up ForSt local working directory {}, remote working directory {}.", this.optionsContainer.getLocalBasePath(), this.optionsContainer.getRemoteBasePath());
                try {
                    this.optionsContainer.clearDirectories();
                } catch (Exception e) {
                    LOG.warn("Could not delete ForSt local working directory {}, remote working directory {}.", new Object[]{this.optionsContainer.getLocalBasePath(), this.optionsContainer.getRemoteBasePath(), e});
                }
                IOUtils.closeQuietly(this.optionsContainer);
            }
            IOUtils.closeQuietly(this.snapshotStrategy);
            this.disposed = true;
        }
    }

    public boolean isSafeToReuseKVState() {
        return true;
    }

    @VisibleForTesting
    Path getLocalBasePath() {
        return this.optionsContainer.getLocalBasePath();
    }

    @VisibleForTesting
    Path getRemoteBasePath() {
        return this.optionsContainer.getRemoteBasePath();
    }

    public void close() throws IOException {
        dispose();
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
        return create(str, typeSerializer, false);
    }

    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer, boolean z) {
        return this.heapPriorityQueuesManager != null ? this.heapPriorityQueuesManager.createOrUpdate(str, typeSerializer, z) : this.priorityQueueFactory.create(str, typeSerializer, z);
    }
}
