package org.apache.flink.state.forst.snapshot;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.forstdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.class */
public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources> implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(ForStSnapshotStrategyBase.class);

    /** Shared {@link PreviousSnapshot} built from an empty collection, i.e. with no confirmed files. */
    protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = new PreviousSnapshot(Collections.emptyList());

    /** Description of this strategy, exposed through {@link #getDescription()}. */
    @Nonnull
    private final String description;

    /** Database handle supplied at construction; not used directly in this base class. */
    @Nonnull
    protected final RocksDB db;

    /** Resource guard supplied at construction; not used directly in this base class. */
    @Nonnull
    protected final ResourceGuard resourceGuard;

    /** Resource container supplied at construction; not used directly in this base class. */
    @Nonnull
    protected final ForStResourceContainer resourceContainer;

    /** Key serializer handed to the serialization proxy when the meta data is written. */
    @Nonnull
    protected final TypeSerializer<K> keySerializer;

    /** Per-state information keyed by name; not used directly in this base class. */
    @Nonnull
    protected final LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation;

    /** Key-group range attached to the local snapshot handles created by this strategy. */
    @Nonnull
    protected final KeyGroupRange keyGroupRange;

    /** Key-group prefix size supplied at construction; not used directly in this base class. */
    @Nonnegative
    protected final int keyGroupPrefixBytes;

    /** Backend identifier attached to local snapshot handles and used in error messages. */
    @Nonnull
    protected final UUID backendUID;

    /* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase$ForStNativeSnapshotResources.class */
    protected static class ForStNativeSnapshotResources implements SnapshotResources {

        /** Meta info snapshots captured for this snapshot. */
        @Nonnull
        protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

        /** Size of the manifest file, as supplied by the creator of these resources. */
        protected final long manifestFileSize;

        /** Files that belong to this snapshot, as supplied by the creator of these resources. */
        @Nonnull
        protected final List<Path> liveFiles;

        /** Last path component of {@link #manifestFilePath}. */
        @Nonnull
        protected final String manifestFileName;

        /** Full path of the manifest file. */
        @Nonnull
        protected final Path manifestFilePath;

        /** Previous snapshot; may be replaced later via {@link #setPreviousSnapshot}. */
        @Nonnull
        protected PreviousSnapshot previousSnapshot;

        /** Callback executed at most once by {@link #release()}. */
        @Nonnull
        protected final Runnable releaser;

        /** Flips to {@code true} on the first {@link #release()} call so the releaser runs only once. */
        private final AtomicBoolean released = new AtomicBoolean(false);

        /**
         * Bundles everything captured for a native snapshot.
         *
         * @param list meta info snapshots of the registered states
         * @param j size of the manifest file
         * @param list2 files that belong to the snapshot
         * @param path path of the manifest file; its name is cached in {@link #manifestFileName}
         * @param previousSnapshot the snapshot this one is based on
         * @param runnable callback to run when the resources are released
         */
        public ForStNativeSnapshotResources(@Nonnull List<StateMetaInfoSnapshot> list, long j, @Nonnull List<Path> list2, @Nonnull Path path, @Nonnull PreviousSnapshot previousSnapshot, @Nonnull Runnable runnable) {
            this.stateMetaInfoSnapshots = list;
            this.liveFiles = list2;
            this.manifestFilePath = path;
            this.manifestFileName = path.getName();
            this.manifestFileSize = j;
            this.previousSnapshot = previousSnapshot;
            this.releaser = runnable;
        }

        /** Replaces the previous snapshot these resources refer to. */
        public void setPreviousSnapshot(@Nonnull PreviousSnapshot previousSnapshot) {
            this.previousSnapshot = previousSnapshot;
        }

        /**
         * Returns the manifest file name followed by a single newline character.
         *
         * @return the manifest file name terminated by {@code "\n"}
         */
        public String getCurrentFileContent() {
            return manifestFileName + "\n";
        }

        /** Runs the releaser callback on the first invocation; later invocations do nothing. */
        public void release() {
            boolean firstCall = released.compareAndSet(false, true);
            if (!firstCall) {
                return;
            }
            releaser.run();
        }
    }

    /* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase$ForStSnapshotOperation.class */
    protected abstract class ForStSnapshotOperation implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {

        /** Id of the checkpoint this operation belongs to. */
        protected final long checkpointId;

        /** Factory for the checkpoint output streams used by this operation. */
        @Nonnull
        protected final CheckpointStreamFactory checkpointStreamFactory;

        /** Registry for temporary resources created while this operation runs. */
        @Nonnull
        protected final CloseableRegistry tmpResourcesRegistry = new CloseableRegistry();

        /**
         * Creates the operation for one checkpoint.
         *
         * @param j the checkpoint id
         * @param checkpointStreamFactory factory for the checkpoint output streams
         */
        public ForStSnapshotOperation(long j, @Nonnull CheckpointStreamFactory checkpointStreamFactory) {
            this.checkpointStreamFactory = checkpointStreamFactory;
            this.checkpointId = j;
        }

        /**
         * Completes the given snapshot directory and wraps the outcome in a local keyed state handle.
         *
         * @param snapshotDirectory the directory to complete
         * @param streamStateHandle handle of the local meta data, may be {@code null}
         * @param list the files that are part of the local snapshot
         * @return the local handle, or an empty optional when either the completed directory
         *     handle or {@code streamStateHandle} is {@code null}
         * @throws IOException if completing the snapshot directory fails
         */
        protected Optional<KeyedStateHandle> getLocalSnapshot(SnapshotDirectory snapshotDirectory, @Nullable StreamStateHandle streamStateHandle, List<IncrementalKeyedStateHandle.HandleAndLocalPath> list) throws IOException {
            // The directory is always completed first, even if no meta data handle was supplied.
            DirectoryStateHandle directoryHandle = snapshotDirectory.completeSnapshotAndGetHandle();
            if (directoryHandle == null || streamStateHandle == null) {
                return Optional.empty();
            }
            KeyedStateHandle localHandle =
                    new IncrementalLocalKeyedStateHandle(
                            backendUID,
                            checkpointId,
                            directoryHandle,
                            keyGroupRange,
                            streamStateHandle,
                            list);
            return Optional.of(localHandle);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase$ForStSyncSnapshotResources.class */
    protected static class ForStSyncSnapshotResources implements SnapshotResources {

        /** Local directory holding the snapshot; cleaned up by {@link #release()}. */
        @Nonnull
        protected final SnapshotDirectory snapshotDirectory;

        /** The snapshot this one is based on. */
        @Nonnull
        protected final PreviousSnapshot previousSnapshot;

        /** Meta info snapshots captured for this snapshot. */
        @Nonnull
        protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

        /**
         * Bundles the resources captured during the synchronous snapshot phase.
         *
         * @param snapshotDirectory local directory holding the snapshot
         * @param previousSnapshot the snapshot this one is based on
         * @param list meta info snapshots of the registered states
         */
        public ForStSyncSnapshotResources(SnapshotDirectory snapshotDirectory, PreviousSnapshot previousSnapshot, List<StateMetaInfoSnapshot> list) {
            this.stateMetaInfoSnapshots = list;
            this.previousSnapshot = previousSnapshot;
            this.snapshotDirectory = snapshotDirectory;
        }

        /**
         * Cleans up the snapshot directory if it still exists. Failures are only logged, never
         * propagated to the caller.
         */
        public void release() {
            try {
                if (!snapshotDirectory.exists()) {
                    return;
                }
                LOG.trace("Running cleanup for local RocksDB backup directory {}.", snapshotDirectory);
                boolean cleanedUp = snapshotDirectory.cleanup();
                if (!cleanedUp) {
                    LOG.debug("Could not properly cleanup local RocksDB backup directory.");
                }
            } catch (IOException ioException) {
                LOG.warn("Could not properly cleanup local RocksDB backup directory.", ioException);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase$PreviousSnapshot.class */
    protected static class PreviousSnapshot {

        /** Handles of the files of the previous snapshot, keyed by their local path. */
        @Nonnull
        private final Map<String, StreamStateHandle> confirmedSstFiles;

        /**
         * Indexes the given handles by their local path.
         *
         * @param collection the handles of the previous snapshot, or {@code null} for none
         */
        public PreviousSnapshot(@Nullable Collection<IncrementalKeyedStateHandle.HandleAndLocalPath> collection) {
            if (collection == null) {
                this.confirmedSstFiles = Collections.emptyMap();
            } else {
                this.confirmedSstFiles =
                        collection.stream()
                                .collect(
                                        Collectors.toMap(
                                                IncrementalKeyedStateHandle.HandleAndLocalPath::getLocalPath,
                                                IncrementalKeyedStateHandle.HandleAndLocalPath::getHandle));
            }
        }

        /**
         * Looks up the file with the given local path in the previous snapshot.
         *
         * @param str the local path of the file
         * @return a placeholder handle carrying the id and size of the known handle, or an empty
         *     optional if the file is not part of the previous snapshot
         */
        public Optional<StreamStateHandle> getUploaded(String str) {
            // The map never holds null values, so a null lookup result means "unknown file".
            StreamStateHandle confirmedHandle = confirmedSstFiles.get(str);
            if (confirmedHandle == null) {
                return Optional.empty();
            }
            StreamStateHandle placeholder =
                    new PlaceholderStreamStateHandle(
                            confirmedHandle.getStreamStateHandleID(),
                            confirmedHandle.getStateSize(),
                            FileMergingSnapshotManager.isFileMergingHandle(confirmedHandle));
            return Optional.of(placeholder);
        }
    }

    /**
     * Stores the collaborators shared by all ForSt snapshot strategies.
     *
     * @param str description of the strategy, returned by {@link #getDescription()}
     * @param rocksDB the database handle
     * @param resourceGuard the resource guard
     * @param forStResourceContainer the resource container
     * @param typeSerializer the key serializer
     * @param linkedHashMap the registered state information, keyed by name
     * @param keyGroupRange the key-group range
     * @param i the key-group prefix size, stored in {@link #keyGroupPrefixBytes}
     * @param uuid the backend identifier
     */
    public ForStSnapshotStrategyBase(@Nonnull String str, @Nonnull RocksDB rocksDB, @Nonnull ResourceGuard resourceGuard, @Nonnull ForStResourceContainer forStResourceContainer, @Nonnull TypeSerializer<K> typeSerializer, @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> linkedHashMap, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int i, @Nonnull UUID uuid) {
        this.description = str;
        this.db = rocksDB;
        this.resourceGuard = resourceGuard;
        this.resourceContainer = forStResourceContainer;
        this.keySerializer = typeSerializer;
        this.kvStateInformation = linkedHashMap;
        this.keyGroupRange = keyGroupRange;
        this.keyGroupPrefixBytes = i;
        this.backendUID = uuid;
    }

    /**
     * Returns the description this strategy was created with.
     *
     * @return the description passed to the constructor
     */
    @Nonnull
    public String getDescription() {
        return description;
    }

    /**
     * Hook implemented by concrete strategies.
     *
     * <p>NOTE(review): judging by the signature, this presumably fills {@code list} with the
     * meta info snapshots of the registered states for checkpoint {@code j} and returns the
     * previous snapshot to build upon; confirm against the implementations.
     *
     * @param j the checkpoint id
     * @param list list of state meta info snapshots handed to the implementation
     * @return the {@link PreviousSnapshot} chosen by the implementation
     */
    protected abstract PreviousSnapshot snapshotMetaData(long j, @Nonnull List<StateMetaInfoSnapshot> list);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Writes the keyed-backend meta data (key serializer plus state meta info snapshots) to a
     * new exclusive-scope checkpoint stream and returns the resulting state handle.
     *
     * <p>While it is being written, the stream is registered with {@code closeableRegistry}.
     * Once the stream has been finalized, a discard action for the result is registered with
     * {@code closeableRegistry2}.
     *
     * @param closeableRegistry registry that owns the stream while it is being written
     * @param closeableRegistry2 registry that receives the discard action for the result
     * @param list the state meta info snapshots to serialize
     * @param j the checkpoint id, used only in error messages
     * @param checkpointStreamFactory factory used to create the checkpoint stream
     * @return the snapshot result holding the meta data handle
     * @throws IOException if the stream was already unregistered from {@code closeableRegistry}
     *     before it could be finalized
     * @throws Exception if writing or finalizing the stream fails
     */
    @Nonnull
    public SnapshotResult<StreamStateHandle> materializeMetaData(@Nonnull CloseableRegistry closeableRegistry, @Nonnull CloseableRegistry closeableRegistry2, @Nonnull List<StateMetaInfoSnapshot> list, long j, @Nonnull CheckpointStreamFactory checkpointStreamFactory) throws Exception {
        CheckpointStreamWithResultProvider metaDataStream = CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
        closeableRegistry.registerCloseable(metaDataStream);
        try {
            // NOTE(review): the third argument is presumably the "use compression" flag - confirm.
            KeyedBackendSerializationProxy<K> serializationProxy = new KeyedBackendSerializationProxy<>(this.keySerializer, list, false);
            serializationProxy.write(new DataOutputViewStreamWrapper(metaDataStream.getCheckpointOutputStream()));

            // A failed unregistration means the stream is no longer in the registry, so no
            // handle can be obtained from it.
            if (!closeableRegistry.unregisterCloseable(metaDataStream)) {
                throw new IOException("Stream already closed and cannot return a handle.");
            }
            SnapshotResult<StreamStateHandle> result = metaDataStream.closeAndFinalizeCheckpointStreamResult();

            // Register the discard action before validating, so the result is tracked by
            // closeableRegistry2 even if a check below fails.
            closeableRegistry2.registerCloseable(() -> StateUtil.discardStateObjectQuietly(result));
            Preconditions.checkNotNull(result, String.format("Backend:%s, checkpoint:%s, Metadata was not properly created.", this.backendUID, j));
            Preconditions.checkNotNull(result.getJobManagerOwnedSnapshot(), String.format("Backend:%s, checkpoint:%s, Metadata for job manager was not properly created.", this.backendUID, j));
            return result;
        } catch (Throwable th) {
            // Close the stream only if it is still registered, i.e. it has not been finalized
            // above or closed through the registry.
            if (closeableRegistry.unregisterCloseable(metaDataStream)) {
                IOUtils.closeQuietly(metaDataStream);
            }
            throw th;
        }
    }

    /**
     * Closes this strategy. Redeclared here without a {@code throws} clause, so implementations
     * cannot throw checked exceptions from {@link AutoCloseable#close()}.
     */
    public abstract void close();

    /**
     * Best-effort cleanup after a snapshot that did not finish: closes the given registry and,
     * if the snapshot directory was already completed, discards its directory state handle.
     * Failures in either step are logged and never propagated.
     *
     * @param closeableRegistry registry holding the temporary resources to close
     * @param snapshotDirectory the local snapshot directory to discard if completed
     */
    protected void cleanupIncompleteSnapshot(@Nonnull CloseableRegistry closeableRegistry, @Nonnull SnapshotDirectory snapshotDirectory) {
        try {
            closeableRegistry.close();
        } catch (Exception closeFailure) {
            LOG.warn("Could not properly clean tmp resources.", closeFailure);
        }

        // Nothing to discard unless the directory has been completed.
        if (!snapshotDirectory.isSnapshotCompleted()) {
            return;
        }

        try {
            DirectoryStateHandle directoryHandle = snapshotDirectory.completeSnapshotAndGetHandle();
            if (directoryHandle != null) {
                directoryHandle.discardState();
            }
        } catch (Exception discardFailure) {
            LOG.warn("Could not properly discard local state.", discardFailure);
        }
    }
}
