package org.apache.flink.state.forst.restore;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager;
import org.apache.flink.state.forst.ForStDBWriteBatchWrapper;
import org.apache.flink.state.forst.ForStIncrementalCheckpointUtils;
import org.apache.flink.state.forst.ForStNativeMetricOptions;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.sync.ForStIteratorWrapper;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.function.RunnableWithException;
import org.forstdb.Checkpoint;
import org.forstdb.ColumnFamilyDescriptor;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.DBOptions;
import org.forstdb.ExportImportFilesMetaData;
import org.forstdb.ReadOptions;
import org.forstdb.RocksDB;
import org.forstdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.class */
public class ForStIncrementalRestoreOperation<K> implements ForStRestoreOperation {
    private static final Logger logger = LoggerFactory.getLogger(ForStIncrementalRestoreOperation.class);
    private final String operatorIdentifier;
    private final ForStHandle forstHandle;
    private final List<IncrementalRemoteKeyedStateHandle> restoreStateHandles;
    private final CloseableRegistry cancelStreamRegistry;
    private final KeyGroupRange keyGroupRange;
    private final int keyGroupPrefixBytes;
    private final ForStResourceContainer optionsContainer;
    private final Path forstBasePath;
    private final StateSerializerProvider<K> keySerializerProvider;
    private final ClassLoader userCodeClassLoader;
    private final StateBackend.CustomInitializationMetrics customInitializationMetrics;
    private final long writeBatchSize;
    private final double overlapFractionThreshold;
    private final boolean useIngestDbRestoreMode;
    private final boolean useDeleteFilesInRange;
    private boolean isKeySerializerCompatibilityChecked;
    private final RecoveryClaimMode recoveryClaimMode;
    private final SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> restoredSstFiles = new TreeMap();
    private long lastCompletedCheckpointId = -1;
    private UUID backendUID = UUID.randomUUID();

    /* loaded from: input_file:org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation$RestoredDBInstance.class */
    public static class RestoredDBInstance implements AutoCloseable {

        @Nonnull
        final RocksDB db;

        @Nonnull
        final ColumnFamilyHandle defaultColumnFamilyHandle;

        @Nonnull
        final List<ColumnFamilyHandle> columnFamilyHandles;

        @Nonnull
        final List<ColumnFamilyDescriptor> columnFamilyDescriptors;

        @Nonnull
        final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
        final ReadOptions readOptions = new ReadOptions();
        final IncrementalRemoteKeyedStateHandle srcStateHandle;
        final String instancePath;

        RestoredDBInstance(@Nonnull RocksDB rocksDB, @Nonnull List<ColumnFamilyHandle> list, @Nonnull List<ColumnFamilyDescriptor> list2, @Nonnull List<StateMetaInfoSnapshot> list3, @Nonnull IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle, @Nonnull String str) {
            this.db = rocksDB;
            this.defaultColumnFamilyHandle = list.remove(0);
            this.columnFamilyHandles = list;
            this.columnFamilyDescriptors = list2;
            this.stateMetaInfoSnapshots = list3;
            this.srcStateHandle = incrementalRemoteKeyedStateHandle;
            this.instancePath = str;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            ArrayList arrayList = new ArrayList(this.columnFamilyDescriptors.size() + 1);
            this.columnFamilyDescriptors.forEach(columnFamilyDescriptor -> {
                arrayList.add(columnFamilyDescriptor.getOptions());
            });
            ForStOperationUtils.addColumnFamilyOptionsToCloseLater(arrayList, this.defaultColumnFamilyHandle);
            IOUtils.closeQuietly(this.defaultColumnFamilyHandle);
            IOUtils.closeAllQuietly(this.columnFamilyHandles);
            IOUtils.closeQuietly(this.db);
            IOUtils.closeAllQuietly(arrayList);
            IOUtils.closeQuietly(this.readOptions);
        }
    }

