package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation;
import org.apache.flink.util.CollectionUtil;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.class */
public class DefaultOperatorStateBackendSnapshotStrategy implements SnapshotStrategy<OperatorStateHandle, DefaultOperatorStateBackendSnapshotResources> {
    private final ClassLoader userClassLoader;
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
    private final StreamCompressionDecorator compressionDecorator;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy$DefaultOperatorStateBackendSnapshotResources.class */
    public static class DefaultOperatorStateBackendSnapshotResources implements SnapshotResources {
        private final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies;
        private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies;

        DefaultOperatorStateBackendSnapshotResources(Map<String, PartitionableListState<?>> map, Map<String, BackendWritableBroadcastState<?, ?>> map2) {
            this.registeredOperatorStatesDeepCopies = map;
            this.registeredBroadcastStatesDeepCopies = map2;
        }

        public Map<String, PartitionableListState<?>> getRegisteredOperatorStatesDeepCopies() {
            return this.registeredOperatorStatesDeepCopies;
        }

        public Map<String, BackendWritableBroadcastState<?, ?>> getRegisteredBroadcastStatesDeepCopies() {
            return this.registeredBroadcastStatesDeepCopies;
        }

        @Override // org.apache.flink.runtime.state.SnapshotResources
        public void release() {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DefaultOperatorStateBackendSnapshotStrategy(ClassLoader classLoader, Map<String, PartitionableListState<?>> map, Map<String, BackendWritableBroadcastState<?, ?>> map2, StreamCompressionDecorator streamCompressionDecorator) {
        this.userClassLoader = classLoader;
        this.registeredOperatorStates = map;
        this.registeredBroadcastStates = map2;
        this.compressionDecorator = streamCompressionDecorator;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.runtime.state.SnapshotStrategy
    public DefaultOperatorStateBackendSnapshotResources syncPrepareResources(long j) {
        if (this.registeredOperatorStates.isEmpty() && this.registeredBroadcastStates.isEmpty()) {
            return new DefaultOperatorStateBackendSnapshotResources(Collections.emptyMap(), Collections.emptyMap());
        }
        HashMap newHashMapWithExpectedSize = CollectionUtil.newHashMapWithExpectedSize(this.registeredOperatorStates.size());
        HashMap newHashMapWithExpectedSize2 = CollectionUtil.newHashMapWithExpectedSize(this.registeredBroadcastStates.size());
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.userClassLoader);
        try {
            if (!this.registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> value = entry.getValue();
                    if (null != value) {
                        value = value.deepCopy();
                    }
                    newHashMapWithExpectedSize.put(entry.getKey(), value);
                }
            }
            if (!this.registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry2 : this.registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState<?, ?> value2 = entry2.getValue();
                    if (null != value2) {
                        value2 = value2.deepCopy();
                    }
                    newHashMapWithExpectedSize2.put(entry2.getKey(), value2);
                }
            }
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            return new DefaultOperatorStateBackendSnapshotResources(newHashMapWithExpectedSize, newHashMapWithExpectedSize2);
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.state.SnapshotStrategy
    public SnapshotStrategy.SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(DefaultOperatorStateBackendSnapshotResources defaultOperatorStateBackendSnapshotResources, long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies = defaultOperatorStateBackendSnapshotResources.getRegisteredOperatorStatesDeepCopies();
        Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies = defaultOperatorStateBackendSnapshotResources.getRegisteredBroadcastStatesDeepCopies();
        if (!registeredBroadcastStatesDeepCopies.isEmpty() || !registeredOperatorStatesDeepCopies.isEmpty()) {
            return closeableRegistry -> {
                CheckpointStateOutputStream createCheckpointStateOutputStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                closeableRegistry.registerCloseable((AutoCloseable) createCheckpointStateOutputStream);
                ArrayList arrayList = new ArrayList(registeredOperatorStatesDeepCopies.size());
                Iterator it = registeredOperatorStatesDeepCopies.entrySet().iterator();
                while (it.hasNext()) {
                    arrayList.add(((PartitionableListState) ((Map.Entry) it.next()).getValue()).getStateMetaInfo().snapshot());
                }
                ArrayList arrayList2 = new ArrayList(registeredBroadcastStatesDeepCopies.size());
                Iterator it2 = registeredBroadcastStatesDeepCopies.entrySet().iterator();
                while (it2.hasNext()) {
                    arrayList2.add(((BackendWritableBroadcastState) ((Map.Entry) it2.next()).getValue()).getStateMetaInfo().snapshot());
                }
                new OperatorBackendSerializationProxy(arrayList, arrayList2, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, this.compressionDecorator)).write(new DataOutputViewStreamWrapper(createCheckpointStateOutputStream));
                HashMap newHashMapWithExpectedSize = CollectionUtil.newHashMapWithExpectedSize(registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size());
                CompressibleFSDataOutputStream compressibleFSDataOutputStream = new CompressibleFSDataOutputStream(createCheckpointStateOutputStream, this.compressionDecorator);
                try {
                    for (Map.Entry entry : registeredOperatorStatesDeepCopies.entrySet()) {
                        PartitionableListState partitionableListState = (PartitionableListState) entry.getValue();
                        newHashMapWithExpectedSize.put((String) entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionableListState.write(compressibleFSDataOutputStream), partitionableListState.getStateMetaInfo().getAssignmentMode()));
                    }
                    for (Map.Entry entry2 : registeredBroadcastStatesDeepCopies.entrySet()) {
                        BackendWritableBroadcastState backendWritableBroadcastState = (BackendWritableBroadcastState) entry2.getValue();
                        newHashMapWithExpectedSize.put((String) entry2.getKey(), new OperatorStateHandle.StateMetaInfo(new long[]{backendWritableBroadcastState.write(compressibleFSDataOutputStream)}, backendWritableBroadcastState.getStateMetaInfo().getAssignmentMode()));
                    }
                    compressibleFSDataOutputStream.close();
                    if (!closeableRegistry.unregisterCloseable((AutoCloseable) createCheckpointStateOutputStream)) {
                        throw new IOException("Stream was already unregistered.");
                    }
                    StreamStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
                    return SnapshotResult.of(closeAndGetHandle != null ? checkpointStreamFactory instanceof FsMergingCheckpointStorageLocation ? new FileMergingOperatorStreamStateHandle(((FsMergingCheckpointStorageLocation) checkpointStreamFactory).getExclusiveStateHandle(), ((FsMergingCheckpointStorageLocation) checkpointStreamFactory).getSharedStateHandle(), newHashMapWithExpectedSize, closeAndGetHandle) : new OperatorStreamStateHandle(newHashMapWithExpectedSize, closeAndGetHandle) : null);
                } catch (Throwable th) {
                    try {
                        compressibleFSDataOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            };
        }
        if (!(checkpointStreamFactory instanceof FsMergingCheckpointStorageLocation)) {
            return closeableRegistry2 -> {
                return SnapshotResult.empty();
            };
        }
        FsMergingCheckpointStorageLocation fsMergingCheckpointStorageLocation = (FsMergingCheckpointStorageLocation) checkpointStreamFactory;
        return closeableRegistry3 -> {
            return SnapshotResult.of(EmptyFileMergingOperatorStreamStateHandle.create(fsMergingCheckpointStorageLocation.getExclusiveStateHandle(), fsMergingCheckpointStorageLocation.getSharedStateHandle()));
        };
    }
}
