package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.function.SupplierWithException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.class */
/**
 * {@link SnapshotStrategy} that serializes the registered key/value state tables and priority
 * queue states into a single checkpoint stream, one key group after another, and reports the
 * result as a {@link KeyedStateHandle}.
 *
 * @param <K> type of the key handled by the key serializer.
 */
public class HeapSnapshotStrategy<K> implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
    private final StreamCompressionDecorator keyGroupCompressionDecorator;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final KeyGroupRange keyGroupRange;
    private final StateSerializerProvider<K> keySerializerProvider;
    private final int totalKeyGroups;

    /**
     * Creates the strategy. All arguments are stored as-is; no copies are made.
     *
     * @param map registered key/value state tables, keyed by name.
     * @param map2 registered priority queue state wrappers, keyed by name.
     * @param streamCompressionDecorator decorator applied to the per-state output of every key
     *     group.
     * @param localRecoveryConfig decides whether a duplicating (local backup) stream is used.
     * @param keyGroupRange key groups that are written by {@link #asyncSnapshot}.
     * @param stateSerializerProvider source of the current key serializer.
     * @param i total number of key groups, forwarded to {@link HeapSnapshotResources#create}.
     */
    public HeapSnapshotStrategy(Map<String, StateTable<K, ?, ?>> map, Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> map2, StreamCompressionDecorator streamCompressionDecorator, LocalRecoveryConfig localRecoveryConfig, KeyGroupRange keyGroupRange, StateSerializerProvider<K> stateSerializerProvider, int i) {
        this.registeredKVStates = map;
        this.registeredPQStates = map2;
        this.keyGroupCompressionDecorator = streamCompressionDecorator;
        this.localRecoveryConfig = localRecoveryConfig;
        this.keyGroupRange = keyGroupRange;
        this.keySerializerProvider = stateSerializerProvider;
        this.totalKeyGroups = i;
    }

    /**
     * Builds the snapshot resources from the currently registered states.
     *
     * @param j not used by this implementation (presumably the checkpoint id; confirm against
     *     {@link SnapshotStrategy}).
     * @return resources created by {@link HeapSnapshotResources#create}.
     */
    @Override // org.apache.flink.runtime.state.SnapshotStrategy
    public HeapSnapshotResources<K> syncPrepareResources(long j) {
        return HeapSnapshotResources.create(
                registeredKVStates,
                registeredPQStates,
                keyGroupCompressionDecorator,
                keyGroupRange,
                getKeySerializer(),
                totalKeyGroups);
    }

    /**
     * Returns a supplier that writes the prepared resources to a checkpoint stream when invoked.
     *
     * <p>If the resources contain no state meta information, the supplier yields {@link
     * SnapshotResult#empty()} and no stream is opened.
     *
     * @param heapSnapshotResources resources produced by {@link #syncPrepareResources(long)}.
     * @param j forwarded to the duplicating stream when local backup is used (presumably the
     *     checkpoint id; confirm against {@link SnapshotStrategy}).
     * @param j2 not used by this implementation.
     * @param checkpointStreamFactory factory for the checkpoint output stream.
     * @param checkpointOptions a savepoint checkpoint type always selects the simple,
     *     non-duplicating stream.
     * @return supplier performing the actual write; it throws {@link IOException} with the message
     *     "Stream already unregistered." if the stream is no longer registered once writing ends.
     */
    @Override // org.apache.flink.runtime.state.SnapshotStrategy
    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(HeapSnapshotResources<K> heapSnapshotResources, long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        final List<StateMetaInfoSnapshot> metaInfoSnapshots = heapSnapshotResources.getMetaInfoSnapshots();
        if (metaInfoSnapshots.isEmpty()) {
            return closeableRegistry -> SnapshotResult.empty();
        }

        // The last flag records whether a compression decorator other than the uncompressed one
        // is configured.
        final KeyedBackendSerializationProxy<K> serializationProxy =
                new KeyedBackendSerializationProxy<>(
                        heapSnapshotResources.getKeySerializer(),
                        metaInfoSnapshots,
                        !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));

        // Savepoints and setups without local backup write to a single stream; otherwise the
        // output is duplicated into the local state directory.
        final SupplierWithException<CheckpointStreamWithResultProvider, Exception> streamSupplier =
                (!localRecoveryConfig.isLocalBackupEnabled() || checkpointOptions.getCheckpointType().isSavepoint())
                        ? () -> CheckpointStreamWithResultProvider.createSimpleStream(
                                CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory)
                        : () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
                                j,
                                CheckpointedStateScope.EXCLUSIVE,
                                checkpointStreamFactory,
                                localRecoveryConfig
                                        .getLocalStateDirectoryProvider()
                                        .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()));

        return snapshotCloseableRegistry -> {
            final Map<StateUID, Integer> stateNamesToId = heapSnapshotResources.getStateNamesToId();
            final Map<StateUID, StateSnapshot> cowStateStableSnapshots =
                    heapSnapshotResources.getCowStateStableSnapshots();

            final CheckpointStreamWithResultProvider streamWithResultProvider = streamSupplier.get();
            // Registered so that the registry can close the stream while writing is in progress.
            snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);

            final CheckpointStateOutputStream localStream = streamWithResultProvider.getCheckpointOutputStream();
            final DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream);
            serializationProxy.write(outView);

            final int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups();
            final long[] offsets = new long[numberOfKeyGroups];
            for (int keyGroupPos = 0; keyGroupPos < numberOfKeyGroups; keyGroupPos++) {
                final int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
                // Remember where this key group starts before anything of it is written.
                offsets[keyGroupPos] = localStream.getPos();
                outView.writeInt(keyGroupId);

                for (Map.Entry<StateUID, StateSnapshot> stateSnapshot : cowStateStableSnapshots.entrySet()) {
                    final StateSnapshot.StateKeyGroupWriter keyGroupWriter =
                            stateSnapshot.getValue().getKeyGroupWriter();
                    // The checkpoint stream keeps being written after this resource is closed, so
                    // the decorator is expected to leave the underlying stream open.
                    try (OutputStream kgCompressionOut =
                            keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
                        final DataOutputViewStreamWrapper kgCompressionView =
                                new DataOutputViewStreamWrapper(kgCompressionOut);
                        kgCompressionView.writeShort(stateNamesToId.get(stateSnapshot.getKey()));
                        keyGroupWriter.writeStateInKeyGroup(kgCompressionView, keyGroupId);
                    }
                }
            }

            // A failed unregistration means the stream is no longer tracked by the registry, so
            // the result must not be finalized.
            if (!snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
                throw new IOException("Stream already unregistered.");
            }
            return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(
                    streamWithResultProvider.closeAndFinalizeCheckpointStreamResult(),
                    new KeyGroupRangeOffsets(keyGroupRange, offsets),
                    KeyGroupsStateHandle::new);
        };
    }

    /**
     * Returns the key serializer for the current schema.
     *
     * @return result of {@link StateSerializerProvider#currentSchemaSerializer()}.
     */
    public TypeSerializer<K> getKeySerializer() {
        return keySerializerProvider.currentSchemaSerializer();
    }
}
