package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.v2.StateDescriptorUtils;
import org.apache.flink.runtime.state.v2.adaptor.OperatorListStateAdaptor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackend.class */
public class DefaultOperatorStateBackend implements OperatorStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
    private final CloseableRegistry closeStreamOnCancelRegistry;
    private final JavaSerializer<Serializable> deprecatedDefaultJavaSerializer = new JavaSerializer<>();
    private final ExecutionConfig executionConfig;
    private final Map<String, PartitionableListState<?>> accessedStatesByName;
    private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;
    private final SnapshotStrategyRunner<OperatorStateHandle, ?> snapshotStrategyRunner;

    public DefaultOperatorStateBackend(ExecutionConfig executionConfig, CloseableRegistry closeableRegistry, Map<String, PartitionableListState<?>> map, Map<String, BackendWritableBroadcastState<?, ?>> map2, Map<String, PartitionableListState<?>> map3, Map<String, BackendWritableBroadcastState<?, ?>> map4, SnapshotStrategyRunner<OperatorStateHandle, ?> snapshotStrategyRunner) {
        this.closeStreamOnCancelRegistry = closeableRegistry;
        this.executionConfig = executionConfig;
        this.registeredOperatorStates = map;
        this.registeredBroadcastStates = map2;
        this.accessedStatesByName = map3;
        this.accessedBroadcastStatesByName = map4;
        this.snapshotStrategyRunner = snapshotStrategyRunner;
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    @Override // org.apache.flink.api.common.state.OperatorStateStore
    public Set<String> getRegisteredStateNames() {
        return this.registeredOperatorStates.keySet();
    }

    @Override // org.apache.flink.api.common.state.OperatorStateStore
    public Set<String> getRegisteredBroadcastStateNames() {
        return this.registeredBroadcastStates.keySet();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closeStreamOnCancelRegistry.close();
    }

    @Override // org.apache.flink.runtime.state.OperatorStateBackend, org.apache.flink.util.Disposable
    public void dispose() {
        IOUtils.closeQuietly(this.closeStreamOnCancelRegistry);
        this.registeredOperatorStates.clear();
        this.registeredBroadcastStates.clear();
    }

    @Override // org.apache.flink.api.common.state.OperatorStateStore
    public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> mapStateDescriptor) throws StateMigrationException {
        Preconditions.checkNotNull(mapStateDescriptor);
        String str = (String) Preconditions.checkNotNull(mapStateDescriptor.getName());
        BackendWritableBroadcastState<?, ?> backendWritableBroadcastState = this.accessedBroadcastStatesByName.get(str);
        if (backendWritableBroadcastState != null) {
            checkStateNameAndMode(backendWritableBroadcastState.getStateMetaInfo().getName(), str, backendWritableBroadcastState.getStateMetaInfo().getAssignmentMode(), OperatorStateHandle.Mode.BROADCAST);
            return backendWritableBroadcastState;
        }
        mapStateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        TypeSerializer<?> typeSerializer = (TypeSerializer) Preconditions.checkNotNull(mapStateDescriptor.getKeySerializer());
        TypeSerializer<?> typeSerializer2 = (TypeSerializer) Preconditions.checkNotNull(mapStateDescriptor.getValueSerializer());
        BackendWritableBroadcastState<?, ?> backendWritableBroadcastState2 = this.registeredBroadcastStates.get(str);
        if (backendWritableBroadcastState2 == null) {
            backendWritableBroadcastState2 = new HeapBroadcastState((RegisteredBroadcastStateBackendMetaInfo<?, ?>) new RegisteredBroadcastStateBackendMetaInfo(str, OperatorStateHandle.Mode.BROADCAST, typeSerializer, typeSerializer2));
            this.registeredBroadcastStates.put(str, backendWritableBroadcastState2);
        } else {
            checkStateNameAndMode(backendWritableBroadcastState2.getStateMetaInfo().getName(), str, backendWritableBroadcastState2.getStateMetaInfo().getAssignmentMode(), OperatorStateHandle.Mode.BROADCAST);
            RegisteredBroadcastStateBackendMetaInfo<?, ?> stateMetaInfo = backendWritableBroadcastState2.getStateMetaInfo();
            if (stateMetaInfo.updateKeySerializer(typeSerializer).isIncompatible()) {
                throw new StateMigrationException("The new key typeSerializer for broadcast state must not be incompatible.");
            }
            if (stateMetaInfo.updateValueSerializer(typeSerializer2).isIncompatible()) {
                throw new StateMigrationException("The new value typeSerializer for broadcast state must not be incompatible.");
            }
            backendWritableBroadcastState2.setStateMetaInfo(stateMetaInfo);
        }
        this.accessedBroadcastStatesByName.put(str, backendWritableBroadcastState2);
        return backendWritableBroadcastState2;
    }

    @Override // org.apache.flink.api.common.state.OperatorStateStore
    public <S> ListState<S> getListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
        return getListState(listStateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    @Override // org.apache.flink.api.common.state.OperatorStateStore
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
        return getListState(listStateDescriptor, OperatorStateHandle.Mode.UNION);
    }

    @Override // org.apache.flink.api.common.state.OperatorStateStore
    public <K, V> BroadcastState<K, V> getBroadcastState(org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V> mapStateDescriptor) throws Exception {
        return getBroadcastState(StateDescriptorUtils.transformFromV2ToV1((org.apache.flink.api.common.state.v2.MapStateDescriptor) mapStateDescriptor));
    }

    @Override // org.apache.flink.api.common.state.OperatorStateStore
    public <S> org.apache.flink.api.common.state.v2.ListState<S> getListState(org.apache.flink.api.common.state.v2.ListStateDescriptor<S> listStateDescriptor) throws Exception {
        return new OperatorListStateAdaptor(getListState(StateDescriptorUtils.transformFromV2ToV1((org.apache.flink.api.common.state.v2.ListStateDescriptor) listStateDescriptor)));
    }

    @Override // org.apache.flink.api.common.state.OperatorStateStore
    public <S> org.apache.flink.api.common.state.v2.ListState<S> getUnionListState(org.apache.flink.api.common.state.v2.ListStateDescriptor<S> listStateDescriptor) throws Exception {
        return new OperatorListStateAdaptor(getListState(StateDescriptorUtils.transformFromV2ToV1((org.apache.flink.api.common.state.v2.ListStateDescriptor) listStateDescriptor)));
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    @Nonnull
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        return this.snapshotStrategyRunner.snapshot(j, j2, checkpointStreamFactory, checkpointOptions);
    }

    private <S> ListState<S> getListState(ListStateDescriptor<S> listStateDescriptor, OperatorStateHandle.Mode mode) throws StateMigrationException {
        Preconditions.checkNotNull(listStateDescriptor);
        String str = (String) Preconditions.checkNotNull(listStateDescriptor.getName());
        PartitionableListState<?> partitionableListState = this.accessedStatesByName.get(str);
        if (partitionableListState != null) {
            checkStateNameAndMode(partitionableListState.getStateMetaInfo().getName(), str, partitionableListState.getStateMetaInfo().getAssignmentMode(), mode);
            return partitionableListState;
        }
        listStateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        TypeSerializer typeSerializer = (TypeSerializer) Preconditions.checkNotNull(listStateDescriptor.getElementSerializer());
        PartitionableListState<?> partitionableListState2 = this.registeredOperatorStates.get(str);
        if (null == partitionableListState2) {
            partitionableListState2 = new PartitionableListState<>((RegisteredOperatorStateBackendMetaInfo<?>) new RegisteredOperatorStateBackendMetaInfo(str, typeSerializer, mode));
            this.registeredOperatorStates.put(str, partitionableListState2);
        } else {
            checkStateNameAndMode(partitionableListState2.getStateMetaInfo().getName(), str, partitionableListState2.getStateMetaInfo().getAssignmentMode(), mode);
            RegisteredOperatorStateBackendMetaInfo<?> stateMetaInfo = partitionableListState2.getStateMetaInfo();
            if (stateMetaInfo.updatePartitionStateSerializer(typeSerializer.duplicate2()).isIncompatible()) {
                throw new StateMigrationException("The new state typeSerializer for operator state must not be incompatible.");
            }
            partitionableListState2.setStateMetaInfo(stateMetaInfo);
        }
        this.accessedStatesByName.put(str, partitionableListState2);
        return partitionableListState2;
    }

    private static void checkStateNameAndMode(String str, String str2, OperatorStateHandle.Mode mode, OperatorStateHandle.Mode mode2) {
        Preconditions.checkState(str.equals(str2), "Incompatible state names. Was [" + str + "], registered with [" + str2 + "].");
        Preconditions.checkState(mode.equals(mode2), "Incompatible state assignment modes. Was [" + mode + "], registered with [" + mode2 + "].");
    }
}
