/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.snapshot;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AsyncSnapshotCallable;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksIncrementalSnapshotStrategy<K>
extends RocksDBSnapshotStrategyBase<K> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous incremental RocksDB snapshot";
    @Nonnull
    private final File instanceBasePath;
    @Nonnull
    private final UUID backendUID;
    @Nonnull
    private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
    private long lastCompletedCheckpointId;
    private final RocksDBStateUploader stateUploader;
    private final String localDirectoryName;

    public RocksIncrementalSnapshotStrategy(@Nonnull RocksDB db, @Nonnull ResourceGuard rocksDBResourceGuard, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull CloseableRegistry cancelStreamRegistry, @Nonnull File instanceBasePath, @Nonnull UUID backendUID, @Nonnull SortedMap<Long, Set<StateHandleID>> materializedSstFiles, @Nonnull RocksDBStateUploader rocksDBStateUploader, long lastCompletedCheckpointId) {
        super(DESCRIPTION, db, rocksDBResourceGuard, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, localRecoveryConfig, cancelStreamRegistry);
        this.instanceBasePath = instanceBasePath;
        this.backendUID = backendUID;
        this.materializedSstFiles = materializedSstFiles;
        this.lastCompletedCheckpointId = lastCompletedCheckpointId;
        this.stateUploader = rocksDBStateUploader;
        this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
    }

    @Override
    @Nonnull
    protected RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(long checkpointId, long checkpointTimestamp, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        SnapshotDirectory snapshotDirectory = this.prepareLocalSnapshotDirectory(checkpointId);
        LOG.trace("Local RocksDB checkpoint goes to backup path {}.", (Object)snapshotDirectory);
        ArrayList<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<StateMetaInfoSnapshot>(this.kvStateInformation.size());
        Set<StateHandleID> baseSstFiles = this.snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
        this.takeDBNativeCheckpoint(snapshotDirectory);
        RocksDBIncrementalSnapshotOperation snapshotOperation = new RocksDBIncrementalSnapshotOperation(checkpointId, checkpointStreamFactory, snapshotDirectory, baseSstFiles, stateMetaInfoSnapshots);
        return snapshotOperation.toAsyncSnapshotFutureTask(this.cancelStreamRegistry);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long completedCheckpointId) {
        SortedMap<Long, Set<StateHandleID>> sortedMap = this.materializedSstFiles;
        synchronized (sortedMap) {
            if (completedCheckpointId > this.lastCompletedCheckpointId) {
                this.materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
                this.lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointAborted(long abortedCheckpointId) {
        SortedMap<Long, Set<StateHandleID>> sortedMap = this.materializedSstFiles;
        synchronized (sortedMap) {
            this.materializedSstFiles.keySet().remove(abortedCheckpointId);
        }
    }

    @Override
    public void close() {
        this.stateUploader.close();
    }

    @Nonnull
    private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throws IOException {
        if (this.localRecoveryConfig.isLocalRecoveryEnabled()) {
            LocalRecoveryDirectoryProvider directoryProvider = this.localRecoveryConfig.getLocalStateDirectoryProvider();
            File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
            if (!directory.exists() && !directory.mkdirs()) {
                throw new IOException("Local state base directory for checkpoint " + checkpointId + " does not exist and could not be created: " + directory);
            }
            File rdbSnapshotDir = new File(directory, this.localDirectoryName);
            if (rdbSnapshotDir.exists()) {
                FileUtils.deleteDirectory((File)rdbSnapshotDir);
            }
            Path path = rdbSnapshotDir.toPath();
            try {
                return SnapshotDirectory.permanent((Path)path);
            }
            catch (IOException ex) {
                try {
                    FileUtils.deleteDirectory((File)directory);
                }
                catch (IOException delEx) {
                    ex = (IOException)ExceptionUtils.firstOrSuppressed((Throwable)delEx, (Throwable)ex);
                }
                throw ex;
            }
        }
        File snapshotDir = new File(this.instanceBasePath, "chk-" + checkpointId);
        return SnapshotDirectory.temporary((File)snapshotDir);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Set<StateHandleID> snapshotMetaData(long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
        Set baseSstFiles;
        long lastCompletedCheckpoint;
        SortedMap<Long, Set<StateHandleID>> sortedMap = this.materializedSstFiles;
        synchronized (sortedMap) {
            lastCompletedCheckpoint = this.lastCompletedCheckpointId;
            baseSstFiles = (Set)this.materializedSstFiles.get(lastCompletedCheckpoint);
        }
        LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) files as base: {}.", new Object[]{checkpointId, lastCompletedCheckpoint, baseSstFiles});
        for (Map.Entry entry : this.kvStateInformation.entrySet()) {
            stateMetaInfoSnapshots.add(((RocksDBKeyedStateBackend.RocksDbKvStateInfo)entry.getValue()).metaInfo.snapshot());
        }
        return baseSstFiles;
    }

    private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory) throws Exception {
        try (ResourceGuard.Lease ignored = this.rocksDBResourceGuard.acquireResource();
             Checkpoint checkpoint = Checkpoint.create((RocksDB)this.db);){
            checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
        }
        catch (Exception ex) {
            try {
                outputDirectory.cleanup();
            }
            catch (IOException cleanupEx) {
                ex = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)cleanupEx, (Throwable)ex);
            }
            throw ex;
        }
    }

    private final class RocksDBIncrementalSnapshotOperation
    extends AsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>> {
        private final long checkpointId;
        @Nonnull
        private final CheckpointStreamFactory checkpointStreamFactory;
        @Nonnull
        private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
        @Nonnull
        private final SnapshotDirectory localBackupDirectory;
        @Nullable
        private final Set<StateHandleID> baseSstFiles;

        private RocksDBIncrementalSnapshotOperation(@Nonnull long checkpointId, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nullable SnapshotDirectory localBackupDirectory, @Nonnull Set<StateHandleID> baseSstFiles, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            this.checkpointStreamFactory = checkpointStreamFactory;
            this.baseSstFiles = baseSstFiles;
            this.checkpointId = checkpointId;
            this.localBackupDirectory = localBackupDirectory;
            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
            boolean completed = false;
            SnapshotResult<StreamStateHandle> metaStateHandle = null;
            HashMap<StateHandleID, StreamStateHandle> sstFiles = new HashMap<StateHandleID, StreamStateHandle>();
            HashMap<StateHandleID, StreamStateHandle> miscFiles = new HashMap<StateHandleID, StreamStateHandle>();
            try {
                SnapshotResult snapshotResult;
                metaStateHandle = this.materializeMetaData();
                Preconditions.checkNotNull(metaStateHandle, (String)"Metadata was not properly created.");
                Preconditions.checkNotNull((Object)metaStateHandle.getJobManagerOwnedSnapshot(), (String)"Metadata for job manager was not properly created.");
                this.uploadSstFiles(sstFiles, miscFiles);
                SortedMap sortedMap = RocksIncrementalSnapshotStrategy.this.materializedSstFiles;
                synchronized (sortedMap) {
                    RocksIncrementalSnapshotStrategy.this.materializedSstFiles.put(this.checkpointId, sstFiles.keySet());
                }
                IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot());
                DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
                if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
                    IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, this.checkpointId, directoryStateHandle, RocksIncrementalSnapshotStrategy.this.keyGroupRange, (StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), sstFiles.keySet());
                    snapshotResult = SnapshotResult.withLocalState((StateObject)jmIncrementalKeyedStateHandle, (StateObject)localDirKeyedStateHandle);
                } else {
                    snapshotResult = SnapshotResult.of((StateObject)jmIncrementalKeyedStateHandle);
                }
                completed = true;
                SnapshotResult snapshotResult2 = snapshotResult;
                return snapshotResult2;
            }
            finally {
                if (!completed) {
                    ArrayList<StateObject> statesToDiscard = new ArrayList<StateObject>(1 + miscFiles.size() + sstFiles.size());
                    statesToDiscard.add((StateObject)metaStateHandle);
                    statesToDiscard.addAll(miscFiles.values());
                    statesToDiscard.addAll(sstFiles.values());
                    this.cleanupIncompleteSnapshot(statesToDiscard);
                }
            }
        }

        protected void cleanupProvidedResources() {
            try {
                if (this.localBackupDirectory.exists()) {
                    LOG.trace("Running cleanup for local RocksDB backup directory {}.", (Object)this.localBackupDirectory);
                    boolean cleanupOk = this.localBackupDirectory.cleanup();
                    if (!cleanupOk) {
                        LOG.debug("Could not properly cleanup local RocksDB backup directory.");
                    }
                }
            }
            catch (IOException e) {
                LOG.warn("Could not properly cleanup local RocksDB backup directory.", (Throwable)e);
            }
        }

        protected void logAsyncSnapshotComplete(long startTime) {
            RocksIncrementalSnapshotStrategy.this.logAsyncCompleted(this.checkpointStreamFactory, startTime);
        }

        private void cleanupIncompleteSnapshot(@Nonnull List<StateObject> statesToDiscard) {
            try {
                StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
            }
            catch (Exception e) {
                LOG.warn("Could not properly discard states.", (Throwable)e);
            }
            if (this.localBackupDirectory.isSnapshotCompleted()) {
                try {
                    DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
                    if (directoryStateHandle != null) {
                        directoryStateHandle.discardState();
                    }
                }
                catch (Exception e) {
                    LOG.warn("Could not properly discard local state.", (Throwable)e);
                }
            }
        }

        private void uploadSstFiles(@Nonnull Map<StateHandleID, StreamStateHandle> sstFiles, @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles) throws Exception {
            Preconditions.checkState((boolean)this.localBackupDirectory.exists());
            HashMap<StateHandleID, Path> sstFilePaths = new HashMap<StateHandleID, Path>();
            HashMap<StateHandleID, Path> miscFilePaths = new HashMap<StateHandleID, Path>();
            Path[] files = this.localBackupDirectory.listDirectory();
            if (files != null) {
                this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
                sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, this.snapshotCloseableRegistry));
                miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, this.snapshotCloseableRegistry));
            }
        }

        private void createUploadFilePaths(Path[] files, Map<StateHandleID, StreamStateHandle> sstFiles, Map<StateHandleID, Path> sstFilePaths, Map<StateHandleID, Path> miscFilePaths) {
            for (Path filePath : files) {
                String fileName = filePath.getFileName().toString();
                StateHandleID stateHandleID = new StateHandleID(fileName);
                if (fileName.endsWith(".sst")) {
                    boolean existsAlready;
                    boolean bl = existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);
                    if (existsAlready) {
                        sstFiles.put(stateHandleID, (StreamStateHandle)new PlaceholderStreamStateHandle());
                        continue;
                    }
                    sstFilePaths.put(stateHandleID, filePath);
                    continue;
                }
                miscFilePaths.put(stateHandleID, filePath);
            }
        }

        @Nonnull
        private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
            CheckpointStreamWithResultProvider streamWithResultProvider = RocksIncrementalSnapshotStrategy.this.localRecoveryConfig.isLocalRecoveryEnabled() ? CheckpointStreamWithResultProvider.createDuplicatingStream((long)this.checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)this.checkpointStreamFactory, (LocalRecoveryDirectoryProvider)RocksIncrementalSnapshotStrategy.this.localRecoveryConfig.getLocalStateDirectoryProvider()) : CheckpointStreamWithResultProvider.createSimpleStream((CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)this.checkpointStreamFactory);
            this.snapshotCloseableRegistry.registerCloseable((Closeable)streamWithResultProvider);
            try {
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(RocksIncrementalSnapshotStrategy.this.keySerializer, this.stateMetaInfoSnapshots, false);
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper((OutputStream)streamWithResultProvider.getCheckpointOutputStream());
                serializationProxy.write((DataOutputView)out);
                if (this.snapshotCloseableRegistry.unregisterCloseable((Closeable)streamWithResultProvider)) {
                    SnapshotResult result = streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
                    streamWithResultProvider = null;
                    SnapshotResult snapshotResult = result;
                    return snapshotResult;
                }
                throw new IOException("Stream already closed and cannot return a handle.");
            }
            finally {
                if (streamWithResultProvider != null && this.snapshotCloseableRegistry.unregisterCloseable((Closeable)streamWithResultProvider)) {
                    IOUtils.closeQuietly((AutoCloseable)streamWithResultProvider);
                }
            }
        }
    }
}

