package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RestoreOperation;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapRestoreOperation.class */
/**
 * Restore operation that reads keyed state from a collection of {@link KeyedStateHandle}s into
 * the supplied registries of key/value state tables and priority-queue states.
 *
 * <p>Every non-null handle must be a {@link KeyGroupsStateHandle}. For each handle the operation
 * opens its stream, reads the serialization proxy (key serializer snapshot plus state meta
 * information), creates or checks the corresponding registered states, and then reads the data
 * of every key group that lies inside this operation's {@link KeyGroupRange}.
 *
 * @param <K> type of the keys of the restored state
 */
public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
    private static final Logger LOG = LoggerFactory.getLogger(HeapRestoreOperation.class);

    /** State handles to restore from; {@code null} elements are skipped. */
    private final Collection<KeyedStateHandle> restoreStateHandles;

    /** Provides the current key serializer and receives the restored key serializer snapshot. */
    private final StateSerializerProvider<K> keySerializerProvider;

    /** Class loader handed to the serialization proxy when reading a handle. */
    private final ClassLoader userCodeClassLoader;

    /** Registry of key/value state tables by state name; cleared at the start of a restore. */
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;

    /** Registry of priority-queue states by state name; cleared at the start of a restore. */
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;

    /** Registry in which each opened input stream is registered while it is being read. */
    private final CloseableRegistry cancelStreamRegistry;

    /** Only key groups contained in this range are read; all others are skipped. */
    @Nonnull
    private final KeyGroupRange keyGroupRange;

    /** Creates or checks the registered states for the meta information found in a handle. */
    private final HeapMetaInfoRestoreOperation<K> heapMetaInfoRestoreOperation;

    /**
     * Creates a restore operation over the given state handles.
     *
     * <p>NOTE(review): the decompiler reports that this constructor was originally
     * package-private; it is kept {@code public} here so existing callers are unaffected.
     *
     * @param collection state handles to restore from
     * @param stateSerializerProvider provider of the key serializer
     * @param classLoader class loader used when reading the serialization proxy
     * @param map registry of key/value state tables, populated during restore
     * @param map2 registry of priority-queue states, populated during restore
     * @param closeableRegistry registry in which opened streams are registered while in use
     * @param heapPriorityQueueSetFactory forwarded to {@link HeapMetaInfoRestoreOperation}
     * @param keyGroupRange key groups this operation is responsible for
     * @param i forwarded to {@link HeapMetaInfoRestoreOperation}; presumably the total number of
     *     key groups - TODO confirm against the caller
     * @param stateTableFactory forwarded to {@link HeapMetaInfoRestoreOperation}
     * @param internalKeyContext forwarded to {@link HeapMetaInfoRestoreOperation}
     */
    public HeapRestoreOperation(
            @Nonnull Collection<KeyedStateHandle> collection,
            StateSerializerProvider<K> stateSerializerProvider,
            ClassLoader classLoader,
            Map<String, StateTable<K, ?, ?>> map,
            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> map2,
            CloseableRegistry closeableRegistry,
            HeapPriorityQueueSetFactory heapPriorityQueueSetFactory,
            @Nonnull KeyGroupRange keyGroupRange,
            int i,
            StateTableFactory<K> stateTableFactory,
            InternalKeyContext<K> internalKeyContext) {
        this.restoreStateHandles = collection;
        this.keySerializerProvider = stateSerializerProvider;
        this.userCodeClassLoader = classLoader;
        this.registeredKVStates = map;
        this.registeredPQStates = map2;
        this.cancelStreamRegistry = closeableRegistry;
        this.keyGroupRange = keyGroupRange;
        this.heapMetaInfoRestoreOperation =
                new HeapMetaInfoRestoreOperation<>(
                        stateSerializerProvider,
                        heapPriorityQueueSetFactory,
                        keyGroupRange,
                        i,
                        stateTableFactory,
                        internalKeyContext);
    }

    /**
     * Clears both state registries and restores them from all configured state handles.
     *
     * @return always {@code null}
     * @throws StateMigrationException if the restored key serializer snapshot is incompatible
     *     with the current key serializer, or compatible only after migration
     * @throws Exception if a handle has an unexpected type or reading its stream fails
     */
    @Override // org.apache.flink.runtime.state.RestoreOperation
    public Void restore() throws Exception {
        this.registeredKVStates.clear();
        this.registeredPQStates.clear();

        // The key serializer compatibility check is done once, with the first non-null handle.
        boolean keySerializerRestored = false;

        for (KeyedStateHandle keyedStateHandle : this.restoreStateHandles) {
            if (keyedStateHandle == null) {
                continue;
            }
            if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                throw StateUtil.unexpectedStateHandleException(
                        KeyGroupsStateHandle.class, keyedStateHandle.getClass());
            }

            LOG.info("Starting to restore from state handle: {}.", keyedStateHandle);
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
            FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable(fsDataInputStream);

            try {
                DataInputViewStreamWrapper inView =
                        new DataInputViewStreamWrapper(fsDataInputStream);

                KeyedBackendSerializationProxy<K> serializationProxy =
                        new KeyedBackendSerializationProxy<>(this.userCodeClassLoader);
                serializationProxy.read(inView);

                if (!keySerializerRestored) {
                    // Fetch the current serializer before handing over the restored snapshot so
                    // that it is available for the error message below.
                    TypeSerializer<K> currentSerializer =
                            this.keySerializerProvider.currentSchemaSerializer();
                    TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
                            this.keySerializerProvider
                                    .setPreviousSerializerSnapshotForRestoredState(
                                            serializationProxy.getKeySerializerSnapshot());
                    if (keySerializerSchemaCompat.isCompatibleAfterMigration()
                            || keySerializerSchemaCompat.isIncompatible()) {
                        throw new StateMigrationException(
                                "The new key serializer ("
                                        + currentSerializer
                                        + ") must be compatible with the previous key serializer ("
                                        + this.keySerializerProvider.previousSchemaSerializer()
                                        + ").");
                    }
                    keySerializerRestored = true;
                }

                List<StateMetaInfoSnapshot> restoredMetaInfos =
                        serializationProxy.getStateMetaInfoSnapshots();
                Map<Integer, StateMetaInfoSnapshot> kvStatesById =
                        this.heapMetaInfoRestoreOperation.createOrCheckStateForMetaInfo(
                                restoredMetaInfos, this.registeredKVStates, this.registeredPQStates);

                readStateHandleStateData(
                        fsDataInputStream,
                        inView,
                        keyGroupsStateHandle.getGroupRangeOffsets(),
                        kvStatesById,
                        restoredMetaInfos.size(),
                        serializationProxy.getReadVersion(),
                        serializationProxy.isUsingKeyGroupCompression());
                LOG.info("Finished restoring from state handle: {}.", keyedStateHandle);
            } finally {
                // Close the stream only if it was still registered, i.e. this call actually
                // removed it from the registry.
                if (this.cancelStreamRegistry.unregisterCloseable(fsDataInputStream)) {
                    IOUtils.closeQuietly(fsDataInputStream);
                }
            }
        }
        return null;
    }

    /**
     * Reads the data of all key groups of one state handle that fall into {@link #keyGroupRange}.
     *
     * @param fsDataInputStream stream of the handle; repositioned to each key group's offset
     * @param inView data view over {@code fsDataInputStream}
     * @param keyGroupOffsets key-group index to stream offset pairs of the handle
     * @param kvStatesById state meta information keyed by the state id written in the stream
     * @param numStates number of states written per key group
     * @param readVersion version passed to the key-group readers
     * @param isCompressed whether key-group data is read through the Snappy decorator
     * @throws IOException if seeking or reading fails
     */
    private void readStateHandleStateData(
            FSDataInputStream fsDataInputStream,
            DataInputViewStreamWrapper inView,
            KeyGroupRangeOffsets keyGroupOffsets,
            Map<Integer, StateMetaInfoSnapshot> kvStatesById,
            int numStates,
            int readVersion,
            boolean isCompressed)
            throws IOException {
        final StreamCompressionDecorator streamCompressionDecorator =
                isCompressed
                        ? SnappyStreamCompressionDecorator.INSTANCE
                        : UncompressedStreamCompressionDecorator.INSTANCE;

        for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
            final int keyGroupIndex = groupOffset.f0;
            final long offset = groupOffset.f1;

            if (!this.keyGroupRange.contains(keyGroupIndex)) {
                LOG.debug(
                        "Key group {} doesn't belong to this backend with key group range: {}",
                        keyGroupIndex,
                        this.keyGroupRange);
                continue;
            }

            fsDataInputStream.seek(offset);

            // Each key group's data starts with its own index; verify it before reading on.
            final int writtenKeyGroupIndex = inView.readInt();
            Preconditions.checkState(
                    writtenKeyGroupIndex == keyGroupIndex, "Unexpected key-group in restore.");

            try (InputStream kgCompressionInStream =
                    streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
                readKeyGroupStateData(
                        kgCompressionInStream, kvStatesById, keyGroupIndex, numStates, readVersion);
            }
        }
    }

    /**
     * Reads the mappings of every state stored for a single key group.
     *
     * @param inputStream stream positioned at the key group's state data
     * @param kvStatesById state meta information keyed by the state id written in the stream
     * @param keyGroupIndex index of the key group being read
     * @param numStates number of states to read from the stream
     * @param readVersion version passed to the key-group readers
     * @throws IOException if reading fails
     * @throws IllegalStateException if a state has a backend state type other than
     *     {@code KEY_VALUE} or {@code PRIORITY_QUEUE}
     */
    private void readKeyGroupStateData(
            InputStream inputStream,
            Map<Integer, StateMetaInfoSnapshot> kvStatesById,
            int keyGroupIndex,
            int numStates,
            int readVersion)
            throws IOException {
        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inputStream);

        for (int n = 0; n < numStates; n++) {
            final int kvStateId = inView.readShort();
            final StateMetaInfoSnapshot stateMetaInfoSnapshot = kvStatesById.get(kvStateId);

            // Each registry has its own value type, so the reader is obtained per case rather
            // than by forcing priority-queue states through a StateTable reference.
            switch (stateMetaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE:
                    this.registeredKVStates
                            .get(stateMetaInfoSnapshot.getName())
                            .keyGroupReader(readVersion)
                            .readMappingsInKeyGroup(inView, keyGroupIndex);
                    break;
                case PRIORITY_QUEUE:
                    this.registeredPQStates
                            .get(stateMetaInfoSnapshot.getName())
                            .keyGroupReader(readVersion)
                            .readMappingsInKeyGroup(inView, keyGroupIndex);
                    break;
                default:
                    throw new IllegalStateException(
                            "Unexpected state type: "
                                    + stateMetaInfoSnapshot.getBackendStateType()
                                    + ".");
            }
        }
    }
}
