package org.apache.flink.state.api;

import java.util.OptionalInt;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.output.BootstrapStreamTaskRunner;
import org.apache.flink.state.api.output.OperatorSubtaskStateReducer;
import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
import org.apache.flink.state.api.output.operators.GroupReduceOperator;
import org.apache.flink.state.api.runtime.MutableConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;

/**
 * Bundles a {@link DataStream} with a {@link SavepointWriterOperatorFactory} and, optionally, a
 * {@link KeySelector} plus key {@link TypeInformation}, and wires them into a stream topology
 * that produces the {@link OperatorState} for a given {@link OperatorID}.
 *
 * <p>The key selector and key type are either both set (keyed constructor) or both {@code null}
 * (non-keyed constructor).
 *
 * @param <T> element type of the wrapped stream
 */
@PublicEvolving
public class StateBootstrapTransformation<T> {
    /** Input whose elements are fed to the bootstrap operator. */
    private final DataStream<T> stream;

    /** Creates the operator that is run by the {@link BootstrapStreamTaskRunner}. */
    private final SavepointWriterOperatorFactory factory;

    /** Key extractor; {@code null} when the transformation is not keyed. */
    @Nullable
    private final KeySelector<T, ?> keySelector;

    /** Type of the key produced by {@link #keySelector}; {@code null} when not keyed. */
    @Nullable
    private final TypeInformation<?> keyType;

    /** Operator-specific max parallelism; when empty the caller-supplied default is used. */
    private final OptionalInt operatorMaxParallelism;

    /**
     * Creates a non-keyed transformation.
     *
     * <p>NOTE: the decompiler reported that this constructor was originally package-private;
     * it is kept {@code public} so that existing callers keep compiling.
     *
     * @param dataStream the input stream
     * @param optionalInt the operator max parallelism, or empty to fall back to the default
     * @param savepointWriterOperatorFactory factory for the bootstrap operator
     */
    public StateBootstrapTransformation(DataStream<T> dataStream, OptionalInt optionalInt, SavepointWriterOperatorFactory savepointWriterOperatorFactory) {
        this.stream = dataStream;
        this.operatorMaxParallelism = optionalInt;
        this.factory = savepointWriterOperatorFactory;
        this.keySelector = null;
        this.keyType = null;
    }

    /**
     * Creates a keyed transformation.
     *
     * <p>NOTE: the decompiler reported that this constructor was originally package-private;
     * it is kept {@code public} so that existing callers keep compiling.
     *
     * @param dataStream the input stream
     * @param optionalInt the operator max parallelism, or empty to fall back to the default
     * @param savepointWriterOperatorFactory factory for the bootstrap operator
     * @param keySelector extracts the key the stream is partitioned by
     * @param typeInformation type information of the extracted key
     * @param <K> key type
     */
    public <K> StateBootstrapTransformation(DataStream<T> dataStream, OptionalInt optionalInt, SavepointWriterOperatorFactory savepointWriterOperatorFactory, @Nonnull KeySelector<T, K> keySelector, @Nonnull TypeInformation<K> typeInformation) {
        this.stream = dataStream;
        this.operatorMaxParallelism = optionalInt;
        this.factory = savepointWriterOperatorFactory;
        this.keySelector = keySelector;
        this.keyType = typeInformation;
    }

    /**
     * Resolves the max parallelism to use for this transformation.
     *
     * @param i fallback used when no operator-specific max parallelism was configured
     * @return the operator-specific max parallelism if present, otherwise {@code i}
     */
    int getMaxParallelism(int i) {
        return this.operatorMaxParallelism.orElse(i);
    }

    /**
     * Builds the topology that writes the subtask states for {@code operatorID} and reduces
     * them into {@link OperatorState} on a non-parallel operator.
     *
     * <p>NOTE: the decompiler reported that this method was originally package-private.
     *
     * @param operatorID id of the operator whose state is written
     * @param stateBackend state backend set on the bootstrap operator's config
     * @param configuration additional settings merged on top of the environment configuration
     * @param i default max parallelism, used when no operator-specific value is set
     * @param path location handed to the operator factory
     * @return a stream of the reduced operator state
     */
    public DataStream<OperatorState> writeOperatorState(OperatorID operatorID, StateBackend stateBackend, Configuration configuration, int i, Path path) {
        int localMaxParallelism = getMaxParallelism(i);
        SingleOutputStreamOperator<TaggedOperatorSubtaskState> subtaskStates =
                writeOperatorSubtaskStates(operatorID, stateBackend, configuration, path, localMaxParallelism);
        return subtaskStates
                .transform(
                        "reduce(OperatorSubtaskState)",
                        TypeInformation.of(OperatorState.class),
                        new GroupReduceOperator<>(new OperatorSubtaskStateReducer(operatorID, localMaxParallelism)))
                .forceNonParallel();
    }