    public ForStIncrementalRestoreOperation(String str, KeyGroupRange keyGroupRange, int i, CloseableRegistry closeableRegistry, ClassLoader classLoader, Map<String, ForStOperationUtils.ForStKvStateInfo> map, StateSerializerProvider<K> stateSerializerProvider, ForStResourceContainer forStResourceContainer, Path path, Path path2, DBOptions dBOptions, Function<String, ColumnFamilyOptions> function, ForStNativeMetricOptions forStNativeMetricOptions, MetricGroup metricGroup, @Nonnull ForStDBTtlCompactFiltersManager forStDBTtlCompactFiltersManager, @Nonnegative long j, Long l, StateBackend.CustomInitializationMetrics customInitializationMetrics, @Nonnull Collection<IncrementalRemoteKeyedStateHandle> collection, double d, boolean z, boolean z2, RecoveryClaimMode recoveryClaimMode) {
        this.forstHandle = new ForStHandle(map, path2, dBOptions, function, forStNativeMetricOptions, metricGroup, forStDBTtlCompactFiltersManager, l);
        this.operatorIdentifier = str;
        this.customInitializationMetrics = customInitializationMetrics;
        this.restoreStateHandles = (List) collection.stream().collect(Collectors.toList());
        this.cancelStreamRegistry = closeableRegistry;
        this.keyGroupRange = keyGroupRange;
        this.keyGroupPrefixBytes = i;
        this.optionsContainer = forStResourceContainer;
        this.forstBasePath = path;
        this.keySerializerProvider = stateSerializerProvider;
        this.userCodeClassLoader = classLoader;
        this.writeBatchSize = j;
        this.overlapFractionThreshold = d;
        this.useIngestDbRestoreMode = z;
        this.useDeleteFilesInRange = z2;
        this.recoveryClaimMode = recoveryClaimMode;
    }

    @Override // org.apache.flink.state.forst.restore.ForStRestoreOperation
    /* renamed from: restore */
    public ForStRestoreResult mo55restore() throws Exception {
        if (this.restoreStateHandles == null || this.restoreStateHandles.isEmpty()) {
            return null;
        }
        logger.info("Starting RocksDB incremental recovery in operator {} target key-group range {}. Use IngestDB={}, State Handles={}", new Object[]{this.operatorIdentifier, this.keyGroupRange.prettyPrintInterval(), Boolean.valueOf(this.useIngestDbRestoreMode), this.restoreStateHandles});
        ArrayList arrayList = new ArrayList();
        int i = -1;
        if (!this.useIngestDbRestoreMode || this.restoreStateHandles.size() == 1) {
            i = ForStIncrementalCheckpointUtils.findTheBestStateHandleForInitial(this.restoreStateHandles, this.keyGroupRange, this.overlapFractionThreshold);
            arrayList.add(new StateHandleTransferSpec(this.restoreStateHandles.get(i), new Path(this.forstBasePath, ForStResourceContainer.DB_DIR_STRING)));
        }
        for (int i2 = 0; i2 < this.restoreStateHandles.size(); i2++) {
            if (i2 != i) {
                arrayList.add(new StateHandleTransferSpec(this.restoreStateHandles.get(i2), new Path(this.forstBasePath, UUID.randomUUID().toString())));
            }
        }
        try {
            runAndReportDuration(() -> {
                transferAllStateHandles(arrayList);
            }, "DownloadStateDurationMs");
            runAndReportDuration(() -> {
                innerRestore(arrayList);
            }, "RestoreStateDurationMs");
            ForStRestoreResult forStRestoreResult = new ForStRestoreResult(this.forstHandle.getDb(), this.forstHandle.getDefaultColumnFamilyHandle(), this.forstHandle.getNativeMetricMonitor(), this.lastCompletedCheckpointId, this.backendUID, this.restoredSstFiles);
            if (!this.useIngestDbRestoreMode || this.restoreStateHandles.size() == 1) {
                arrayList.remove(0);
            }
            arrayList.stream().map((v0) -> {
                return v0.getTransferDestination();
            }).forEach(path -> {
                try {
                    getFileSystem(path).delete(path, true);
                } catch (IOException e) {
                    logger.warn("Failed to delete transfer destination {}", path);
                }
            });
            return forStRestoreResult;
        } catch (Throwable th) {
            if (!this.useIngestDbRestoreMode || this.restoreStateHandles.size() == 1) {
                arrayList.remove(0);
            }
            arrayList.stream().map((v0) -> {
                return v0.getTransferDestination();
            }).forEach(path2 -> {
                try {
                    getFileSystem(path2).delete(path2, true);
                } catch (IOException e) {
                    logger.warn("Failed to delete transfer destination {}", path2);
                }
            });
            throw th;
        }
    }

