package org.apache.flink.state.forst;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendBuilder;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStStateBackend;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.restore.ForStHeapTimersFullRestoreOperation;
import org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation;
import org.apache.flink.state.forst.restore.ForStNoneRestoreOperation;
import org.apache.flink.state.forst.restore.ForStRestoreOperation;
import org.apache.flink.state.forst.restore.ForStRestoreResult;
import org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy;
import org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.state.forst.sync.ForStPriorityQueueConfig;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.class */
/**
 * Builder that assembles a {@link ForStKeyedStateBackend}.
 *
 * <p>Building prepares the directories of the {@link ForStResourceContainer}, runs the restore
 * operation that matches the supplied state handles, creates the snapshot strategy and finally
 * instantiates the backend. If any step fails, every resource acquired so far is closed and the
 * prepared directories are cleared before the failure is reported.
 *
 * @param <K> type of the keys handled by the backend being built
 */
public class ForStKeyedStateBackendBuilder<K> implements StateBackendBuilder<ForStKeyedStateBackend<K>, BackendBuildingException> {
    /** Initial buffer size handed to every {@link SerializedCompositeKeyBuilder} created for the backend. */
    private static final int KEY_SERIALIZER_BUFFER_START_SIZE = 32;
    /** Initial size of every {@link DataOutputSerializer} created for value serialization. */
    private static final int VALUE_SERIALIZER_BUFFER_START_SIZE = 128;

    private final String operatorIdentifier;
    private final ForStPriorityQueueConfig priorityQueueConfig;
    protected final ClassLoader userCodeClassLoader;
    protected final CloseableRegistry cancelStreamRegistry;
    private final StateSerializerProvider<K> keySerializerProvider;
    private final int numberOfKeyGroups;
    private final KeyGroupRange keyGroupRange;
    private final ExecutionConfig executionConfig;
    private final TtlTimeProvider ttlTimeProvider;
    private final Collection<KeyedStateHandle> restoreStateHandles;
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    private final ForStResourceContainer optionsContainer;
    private final MetricGroup metricGroup;
    private final StateBackend.CustomInitializationMetrics customInitializationMetrics;
    private boolean enableIncrementalCheckpointing;
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private long writeBatchSize = ((MemorySize) ForStConfigurableOptions.WRITE_BATCH_SIZE.defaultValue()).getBytes();
    private boolean rescalingUseDeleteFilesInRange = false;
    private double overlapFractionThreshold = 0.5d;
    private boolean useIngestDbRestoreMode = false;
    private RecoveryClaimMode recoveryClaimMode = RecoveryClaimMode.DEFAULT;
    private ForStNativeMetricOptions nativeMetricOptions = new ForStNativeMetricOptions();

