package org.apache.flink.state.forst.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.util.ResourceGuard;
import org.forstdb.RocksDB;
import org.forstdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.class */
public class ForStNativeFullSnapshotStrategy<K> extends ForStSnapshotStrategyBase<K, ForStSnapshotStrategyBase.ForStNativeSnapshotResources> {
    private static final Logger LOG = LoggerFactory.getLogger(ForStNativeFullSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous full ForStDB snapshot";
    protected final ForStStateDataTransfer stateTransfer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy$ForStNativeFullSnapshotOperation.class */
    public final class ForStNativeFullSnapshotOperation extends ForStSnapshotStrategyBase<K, ForStSnapshotStrategyBase.ForStNativeSnapshotResources>.ForStSnapshotOperation {

        @Nonnull
        private final ForStSnapshotStrategyBase.ForStNativeSnapshotResources snapshotResources;

        @Nonnull
        private final SnapshotType.SharingFilesStrategy sharingFilesStrategy;

        private ForStNativeFullSnapshotOperation(SnapshotType.SharingFilesStrategy sharingFilesStrategy, long j, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull ForStSnapshotStrategyBase.ForStNativeSnapshotResources forStNativeSnapshotResources) {
            super(j, checkpointStreamFactory);
            this.snapshotResources = forStNativeSnapshotResources;
            this.sharingFilesStrategy = sharingFilesStrategy;
        }

        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry closeableRegistry) throws Exception {
            boolean z = false;
            try {
                SnapshotResult<StreamStateHandle> materializeMetaData = ForStNativeFullSnapshotStrategy.this.materializeMetaData(closeableRegistry, this.tmpResourcesRegistry, this.snapshotResources.stateMetaInfoSnapshots, this.checkpointId, this.checkpointStreamFactory);
                ArrayList arrayList = new ArrayList();
                z = true;
                SnapshotResult<KeyedStateHandle> of = SnapshotResult.of(new IncrementalRemoteKeyedStateHandle(ForStNativeFullSnapshotStrategy.this.backendUID, ForStNativeFullSnapshotStrategy.this.keyGroupRange, this.checkpointId, Collections.emptyList(), arrayList, materializeMetaData.getJobManagerOwnedSnapshot(), materializeMetaData.getStateSize() + uploadSnapshotFiles(arrayList, closeableRegistry, this.tmpResourcesRegistry)));
                this.snapshotResources.release();
                if (1 == 0) {
                    try {
                        this.tmpResourcesRegistry.close();
                    } catch (Exception e) {
                        ForStNativeFullSnapshotStrategy.LOG.warn("Could not properly clean tmp resources.", e);
                    }
                }
                return of;
            } catch (Throwable th) {
                this.snapshotResources.release();
                if (!z) {
                    try {
                        this.tmpResourcesRegistry.close();
                    } catch (Exception e2) {
                        ForStNativeFullSnapshotStrategy.LOG.warn("Could not properly clean tmp resources.", e2);
                    }
                }
                throw th;
            }
        }

        private long uploadSnapshotFiles(@Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> list, @Nonnull CloseableRegistry closeableRegistry, @Nonnull CloseableRegistry closeableRegistry2) throws Exception {
            long j = 0;
            if (this.snapshotResources.liveFiles.size() > 0) {
                List<IncrementalKeyedStateHandle.HandleAndLocalPath> transferFilesToCheckpointFs = ForStNativeFullSnapshotStrategy.this.stateTransfer.transferFilesToCheckpointFs(this.sharingFilesStrategy, (List) this.snapshotResources.liveFiles.stream().filter(path -> {
                    return (path.getName().endsWith(ForStSnapshotUtil.CURRENT_FILE_NAME) || path.getName().startsWith(ForStSnapshotUtil.MANIFEST_FILE_PREFIX)) ? false : true;
                }).collect(Collectors.toList()), this.checkpointStreamFactory, CheckpointedStateScope.EXCLUSIVE, closeableRegistry, closeableRegistry2);
                long sum = 0 + transferFilesToCheckpointFs.stream().mapToLong(handleAndLocalPath -> {
                    return handleAndLocalPath.getStateSize();
                }).sum();
                list.addAll(transferFilesToCheckpointFs);
                IncrementalKeyedStateHandle.HandleAndLocalPath transferFileToCheckpointFs = ForStNativeFullSnapshotStrategy.this.stateTransfer.transferFileToCheckpointFs(SnapshotType.SharingFilesStrategy.NO_SHARING, this.snapshotResources.manifestFilePath, this.snapshotResources.manifestFileSize, this.checkpointStreamFactory, CheckpointedStateScope.EXCLUSIVE, closeableRegistry, closeableRegistry2);
                list.add(transferFileToCheckpointFs);
                long stateSize = sum + transferFileToCheckpointFs.getStateSize();
                IncrementalKeyedStateHandle.HandleAndLocalPath writeFileToCheckpointFs = ForStNativeFullSnapshotStrategy.this.stateTransfer.writeFileToCheckpointFs(ForStSnapshotUtil.CURRENT_FILE_NAME, this.snapshotResources.getCurrentFileContent(), this.checkpointStreamFactory, CheckpointedStateScope.EXCLUSIVE, closeableRegistry, closeableRegistry2);
                list.add(writeFileToCheckpointFs);
                j = stateSize + writeFileToCheckpointFs.getStateSize();
            }
            return j;
        }
    }