    private void transferAllStateHandles(List<StateHandleTransferSpec> list) throws Exception {
        ForStStateDataTransfer forStStateDataTransfer = new ForStStateDataTransfer(4, this.optionsContainer.getFileSystem());
        try {
            forStStateDataTransfer.transferAllStateDataToDirectory(list, this.cancelStreamRegistry, this.recoveryClaimMode);
            forStStateDataTransfer.close();
        } catch (Throwable th) {
            try {
                forStStateDataTransfer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void innerRestore(List<StateHandleTransferSpec> list) throws Exception {
        if (list.size() == 1) {
            initBaseDBFromSingleStateHandle(list.get(0));
        } else {
            restoreFromMultipleStateHandles(list);
        }
    }

    private void initBaseDBFromSingleStateHandle(StateHandleTransferSpec stateHandleTransferSpec) throws Exception {
        IncrementalRemoteKeyedStateHandle stateHandle = stateHandleTransferSpec.getStateHandle();
        logger.info("Starting opening base ForSt instance in operator {} with target key-group range {} from state handle {}.", new Object[]{this.operatorIdentifier, this.keyGroupRange.prettyPrintInterval(), stateHandleTransferSpec});
        restoreBaseDBFromMainHandle(stateHandleTransferSpec);
        KeyGroupRange keyGroupRange = stateHandle.getKeyGroupRange();
        if (Objects.equals(keyGroupRange, this.keyGroupRange)) {
            restorePreviousIncrementalFilesStatus(stateHandle);
        } else {
            try {
                ForStIncrementalCheckpointUtils.clipDBWithKeyGroupRange(this.forstHandle.getDb(), this.forstHandle.getColumnFamilyHandles(), this.keyGroupRange, keyGroupRange, this.keyGroupPrefixBytes, this.useDeleteFilesInRange);
            } catch (RocksDBException e) {
                logger.error("Failed to clip DB after initialization.", e);
                throw new BackendBuildingException("Failed to clip DB after initialization.", e);
            }
        }
        logger.info("Finished opening base ForSt instance in operator {} with target key-group range {}.", this.operatorIdentifier, this.keyGroupRange.prettyPrintInterval());
    }

    private void restoreFromMultipleStateHandles(List<StateHandleTransferSpec> list) throws Exception {
        logger.info("Starting to restore backend with range {} in operator {} from multiple state handles {} with useIngestDbRestoreMode = {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, list, Boolean.valueOf(this.useIngestDbRestoreMode)});
        byte[] bArr = new byte[this.keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup(this.keyGroupRange.getStartKeyGroup(), bArr);
        byte[] bArr2 = new byte[this.keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup(this.keyGroupRange.getEndKeyGroup() + 1, bArr2);
        if (this.useIngestDbRestoreMode) {
            mergeStateHandlesWithClipAndIngest(list, bArr, bArr2);
        } else {
            mergeStateHandlesWithCopyFromTemporaryInstance(list.remove(0), list, bArr, bArr2);
        }
        logger.info("Completed restoring backend with range {} in operator {} from multiple state handles with useIngestDbRestoreMode = {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, Boolean.valueOf(this.useIngestDbRestoreMode)});
    }

    private void restoreBaseDBFromMainHandle(StateHandleTransferSpec stateHandleTransferSpec) throws Exception {
        List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = readMetaData(stateHandleTransferSpec.getStateHandle().getMetaDataStateHandle()).getStateMetaInfoSnapshots();
        this.forstHandle.openDB(createColumnFamilyDescriptors(stateMetaInfoSnapshots, true), stateMetaInfoSnapshots, this.cancelStreamRegistry);
    }

    private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle incrementalKeyedStateHandle) {
        this.backendUID = incrementalKeyedStateHandle.getBackendIdentifier();
        this.restoredSstFiles.put(Long.valueOf(incrementalKeyedStateHandle.getCheckpointId()), incrementalKeyedStateHandle.getSharedStateHandles());
        this.lastCompletedCheckpointId = incrementalKeyedStateHandle.getCheckpointId();
        logger.info("Restored previous incremental files status in backend with range {} in operator {}: backend uuid {}, last checkpoint id {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, this.backendUID, Long.valueOf(this.lastCompletedCheckpointId)});
    }

    private List<ColumnFamilyDescriptor> createColumnFamilyDescriptors(List<StateMetaInfoSnapshot> list, boolean z) {
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<StateMetaInfoSnapshot> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(ForStOperationUtils.createColumnFamilyDescriptor(RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(it.next()), this.forstHandle.getColumnFamilyOptionsFactory(), z ? this.forstHandle.getTtlCompactFiltersManager() : null, this.forstHandle.getWriteBufferManagerCapacity()));
        }
        return arrayList;
    }

    private void runAndReportDuration(RunnableWithException runnableWithException, String str) throws Exception {
        SystemClock systemClock = SystemClock.getInstance();
        long relativeTimeMillis = systemClock.relativeTimeMillis();
        runnableWithException.run();
        this.customInitializationMetrics.addMetric(str, systemClock.relativeTimeMillis() - relativeTimeMillis);
    }

    private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle streamStateHandle) throws Exception {
        InputStream inputStream = null;
        try {
            inputStream = streamStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable(inputStream);
            KeyedBackendSerializationProxy<K> readMetaData = readMetaData((DataInputView) new DataInputViewStreamWrapper(inputStream));
            if (this.cancelStreamRegistry.unregisterCloseable(inputStream)) {
                inputStream.close();
            }
            return readMetaData;
        } catch (Throwable th) {
            if (this.cancelStreamRegistry.unregisterCloseable(inputStream)) {
                inputStream.close();
            }
            throw th;
        }
    }

    KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView) throws IOException, StateMigrationException {
        KeyedBackendSerializationProxy<K> keyedBackendSerializationProxy = new KeyedBackendSerializationProxy<>(this.userCodeClassLoader);
        keyedBackendSerializationProxy.read(dataInputView);
        if (!this.isKeySerializerCompatibilityChecked) {
            TypeSerializer currentSchemaSerializer = this.keySerializerProvider.currentSchemaSerializer();
            TypeSerializerSchemaCompatibility previousSerializerSnapshotForRestoredState = this.keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(keyedBackendSerializationProxy.getKeySerializerSnapshot());
            if (previousSerializerSnapshotForRestoredState.isCompatibleAfterMigration() || previousSerializerSnapshotForRestoredState.isIncompatible()) {
                throw new StateMigrationException("The new key serializer (" + currentSchemaSerializer + ") must be compatible with the previous key serializer (" + this.keySerializerProvider.previousSchemaSerializer() + ").");
            }
            this.isKeySerializerCompatibilityChecked = true;
        }
        return keyedBackendSerializationProxy;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.forstHandle.close();
    }

    private void copyTempDbIntoBaseDb(RestoredDBInstance restoredDBInstance, ForStDBWriteBatchWrapper forStDBWriteBatchWrapper, byte[] bArr, byte[] bArr2) throws Exception {
        logger.debug("Starting copy of state handle {} for backend with range {} in operator {} to base DB using temporary instance.", new Object[]{restoredDBInstance.srcStateHandle, this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier});
        List<ColumnFamilyDescriptor> list = restoredDBInstance.columnFamilyDescriptors;
        List<ColumnFamilyHandle> list2 = restoredDBInstance.columnFamilyHandles;
        for (int i = 0; i < list.size(); i++) {
            ColumnFamilyHandle columnFamilyHandle = list2.get(i);
            ColumnFamilyHandle columnFamilyHandle2 = this.forstHandle.getOrRegisterStateColumnFamilyHandle(null, restoredDBInstance.stateMetaInfoSnapshots.get(i), this.cancelStreamRegistry).columnFamilyHandle;
            ForStIteratorWrapper forStIterator = ForStOperationUtils.getForStIterator(restoredDBInstance.db, columnFamilyHandle, restoredDBInstance.readOptions);
            try {
                forStIterator.seek(bArr);
                while (forStIterator.isValid() && ForStIncrementalCheckpointUtils.beforeThePrefixBytes(forStIterator.key(), bArr2)) {
                    forStDBWriteBatchWrapper.put(columnFamilyHandle2, forStIterator.key(), forStIterator.value());
                    forStIterator.next();
                }
                if (forStIterator != null) {
                    forStIterator.close();
                }
            } catch (Throwable th) {
                if (forStIterator != null) {
                    try {
                        forStIterator.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        logger.debug("Finished copy of state handle {} for backend with range {} in operator {} using temporary instance.", new Object[]{restoredDBInstance.srcStateHandle, this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier});
    }

    private void cleanUpPathQuietly(@Nonnull Path path) {
        try {
            getFileSystem(this.forstBasePath).delete(path, true);
        } catch (IOException e) {
            logger.warn("Failed to clean up path " + path, e);
        }
    }

    private void copyToBaseDBUsingTempDBs(List<StateHandleTransferSpec> list, byte[] bArr, byte[] bArr2) throws Exception {
        if (list.isEmpty()) {
            return;
        }
        logger.info("Starting to copy state handles for backend with range {} in operator {} using temporary instances.", this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier);
        ForStDBWriteBatchWrapper forStDBWriteBatchWrapper = new ForStDBWriteBatchWrapper(this.forstHandle.getDb(), this.writeBatchSize);
        try {
            Closeable registerCloseableTemporarily = this.cancelStreamRegistry.registerCloseableTemporarily(forStDBWriteBatchWrapper.getCancelCloseable());
            try {
                Iterator<StateHandleTransferSpec> it = list.iterator();
                while (it.hasNext()) {
                    RestoredDBInstance restoreTempDBInstance = restoreTempDBInstance(it.next());
                    try {
                        copyTempDbIntoBaseDb(restoreTempDBInstance, forStDBWriteBatchWrapper, bArr, bArr2);
                        if (restoreTempDBInstance != null) {
                            restoreTempDBInstance.close();
                        }
                    } finally {
                    }
                }
                if (registerCloseableTemporarily != null) {
                    registerCloseableTemporarily.close();
                }
                forStDBWriteBatchWrapper.close();
                logger.info("Competed copying state handles for backend with range {} in operator {} using temporary instances.", this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier);
            } finally {
            }
        } catch (Throwable th) {
            try {
                forStDBWriteBatchWrapper.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void mergeStateHandlesWithCopyFromTemporaryInstance(StateHandleTransferSpec stateHandleTransferSpec, List<StateHandleTransferSpec> list, byte[] bArr, byte[] bArr2) throws Exception {
        logger.info("Starting to merge state for backend with range {} in operator {} from multiple state handles using temporary instances.", this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier);
        initBaseDBFromSingleStateHandle(stateHandleTransferSpec);
        copyToBaseDBUsingTempDBs(list, bArr, bArr2);
        logger.info("Completed merging state for backend with range {} in operator {} from multiple state handles using temporary instances.", this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier);
    }

    public void mergeStateHandlesWithClipAndIngest(List<StateHandleTransferSpec> list, byte[] bArr, byte[] bArr2) throws Exception {
        Path path = new Path(this.forstBasePath, "export-cfs");
        getFileSystem(this.forstBasePath).mkdirs(path);
        HashMap hashMap = new HashMap(list.size());
        ArrayList arrayList = new ArrayList(list.size());
        try {
            KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange = exportColumnFamiliesWithSstDataInKeyGroupsRange(path, list, hashMap, arrayList);
            if (hashMap.isEmpty()) {
                int findTheBestStateHandleForInitial = ForStIncrementalCheckpointUtils.findTheBestStateHandleForInitial(this.restoreStateHandles, this.keyGroupRange, this.overlapFractionThreshold);
                arrayList.remove(findTheBestStateHandleForInitial);
                StateHandleTransferSpec stateHandleTransferSpec = new StateHandleTransferSpec(this.restoreStateHandles.get(findTheBestStateHandleForInitial), new Path(this.forstBasePath, ForStResourceContainer.DB_DIR_STRING));
                transferAllStateHandles(Collections.singletonList(stateHandleTransferSpec));
                mergeStateHandlesWithCopyFromTemporaryInstance(stateHandleTransferSpec, arrayList, bArr, bArr2);
            } else {
                initBaseDBFromColumnFamilyImports(hashMap, exportColumnFamiliesWithSstDataInKeyGroupsRange);
                copyToBaseDBUsingTempDBs(arrayList, bArr, bArr2);
            }
        } finally {
            hashMap.values().forEach((v0) -> {
                IOUtils.closeAllQuietly(v0);
            });
            cleanUpPathQuietly(path);
        }
    }

    public void exportColumnFamilies(RocksDB rocksDB, List<ColumnFamilyHandle> list, List<RegisteredStateMetaInfoBase> list2, Path path, Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> map) throws RocksDBException, IOException {
        Preconditions.checkArgument(list.size() == list2.size(), "Lists are aligned by index and must be of the same size!");
        Checkpoint create = Checkpoint.create(rocksDB);
        for (int i = 0; i < list.size(); i++) {
            try {
                RegisteredStateMetaInfoBase.Key asMapKey = list2.get(i).asMapKey();
                String uuid = UUID.randomUUID().toString();
                ExportImportFilesMetaData exportColumnFamily = create.exportColumnFamily(list.get(i), this.optionsContainer.getRemoteBasePath() != null ? path.getName() + "/" + uuid : path.toString() + "/" + uuid);
                FileStatus[] listStatus = getFileSystem(path).listStatus(new Path(path, uuid));
                if (listStatus != null) {
                    int i2 = 0;
                    for (FileStatus fileStatus : listStatus) {
                        if (fileStatus.getPath().getName().endsWith(".sst")) {
                            i2++;
                        }
                    }
                    if (i2 > 0) {
                        map.computeIfAbsent(asMapKey, key -> {
                            return new ArrayList();
                        }).add(exportColumnFamily);
                    }
                } else {
                    IOUtils.closeQuietly(exportColumnFamily);
                }
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (create != null) {
            create.close();
        }
    }

    private void initBaseDBFromColumnFamilyImports(Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> map, KeyGroupRange keyGroupRange) throws Exception {
        logger.info("Starting to import exported state handles for backend with range {} in operator {} using Clip/Ingest DB with exported range {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, keyGroupRange.prettyPrintInterval()});
        this.forstHandle.openDB();
        for (Map.Entry<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> entry : map.entrySet()) {
            this.forstHandle.registerStateColumnFamilyHandleWithImport(entry.getKey(), entry.getValue(), this.cancelStreamRegistry);
        }
        ForStIncrementalCheckpointUtils.clipDBWithKeyGroupRange(this.forstHandle.getDb(), this.forstHandle.getColumnFamilyHandles(), this.keyGroupRange, keyGroupRange, this.keyGroupPrefixBytes, this.useDeleteFilesInRange);
        logger.info("Completed importing exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier);
    }

    private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange(Path path, List<StateHandleTransferSpec> list, Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> map, List<StateHandleTransferSpec> list2) throws Exception {
        logger.info("Starting restore export for backend with range {} in operator {}.", this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier);
        int i = Integer.MAX_VALUE;
        int i2 = Integer.MIN_VALUE;
        int i3 = 0;
        for (StateHandleTransferSpec stateHandleTransferSpec : list) {
            IncrementalRemoteKeyedStateHandle stateHandle = stateHandleTransferSpec.getStateHandle();
            String str = " for state handle at index " + i3 + " with proclaimed key-group range " + stateHandle.getKeyGroupRange().prettyPrintInterval() + " for backend with range " + this.keyGroupRange.prettyPrintInterval() + " in operator " + this.operatorIdentifier + ".";
            logger.debug("Opening temporary database" + str);
            RestoredDBInstance restoreTempDBInstance = restoreTempDBInstance(stateHandleTransferSpec);
            try {
                List<ColumnFamilyHandle> list3 = restoreTempDBInstance.columnFamilyHandles;
                logger.debug("Checking actual keys of sst files" + str);
                ForStIncrementalCheckpointUtils.RangeCheckResult checkSstDataAgainstKeyGroupRange = ForStIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange(restoreTempDBInstance.db, this.keyGroupPrefixBytes, stateHandle.getKeyGroupRange());
                logger.info("{}" + str, checkSstDataAgainstKeyGroupRange);
                if (checkSstDataAgainstKeyGroupRange.allInRange()) {
                    logger.debug("Start exporting" + str);
                    exportColumnFamilies(restoreTempDBInstance.db, list3, (List) restoreTempDBInstance.stateMetaInfoSnapshots.stream().map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot).collect(Collectors.toList()), path, map);
                    i = Math.min(i, stateHandle.getKeyGroupRange().getStartKeyGroup());
                    i2 = Math.max(i2, stateHandle.getKeyGroupRange().getEndKeyGroup());
                    logger.debug("Done exporting" + str);
                } else {
                    list2.add(stateHandleTransferSpec);
                    logger.debug("Skipped export" + str);
                }
                if (restoreTempDBInstance != null) {
                    restoreTempDBInstance.close();
                }
                i3++;
            } catch (Throwable th) {
                if (restoreTempDBInstance != null) {
                    try {
                        restoreTempDBInstance.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        KeyGroupRange keyGroupRange = i <= i2 ? new KeyGroupRange(i, i2) : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        logger.info("Completed restore export for backend with range {} in operator {}. {} exported handles with overall exported range {}. {} Skipped handles: {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, Integer.valueOf(list.size() - list2.size()), keyGroupRange.prettyPrintInterval(), Integer.valueOf(list2.size()), list2});
        return keyGroupRange;
    }

    private RestoredDBInstance restoreTempDBInstance(StateHandleTransferSpec stateHandleTransferSpec) throws Exception {
        List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = readMetaData(stateHandleTransferSpec.getStateHandle().getMetaDataStateHandle()).getStateMetaInfoSnapshots();
        List<ColumnFamilyDescriptor> createColumnFamilyDescriptors = createColumnFamilyDescriptors(stateMetaInfoSnapshots, false);
        ArrayList arrayList = new ArrayList(stateMetaInfoSnapshots.size() + 1);
        String path = this.optionsContainer.getRemoteBasePath() != null ? "/" + stateHandleTransferSpec.getTransferDestination().getName() : stateHandleTransferSpec.getTransferDestination().toString();
        DBOptions dBOptions = new DBOptions(this.forstHandle.getDbOptions());
        dBOptions.setDbLogDir("");
        return new RestoredDBInstance(ForStOperationUtils.openDB(path, createColumnFamilyDescriptors, arrayList, ForStOperationUtils.createColumnFamilyOptions(this.forstHandle.getColumnFamilyOptionsFactory(), "default"), dBOptions), arrayList, createColumnFamilyDescriptors, stateMetaInfoSnapshots, stateHandleTransferSpec.getStateHandle(), stateHandleTransferSpec.getTransferDestination().toString());
    }

    private FileSystem getFileSystem(Path path) throws IOException {
        return this.optionsContainer.getFileSystem() != null ? this.optionsContainer.getFileSystem() : path.getFileSystem();
    }
}
