package org.apache.flink.state.forst.snapshot;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Stream;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.forstdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.class */
public class ForStIncrementalSnapshotStrategy<K> extends ForStNativeFullSnapshotStrategy<K> {
    private static final Logger LOG = LoggerFactory.getLogger(ForStIncrementalSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous incremental ForSt snapshot";

    @Nonnull
    private final SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> uploadedSstFiles;
    private long lastCompletedCheckpointId;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy = new int[SnapshotType.SharingFilesStrategy.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy[SnapshotType.SharingFilesStrategy.FORWARD_BACKWARD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy[SnapshotType.SharingFilesStrategy.NO_SHARING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy[SnapshotType.SharingFilesStrategy.FORWARD.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy$ForStIncrementalSnapshotOperation.class */
    public final class ForStIncrementalSnapshotOperation extends ForStSnapshotStrategyBase<K, ForStSnapshotStrategyBase.ForStNativeSnapshotResources>.ForStSnapshotOperation {

        @Nonnull
        private final SnapshotType.SharingFilesStrategy sharingFilesStrategy;

        @Nonnull
        private final ForStSnapshotStrategyBase.ForStNativeSnapshotResources snapshotResources;

        private ForStIncrementalSnapshotOperation(long j, @Nonnull ForStSnapshotStrategyBase.ForStNativeSnapshotResources forStNativeSnapshotResources, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull SnapshotType.SharingFilesStrategy sharingFilesStrategy) {
            super(j, checkpointStreamFactory);
            this.sharingFilesStrategy = sharingFilesStrategy;
            this.snapshotResources = forStNativeSnapshotResources;
        }

        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry closeableRegistry) throws Exception {
            boolean z = false;
            ArrayList arrayList = new ArrayList();
            try {
                SnapshotResult<StreamStateHandle> materializeMetaData = ForStIncrementalSnapshotStrategy.this.materializeMetaData(closeableRegistry, this.tmpResourcesRegistry, this.snapshotResources.stateMetaInfoSnapshots, this.checkpointId, this.checkpointStreamFactory);
                ArrayList arrayList2 = new ArrayList();
                ArrayList arrayList3 = new ArrayList();
                z = true;
                SnapshotResult<KeyedStateHandle> of = SnapshotResult.of(new IncrementalRemoteKeyedStateHandle(ForStIncrementalSnapshotStrategy.this.backendUID, ForStIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, arrayList2, arrayList3, materializeMetaData.getJobManagerOwnedSnapshot(), materializeMetaData.getStateSize() + transferSnapshotFiles(arrayList2, arrayList3, closeableRegistry, this.tmpResourcesRegistry, arrayList)));
                this.snapshotResources.release();
                if (1 == 0) {
                    try {
                        this.tmpResourcesRegistry.close();
                    } catch (Exception e) {
                        ForStIncrementalSnapshotStrategy.LOG.warn("Could not properly clean tmp resources.", e);
                    }
                } else {
                    this.checkpointStreamFactory.reusePreviousStateHandle(arrayList);
                }
                return of;
            } catch (Throwable th) {
                this.snapshotResources.release();
                if (z) {
                    this.checkpointStreamFactory.reusePreviousStateHandle(arrayList);
                } else {
                    try {
                        this.tmpResourcesRegistry.close();
                    } catch (Exception e2) {
                        ForStIncrementalSnapshotStrategy.LOG.warn("Could not properly clean tmp resources.", e2);
                    }
                }
                throw th;
            }
        }

        private long transferSnapshotFiles(@Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> list, @Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> list2, @Nonnull CloseableRegistry closeableRegistry, @Nonnull CloseableRegistry closeableRegistry2, @Nonnull List<StreamStateHandle> list3) throws Exception {
            Preconditions.checkNotNull(this.snapshotResources.liveFiles, "liveFiles were not properly created.");
            if (this.snapshotResources.liveFiles.isEmpty()) {
                return 0L;
            }
            Tuple4<List<IncrementalKeyedStateHandle.HandleAndLocalPath>, List<Path>, List<Path>, Path> classifyFiles = classifyFiles();
            list.addAll((Collection) classifyFiles.f0);
            Stream map = list.stream().map((v0) -> {
                return v0.getHandle();
            });
            Objects.requireNonNull(list3);
            map.forEach((v1) -> {
                r1.add(v1);
            });
            CheckpointedStateScope checkpointedStateScope = this.sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING ? CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED;
            List<IncrementalKeyedStateHandle.HandleAndLocalPath> transferFilesToCheckpointFs = ForStIncrementalSnapshotStrategy.this.stateTransfer.transferFilesToCheckpointFs(this.sharingFilesStrategy, (List) classifyFiles.f1, this.checkpointStreamFactory, checkpointedStateScope, closeableRegistry, closeableRegistry2);
            list.addAll(transferFilesToCheckpointFs);
            long sum = 0 + transferFilesToCheckpointFs.stream().mapToLong((v0) -> {
                return v0.getStateSize();
            }).sum();
            List<IncrementalKeyedStateHandle.HandleAndLocalPath> transferFilesToCheckpointFs2 = ForStIncrementalSnapshotStrategy.this.stateTransfer.transferFilesToCheckpointFs(this.sharingFilesStrategy, (List) classifyFiles.f2, this.checkpointStreamFactory, checkpointedStateScope, closeableRegistry, closeableRegistry2);
            list2.addAll(transferFilesToCheckpointFs2);
            long sum2 = sum + transferFilesToCheckpointFs2.stream().mapToLong((v0) -> {
                return v0.getStateSize();
            }).sum();
            IncrementalKeyedStateHandle.HandleAndLocalPath transferFileToCheckpointFs = ForStIncrementalSnapshotStrategy.this.stateTransfer.transferFileToCheckpointFs(this.sharingFilesStrategy, (Path) classifyFiles.f3, this.snapshotResources.manifestFileSize, this.checkpointStreamFactory, checkpointedStateScope, closeableRegistry, closeableRegistry2);
            list2.add(transferFileToCheckpointFs);
            long stateSize = sum2 + transferFileToCheckpointFs.getStateSize();
            IncrementalKeyedStateHandle.HandleAndLocalPath writeFileToCheckpointFs = ForStIncrementalSnapshotStrategy.this.stateTransfer.writeFileToCheckpointFs(ForStSnapshotUtil.CURRENT_FILE_NAME, this.snapshotResources.getCurrentFileContent(), this.checkpointStreamFactory, checkpointedStateScope, closeableRegistry, closeableRegistry2);
            list2.add(writeFileToCheckpointFs);
            long stateSize2 = stateSize + writeFileToCheckpointFs.getStateSize();
            recordReusableHandles(list);
            return stateSize2;
        }

        private void recordReusableHandles(List<IncrementalKeyedStateHandle.HandleAndLocalPath> list) {
            synchronized (ForStIncrementalSnapshotStrategy.this.uploadedSstFiles) {
                switch (AnonymousClass1.$SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy[this.sharingFilesStrategy.ordinal()]) {
                    case 1:
                    case 3:
                        ForStIncrementalSnapshotStrategy.this.uploadedSstFiles.put(Long.valueOf(this.checkpointId), Collections.unmodifiableList(list));
                        break;
                    case 2:
                        break;
                    default:
                        throw new IllegalArgumentException("Unsupported sharing files strategy: " + this.sharingFilesStrategy);
                }
            }
        }

        private Tuple4<List<IncrementalKeyedStateHandle.HandleAndLocalPath>, List<Path>, List<Path>, Path> classifyFiles() {
            int size = this.snapshotResources.liveFiles.size();
            ArrayList arrayList = new ArrayList(size);
            ArrayList arrayList2 = new ArrayList(size);
            ArrayList arrayList3 = new ArrayList(size);
            Path path = null;
            for (Path path2 : this.snapshotResources.liveFiles) {
                String name = path2.getName();
                if (name.equals(this.snapshotResources.manifestFileName)) {
                    path = path2;
                } else if (name.endsWith(".sst")) {
                    Optional<StreamStateHandle> uploaded = this.snapshotResources.previousSnapshot.getUploaded(name);
                    if (uploaded.isPresent() && this.checkpointStreamFactory.couldReuseStateHandle(uploaded.get())) {
                        arrayList.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of(uploaded.get(), name));
                    } else {
                        arrayList2.add(path2);
                    }
                } else {
                    arrayList3.add(path2);
                }
            }
            return Tuple4.of(arrayList, arrayList2, arrayList3, path);
        }
    }

    public ForStIncrementalSnapshotStrategy(@Nonnull RocksDB rocksDB, @Nonnull ResourceGuard resourceGuard, @Nonnull ForStResourceContainer forStResourceContainer, @Nonnull TypeSerializer<K> typeSerializer, @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> linkedHashMap, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int i, @Nonnull UUID uuid, @Nonnull SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap, @Nonnull ForStStateDataTransfer forStStateDataTransfer, long j) {
        super(DESCRIPTION, rocksDB, resourceGuard, forStResourceContainer, typeSerializer, linkedHashMap, keyGroupRange, i, uuid, forStStateDataTransfer);
        this.uploadedSstFiles = new TreeMap((SortedMap) sortedMap);
        this.lastCompletedCheckpointId = j;
    }

    @Override // org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy
    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(ForStSnapshotStrategyBase.ForStNativeSnapshotResources forStNativeSnapshotResources, long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        if (forStNativeSnapshotResources.stateMetaInfoSnapshots.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Asynchronous ForSt snapshot performed on empty keyed state at {}. Returning null.", Long.valueOf(j2));
            }
            return closeableRegistry -> {
                return SnapshotResult.empty();
            };
        }
        SnapshotType.SharingFilesStrategy sharingFilesStrategy = checkpointOptions.getCheckpointType().getSharingFilesStrategy();
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy[sharingFilesStrategy.ordinal()]) {
            case 1:
                break;
            case 2:
                forStNativeSnapshotResources.setPreviousSnapshot(EMPTY_PREVIOUS_SNAPSHOT);
                break;
            case 3:
            default:
                throw new IllegalArgumentException(String.format("Unsupported sharing files strategy for %s : %s", getClass().getName(), sharingFilesStrategy));
        }
        return new ForStIncrementalSnapshotOperation(j, forStNativeSnapshotResources, checkpointStreamFactory, sharingFilesStrategy);
    }

    @Override // org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy
    public void notifyCheckpointComplete(long j) {
        synchronized (this.uploadedSstFiles) {
            LOG.info("Backend:{} checkpoint:{} complete.", this.backendUID, Long.valueOf(j));
            if (j > this.lastCompletedCheckpointId && this.uploadedSstFiles.containsKey(Long.valueOf(j))) {
                this.uploadedSstFiles.keySet().removeIf(l -> {
                    return l.longValue() < j;
                });
                this.lastCompletedCheckpointId = j;
            }
        }
    }

    @Override // org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy
    public void notifyCheckpointAborted(long j) {
        synchronized (this.uploadedSstFiles) {
            LOG.info("Backend:{} checkpoint:{} aborted.", this.backendUID, Long.valueOf(j));
            this.uploadedSstFiles.keySet().remove(Long.valueOf(j));
        }
    }

    @Override // org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy, org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase, java.lang.AutoCloseable
    public void close() {
        this.stateTransfer.close();
    }

    @Override // org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy, org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase
    protected ForStSnapshotStrategyBase.PreviousSnapshot snapshotMetaData(long j, @Nonnull List<StateMetaInfoSnapshot> list) {
        long j2;
        Collection<IncrementalKeyedStateHandle.HandleAndLocalPath> collection;
        synchronized (this.uploadedSstFiles) {
            j2 = this.lastCompletedCheckpointId;
            collection = this.uploadedSstFiles.get(Long.valueOf(j2));
            LOG.trace("Use confirmed SST files for checkpoint {}: {}", Long.valueOf(j), collection);
        }
        LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) confirmed files as base: {}.", new Object[]{Long.valueOf(j), Long.valueOf(j2), collection});
        Iterator<Map.Entry<String, ForStOperationUtils.ForStKvStateInfo>> it = this.kvStateInformation.entrySet().iterator();
        while (it.hasNext()) {
            list.add(it.next().getValue().metaInfo.snapshot());
        }
        return new ForStSnapshotStrategyBase.PreviousSnapshot(collection);
    }
}