    /**
     * Creates a builder. All arguments are stored as-is; only {@code columnFamilyOptionsFactory}
     * is null-checked eagerly.
     *
     * @param operatorIdentifier identifier forwarded to the incremental restore operation
     * @param userCodeClassLoader class loader forwarded to the restore operations
     * @param optionsContainer resource container providing paths, DB options and the file system
     * @param columnFamilyOptionsFactory factory producing column family options per name; must not be null
     * @param keySerializer serializer registered as the new key serializer
     * @param numberOfKeyGroups total number of key groups
     * @param keyGroupRange key-group range covered by the backend
     * @param executionConfig execution config passed through to the backend
     * @param priorityQueueConfig priority queue configuration consulted when choosing the restore operation
     * @param ttlTimeProvider time provider used for the TTL compaction filter manager and the backend
     * @param metricGroup metric group forwarded to the restore operations
     * @param customInitializationMetrics initialization metrics forwarded to the incremental restore operation
     * @param restoreStateHandles state handles to restore from; empty or all-null means a fresh start
     * @param cancelStreamRegistry registry forwarded to the restore operations
     */
    public ForStKeyedStateBackendBuilder(
            String operatorIdentifier,
            ClassLoader userCodeClassLoader,
            ForStResourceContainer optionsContainer,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            ExecutionConfig executionConfig,
            ForStPriorityQueueConfig priorityQueueConfig,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup,
            StateBackend.CustomInitializationMetrics customInitializationMetrics,
            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
            CloseableRegistry cancelStreamRegistry) {
        this.operatorIdentifier = operatorIdentifier;
        this.userCodeClassLoader = userCodeClassLoader;
        this.optionsContainer = optionsContainer;
        this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory);
        this.keySerializerProvider = StateSerializerProvider.fromNewRegisteredSerializer(keySerializer);
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.keyGroupRange = keyGroupRange;
        this.executionConfig = executionConfig;
        this.priorityQueueConfig = priorityQueueConfig;
        this.ttlTimeProvider = ttlTimeProvider;
        this.metricGroup = metricGroup;
        this.customInitializationMetrics = customInitializationMetrics;
        this.restoreStateHandles = restoreStateHandles;
        this.cancelStreamRegistry = cancelStreamRegistry;
    }

    /**
     * Selects between the incremental and the native full snapshot strategy.
     *
     * <p>JADX note: access modifier changed from package-private.
     *
     * @param enableIncrementalCheckpointing {@code true} to build with the incremental snapshot strategy
     * @return this builder
     */
    public ForStKeyedStateBackendBuilder<K> setEnableIncrementalCheckpointing(boolean enableIncrementalCheckpointing) {
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        return this;
    }

    /**
     * Replaces the native metric options forwarded to the restore operations.
     *
     * <p>JADX note: access modifier changed from package-private.
     *
     * @param nativeMetricOptions the options to use
     * @return this builder
     */
    public ForStKeyedStateBackendBuilder<K> setNativeMetricOptions(ForStNativeMetricOptions nativeMetricOptions) {
        this.nativeMetricOptions = nativeMetricOptions;
        return this;
    }

    /**
     * Sets the overlap fraction threshold forwarded to the incremental restore operation.
     *
     * <p>JADX note: access modifier changed from package-private.
     *
     * @param overlapFractionThreshold the threshold; the builder default is {@code 0.5}
     * @return this builder
     */
    public ForStKeyedStateBackendBuilder<K> setOverlapFractionThreshold(double overlapFractionThreshold) {
        this.overlapFractionThreshold = overlapFractionThreshold;
        return this;
    }

    /**
     * Sets the ingest-DB restore mode flag forwarded to the incremental restore operation.
     *
     * <p>JADX note: access modifier changed from package-private.
     *
     * @param useIngestDbRestoreMode the flag value; the builder default is {@code false}
     * @return this builder
     */
    public ForStKeyedStateBackendBuilder<K> setUseIngestDbRestoreMode(boolean useIngestDbRestoreMode) {
        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
        return this;
    }

    /**
     * Sets the delete-files-in-range rescaling flag forwarded to the incremental restore operation.
     *
     * <p>JADX note: access modifier changed from package-private.
     *
     * @param rescalingUseDeleteFilesInRange the flag value; the builder default is {@code false}
     * @return this builder
     */
    public ForStKeyedStateBackendBuilder<K> setRescalingUseDeleteFilesInRange(boolean rescalingUseDeleteFilesInRange) {
        this.rescalingUseDeleteFilesInRange = rescalingUseDeleteFilesInRange;
        return this;
    }

    /**
     * Sets the recovery claim mode forwarded to the incremental restore operation.
     *
     * <p>JADX note: access modifier changed from package-private.
     *
     * @param recoveryClaimMode the mode; the builder default is {@link RecoveryClaimMode#DEFAULT}
     * @return this builder
     */
    public ForStKeyedStateBackendBuilder<K> setRecoveryClaimMode(RecoveryClaimMode recoveryClaimMode) {
        this.recoveryClaimMode = recoveryClaimMode;
        return this;
    }

    /**
     * Builds the keyed state backend.
     *
     * <p>JADX note: renamed from {@code build} (merged with bridge method); the decompiler's name is
     * kept so that existing call sites continue to resolve.
     *
     * @return the fully initialized backend
     * @throws BackendBuildingException if any step fails; a failure that is not already a
     *     {@code BackendBuildingException} is logged and wrapped as the cause
     */
    public ForStKeyedStateBackend<K> m9build() throws BackendBuildingException {
        // Everything referenced from the catch block is declared up front so it can be released there.
        ColumnFamilyHandle defaultColumnFamilyHandle = null;
        ForStNativeMetricMonitor nativeMetricMonitor = null;
        CloseableRegistry backendCloseableRegistry = new CloseableRegistry();
        LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation = new LinkedHashMap<>();
        LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates = new LinkedHashMap<>();
        ForStDBTtlCompactFiltersManager ttlCompactFiltersManager =
                new ForStDBTtlCompactFiltersManager(
                        this.ttlTimeProvider,
                        this.optionsContainer.getQueryTimeAfterNumEntries().longValue(),
                        this.optionsContainer.getPeriodicCompactionTime());
        RocksDB db = null;
        ForStRestoreOperation restoreOperation = null;
        final int keyGroupPrefixBytes =
                CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(this.numberOfKeyGroups);
        ResourceGuard resourceGuard = new ResourceGuard();
        ForStSnapshotStrategyBase<K, ?> snapshotStrategy = null;

        // Each supplier invocation yields a fresh instance; the key serializer is duplicated per builder.
        Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder =
                () ->
                        new SerializedCompositeKeyBuilder<>(
                                this.keySerializerProvider.currentSchemaSerializer().duplicate(),
                                keyGroupPrefixBytes,
                                KEY_SERIALIZER_BUFFER_START_SIZE);
        Supplier<DataOutputSerializer> valueSerializerView =
                () -> new DataOutputSerializer(VALUE_SERIALIZER_BUFFER_START_SIZE);
        Supplier<DataInputDeserializer> valueDeserializerView = DataInputDeserializer::new;

        UUID backendUID = UUID.randomUUID();
        try {
            this.optionsContainer.prepareDirectories();

            restoreOperation =
                    getForStRestoreOperation(
                            keyGroupPrefixBytes, kvStateInformation, registeredPQStates, ttlCompactFiltersManager);
            ForStRestoreResult restoreResult = restoreOperation.mo55restore();
            db = restoreResult.getDb();
            defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
            nativeMetricMonitor = restoreResult.getNativeMetricMonitor();

            // The strategy is always created with an empty uploaded-files map and -1 as the last id.
            snapshotStrategy =
                    initializeSnapshotStrategy(
                            db,
                            resourceGuard,
                            this.keySerializerProvider.currentSchemaSerializer(),
                            kvStateInformation,
                            this.keyGroupRange,
                            keyGroupPrefixBytes,
                            backendUID,
                            new TreeMap<>(),
                            -1L);

            HeapPriorityQueueSetFactory priorityQueueFactory = createHeapQueueFactory();
            InternalKeyContextImpl<K> keyContext =
                    new InternalKeyContextImpl<>(this.keyGroupRange, this.numberOfKeyGroups);

            this.logger.info(
                    "Finished building ForSt keyed state-backend at local base path: {}, remote base path: {}.",
                    this.optionsContainer.getLocalBasePath(),
                    this.optionsContainer.getRemoteBasePath());

            return new ForStKeyedStateBackend<>(
                    backendUID,
                    this.executionConfig,
                    this.optionsContainer,
                    keyGroupPrefixBytes,
                    this.keySerializerProvider.currentSchemaSerializer(),
                    serializedKeyBuilder,
                    valueSerializerView,
                    valueDeserializerView,
                    db,
                    kvStateInformation,
                    registeredPQStates,
                    this.columnFamilyOptionsFactory,
                    defaultColumnFamilyHandle,
                    snapshotStrategy,
                    priorityQueueFactory,
                    backendCloseableRegistry,
                    nativeMetricMonitor,
                    keyContext,
                    this.ttlTimeProvider,
                    ttlCompactFiltersManager);
        } catch (Throwable failure) {
            // Best-effort cleanup of whatever had been created before the failure.
            IOUtils.closeQuietly(backendCloseableRegistry);
            IOUtils.closeQuietly(defaultColumnFamilyHandle);
            IOUtils.closeQuietly(nativeMetricMonitor);
            IOUtils.closeQuietly(db);
            IOUtils.closeQuietly(restoreOperation);
            try {
                this.optionsContainer.clearDirectories();
            } catch (Exception cleanupFailure) {
                // Cleanup problems must not mask the original failure, so they are only logged.
                this.logger.warn(
                        "Failed to delete ForSt local base path {}, remote base path {}.",
                        this.optionsContainer.getLocalBasePath(),
                        this.optionsContainer.getRemoteBasePath(),
                        cleanupFailure);
            }
            IOUtils.closeQuietly(this.optionsContainer);
            IOUtils.closeQuietly(snapshotStrategy);

            if (failure instanceof BackendBuildingException) {
                // Explicit cast: rethrowing the Throwable-typed catch parameter directly would
                // propagate every checked exception of the try block, which this method does not declare.
                throw (BackendBuildingException) failure;
            }
            this.logger.error("Caught unexpected exception.", failure);
            throw new BackendBuildingException("Caught unexpected exception.", failure);
        }
    }

    /**
     * Chooses the restore operation that matches the configured state handles.
     *
     * <ul>
     *   <li>no (or only null) handles: {@link ForStNoneRestoreOperation};
     *   <li>first handle is an {@link IncrementalRemoteKeyedStateHandle}: {@link ForStIncrementalRestoreOperation};
     *   <li>otherwise, if the priority queue state type is {@code HEAP}: {@link ForStHeapTimersFullRestoreOperation}.
     * </ul>
     *
     * @param keyGroupPrefixBytes number of bytes of the key-group prefix
     * @param kvStateInformation map handed to the restore operation for registered key/value states
     * @param registeredPQStates map handed to the heap-timer restore operation for priority queue states
     * @param ttlCompactFiltersManager TTL compaction filter manager handed to the restore operation
     * @return the selected restore operation
     * @throws UnsupportedOperationException if none of the cases above applies
     */
    private ForStRestoreOperation getForStRestoreOperation(
            int keyGroupPrefixBytes,
            LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
            LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
            ForStDBTtlCompactFiltersManager ttlCompactFiltersManager) {
        // Without a remote ForSt path the local path is used; with one, the fixed path "/db" is used.
        Path instanceForStPath =
                this.optionsContainer.getRemoteForStPath() == null
                        ? this.optionsContainer.getLocalForStPath()
                        : new Path("/db");

        if (CollectionUtil.isEmptyOrAllElementsNull(this.restoreStateHandles)) {
            return new ForStNoneRestoreOperation(
                    Collections.emptyMap(),
                    instanceForStPath,
                    this.optionsContainer.getDbOptions(),
                    this.columnFamilyOptionsFactory,
                    this.nativeMetricOptions,
                    this.metricGroup,
                    ttlCompactFiltersManager,
                    this.writeBatchSize,
                    this.optionsContainer.getWriteBufferManagerCapacity());
        }

        // Only the first handle is inspected to decide between incremental and full restore.
        if (this.restoreStateHandles.iterator().next() instanceof IncrementalRemoteKeyedStateHandle) {
            return new ForStIncrementalRestoreOperation<>(
                    this.operatorIdentifier,
                    this.keyGroupRange,
                    keyGroupPrefixBytes,
                    this.cancelStreamRegistry,
                    this.userCodeClassLoader,
                    kvStateInformation,
                    this.keySerializerProvider,
                    this.optionsContainer,
                    this.optionsContainer.getBasePath(),
                    instanceForStPath,
                    this.optionsContainer.getDbOptions(),
                    this.columnFamilyOptionsFactory,
                    this.nativeMetricOptions,
                    this.metricGroup,
                    ttlCompactFiltersManager,
                    this.writeBatchSize,
                    this.optionsContainer.getWriteBufferManagerCapacity(),
                    this.customInitializationMetrics,
                    CollectionUtil.checkedSubTypeCast(this.restoreStateHandles, IncrementalRemoteKeyedStateHandle.class),
                    this.overlapFractionThreshold,
                    this.useIngestDbRestoreMode,
                    this.rescalingUseDeleteFilesInRange,
                    this.recoveryClaimMode);
        }

        if (this.priorityQueueConfig.getPriorityQueueStateType() == ForStStateBackend.PriorityQueueStateType.HEAP) {
            return new ForStHeapTimersFullRestoreOperation<>(
                    this.keyGroupRange,
                    this.numberOfKeyGroups,
                    this.userCodeClassLoader,
                    kvStateInformation,
                    registeredPQStates,
                    createHeapQueueFactory(),
                    this.keySerializerProvider,
                    instanceForStPath,
                    this.optionsContainer.getDbOptions(),
                    this.columnFamilyOptionsFactory,
                    this.nativeMetricOptions,
                    this.metricGroup,
                    ttlCompactFiltersManager,
                    this.writeBatchSize,
                    this.optionsContainer.getWriteBufferManagerCapacity(),
                    this.restoreStateHandles,
                    this.cancelStreamRegistry);
        }

        throw new UnsupportedOperationException("Not support restoring yet for ForStStateBackend");
    }

    /**
     * Creates the snapshot strategy: incremental when incremental checkpointing is enabled on this
     * builder, native full otherwise. Both variants share a newly created {@link ForStStateDataTransfer}.
     *
     * @param db the restored database instance
     * @param resourceGuard resource guard handed to the strategy
     * @param keySerializer current key serializer
     * @param kvStateInformation registered key/value state information
     * @param keyGroupRange key-group range covered by the backend
     * @param keyGroupPrefixBytes number of bytes of the key-group prefix
     * @param backendUID unique id of the backend being built
     * @param uploadedStateHandles handles per checkpoint id; only passed to the incremental strategy
     * @param lastCompletedCheckpointId checkpoint id; only passed to the incremental strategy
     * @return the snapshot strategy to install in the backend
     */
    private ForStSnapshotStrategyBase<K, ?> initializeSnapshotStrategy(
            @Nonnull RocksDB db,
            @Nonnull ResourceGuard resourceGuard,
            @Nonnull TypeSerializer<K> keySerializer,
            @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
            @Nonnull KeyGroupRange keyGroupRange,
            @Nonnegative int keyGroupPrefixBytes,
            @Nonnull UUID backendUID,
            @Nonnull SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> uploadedStateHandles,
            long lastCompletedCheckpointId) {
        // NOTE(review): 4 is the first ForStStateDataTransfer constructor argument, presumably the
        // number of transfer threads; confirm against ForStStateDataTransfer.
        ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(4, this.optionsContainer.getFileSystem());

        if (this.enableIncrementalCheckpointing) {
            return new ForStIncrementalSnapshotStrategy<>(
                    db,
                    resourceGuard,
                    this.optionsContainer,
                    keySerializer,
                    kvStateInformation,
                    keyGroupRange,
                    keyGroupPrefixBytes,
                    backendUID,
                    uploadedStateHandles,
                    stateTransfer,
                    lastCompletedCheckpointId);
        }
        return new ForStNativeFullSnapshotStrategy<>(
                db,
                resourceGuard,
                this.optionsContainer,
                keySerializer,
                kvStateInformation,
                keyGroupRange,
                keyGroupPrefixBytes,
                backendUID,
                stateTransfer);
    }

    /**
     * Creates a heap priority queue set factory for this builder's key-group range and key-group count.
     *
     * @return a new factory instance
     */
    private HeapPriorityQueueSetFactory createHeapQueueFactory() {
        // NOTE(review): 128 is the third HeapPriorityQueueSetFactory constructor argument; its meaning
        // is not visible here and it is unrelated to VALUE_SERIALIZER_BUFFER_START_SIZE.
        return new HeapPriorityQueueSetFactory(this.keyGroupRange, this.numberOfKeyGroups, 128);
    }
}