    /**
     * Test-only overload of the subtask-state writer that uses an empty additional
     * {@link Configuration}.
     *
     * @param operatorID id of the operator whose state is written
     * @param stateBackend state backend set on the bootstrap operator's config
     * @param path location handed to the operator factory
     * @param i max parallelism applied to the bootstrap operator
     * @return the operator emitting the tagged subtask states
     */
    @VisibleForTesting
    SingleOutputStreamOperator<TaggedOperatorSubtaskState> writeOperatorSubtaskStates(OperatorID operatorID, StateBackend stateBackend, Path path, int i) {
        return writeOperatorSubtaskStates(operatorID, stateBackend, new Configuration(), path, i);
    }

    /**
     * Attaches the bootstrap operator to the (optionally keyed) input stream.
     *
     * @param operatorID id of the operator whose state is written; its hex string is also
     *     used as the transformation name
     * @param stateBackend state backend set on the bootstrap operator's config
     * @param configuration additional settings merged on top of the environment configuration
     * @param path location handed to the operator factory
     * @param i max parallelism applied to the bootstrap operator
     * @return the operator emitting the tagged subtask states
     */
    private SingleOutputStreamOperator<TaggedOperatorSubtaskState> writeOperatorSubtaskStates(OperatorID operatorID, StateBackend stateBackend, Configuration configuration, Path path, int i) {
        StreamOperator<TaggedOperatorSubtaskState> operator =
                this.factory.createOperator(System.currentTimeMillis(), path);
        operator = this.stream.getExecutionEnvironment().clean(operator);

        StreamConfig config = getConfig(operatorID, stateBackend, configuration, operator);
        BootstrapStreamTaskRunner<T> operatorRunner = new BootstrapStreamTaskRunner<>(config, i);

        // A KeyedStream is a DataStream, so the common supertype keeps both branches type-safe.
        DataStream<T> input = this.stream;
        if (this.keySelector != null) {
            input = this.stream.keyBy(this.keySelector);
        }

        SingleOutputStreamOperator<TaggedOperatorSubtaskState> subtaskStates =
                input.transform(
                                operatorID.toHexString(),
                                TypeInformation.of(TaggedOperatorSubtaskState.class),
                                operatorRunner)
                        .setMaxParallelism(i);

        if (operator instanceof BroadcastStateBootstrapOperator) {
            // Broadcast bootstrap operators are pinned to a single subtask.
            subtaskStates = subtaskStates.setParallelism(1);
        } else if (getParallelism(subtaskStates) > i) {
            // Never run with more subtasks than the max parallelism allows.
            subtaskStates.setParallelism(i);
        }
        return subtaskStates;
    }

    /**
     * Builds the {@link StreamConfig} handed to the {@link BootstrapStreamTaskRunner}.
     *
     * @param operatorID id set on the config; its hex string is used as the operator name
     * @param stateBackend state backend set on the config
     * @param configuration additional settings merged on top of the environment configuration
     * @param streamOperator the bootstrap operator stored in the config
     * @return the fully populated and serialized stream config
     */
    @VisibleForTesting
    StreamConfig getConfig(OperatorID operatorID, StateBackend stateBackend, Configuration configuration, StreamOperator<TaggedOperatorSubtaskState> streamOperator) {
        // Start from a new Configuration built from the environment's settings and layer the
        // caller-supplied settings on top; presumably this avoids mutating the environment's
        // own configuration object - confirm against Configuration's copy constructor.
        Configuration merged = new Configuration(MutableConfig.of(this.stream.getExecutionEnvironment().getConfiguration()));
        merged.addAll(configuration);

        StreamConfig config = new StreamConfig(merged);
        config.setChainStart();
        config.setCheckpointingEnabled(true);
        config.setCheckpointMode(CheckpointingMode.EXACTLY_ONCE);

        // Keyed transformations additionally need the key serializer and the partitioner.
        if (this.keyType != null) {
            config.setStateKeySerializer(this.keyType.createSerializer(this.stream.getExecutionEnvironment().getConfig().getSerializerConfig()));
            config.setStatePartitioner(0, this.keySelector);
        }

        config.setStreamOperator(streamOperator);
        config.setOperatorName(operatorID.toHexString());
        config.setOperatorID(operatorID);
        config.setStateBackend(stateBackend);
        // The state-backend use case gets the full managed-memory fraction for this operator.
        config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0d);
        config.serializeAllConfigs();
        return config;
    }

    /**
     * Returns the parallelism of the given operator, falling back to the parallelism of its
     * execution environment when the operator reports {@code -1}.
     *
     * @param singleOutputStreamOperator the operator to inspect
     * @return the operator's own parallelism, or the environment's when it is {@code -1}
     */
    private static int getParallelism(SingleOutputStreamOperator<TaggedOperatorSubtaskState> singleOutputStreamOperator) {
        int parallelism = singleOutputStreamOperator.getParallelism();
        // -1 is treated as "no explicit parallelism"; presumably
        // ExecutionConfig.PARALLELISM_DEFAULT - confirm.
        if (parallelism == -1) {
            return singleOutputStreamOperator.getExecutionEnvironment().getParallelism();
        }
        return parallelism;
    }
}