    public ForStNativeFullSnapshotStrategy(@Nonnull RocksDB rocksDB, @Nonnull ResourceGuard resourceGuard, @Nonnull ForStResourceContainer forStResourceContainer, @Nonnull TypeSerializer<K> typeSerializer, @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> linkedHashMap, @Nonnull KeyGroupRange keyGroupRange, int i, @Nonnull UUID uuid, @Nonnull ForStStateDataTransfer forStStateDataTransfer) {
        this(DESCRIPTION, rocksDB, resourceGuard, forStResourceContainer, typeSerializer, linkedHashMap, keyGroupRange, i, uuid, forStStateDataTransfer);
    }

    public ForStNativeFullSnapshotStrategy(@Nonnull String str, @Nonnull RocksDB rocksDB, @Nonnull ResourceGuard resourceGuard, @Nonnull ForStResourceContainer forStResourceContainer, @Nonnull TypeSerializer<K> typeSerializer, @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> linkedHashMap, @Nonnull KeyGroupRange keyGroupRange, int i, @Nonnull UUID uuid, @Nonnull ForStStateDataTransfer forStStateDataTransfer) {
        super(str, rocksDB, resourceGuard, forStResourceContainer, typeSerializer, linkedHashMap, keyGroupRange, i, uuid);
        this.stateTransfer = forStStateDataTransfer;
    }

    @Override // org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase
    protected ForStSnapshotStrategyBase.PreviousSnapshot snapshotMetaData(long j, @Nonnull List<StateMetaInfoSnapshot> list) {
        Iterator<Map.Entry<String, ForStOperationUtils.ForStKvStateInfo>> it = this.kvStateInformation.entrySet().iterator();
        while (it.hasNext()) {
            list.add(it.next().getValue().metaInfo.snapshot());
        }
        return EMPTY_PREVIOUS_SNAPSHOT;
    }

    @Override // org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase, java.lang.AutoCloseable
    public void close() {
        this.stateTransfer.close();
    }

    public void notifyCheckpointAborted(long j) {
    }

    public void notifyCheckpointComplete(long j) throws Exception {
    }

    /* renamed from: syncPrepareResources, reason: merged with bridge method [inline-methods] */
    public ForStSnapshotStrategyBase.ForStNativeSnapshotResources m60syncPrepareResources(long j) throws Exception {
        ArrayList arrayList = new ArrayList(this.kvStateInformation.size());
        ForStSnapshotStrategyBase.PreviousSnapshot snapshotMetaData = snapshotMetaData(j, arrayList);
        this.db.disableFileDeletions();
        try {
            RocksDB.LiveFiles liveFiles = this.db.getLiveFiles(true);
            List<Path> list = (List) liveFiles.files.stream().map(str -> {
                return new Path(this.resourceContainer.getDbPath(), str);
            }).filter(path -> {
                return !path.getName().equals(ForStSnapshotUtil.CURRENT_FILE_NAME);
            }).collect(Collectors.toList());
            Path path2 = list.stream().filter(path3 -> {
                return path3.getName().startsWith(ForStSnapshotUtil.MANIFEST_FILE_PREFIX);
            }).findAny().get();
            logLiveFiles(j, liveFiles.manifestFileSize, list);
            return new ForStSnapshotStrategyBase.ForStNativeSnapshotResources(arrayList, liveFiles.manifestFileSize, list, path2, snapshotMetaData, () -> {
                try {
                    this.db.enableFileDeletions(false);
                    LOG.info("Release one file deletion lock with ForStNativeSnapshotResources, backendUID:{}, checkpointId:{}.", this.backendUID, Long.valueOf(j));
                } catch (RocksDBException e) {
                    LOG.error("Enable file deletion failed, backendUID:{}, checkpointId:{}.", new Object[]{this.backendUID, Long.valueOf(j), e});
                }
            });
        } catch (Exception e) {
            LOG.error("Exception thrown when prepare snapshot resources, enable file deletion and rethrow the exception, backendUID:{}, checkpointId:{}", this.backendUID, Long.valueOf(j));
            this.db.enableFileDeletions(false);
            throw e;
        }
    }

    private void logLiveFiles(long j, long j2, List<Path> list) {
        if (LOG.isDebugEnabled()) {
            StringBuilder append = new StringBuilder("    manifestFileSize:").append(j2).append("\n");
            list.forEach(path -> {
                append.append("    file : ").append(path).append("\n");
            });
            LOG.debug("Backend:{} live files for checkpoint:{} : \n{}", new Object[]{this.backendUID, Long.valueOf(j), append});
        }
    }

    @Override // 
    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(ForStSnapshotStrategyBase.ForStNativeSnapshotResources forStNativeSnapshotResources, long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        return forStNativeSnapshotResources.stateMetaInfoSnapshots.isEmpty() ? closeableRegistry -> {
            return SnapshotResult.empty();
        } : new ForStNativeFullSnapshotOperation(checkpointOptions.getCheckpointType().getSharingFilesStrategy(), j, checkpointStreamFactory, forStNativeSnapshotResources);
    }
}
