package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManagerBuilder;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.class */
public class StreamTaskMailboxTestHarnessBuilder<OUT> {
    protected final FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory;
    protected final TypeSerializer<OUT> outputSerializer;

    @Nullable
    protected StreamTestSingleInputGate[] inputGates;
    protected Map<Long, TaskStateSnapshot> taskStateSnapshots;
    protected boolean collectNetworkEvents;
    protected final ExecutionConfig executionConfig = new ExecutionConfig();
    protected long memorySize = 1048576;
    protected int bufferSize = StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE;
    protected Configuration jobConfig = new Configuration();
    protected StreamConfig streamConfig = new StreamConfig(new Configuration());
    protected LocalRecoveryConfig localRecoveryConfig = TestLocalRecoveryConfig.disabled();
    protected TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
    protected CheckpointResponder checkpointResponder = new TestCheckpointResponder();
    protected final ArrayList<StreamConfig.InputConfig> inputs = new ArrayList<>();
    protected final ArrayList<Integer> inputChannelsPerGate = new ArrayList<>();
    protected final ArrayList<ResultPartitionWriter> additionalOutputs = new ArrayList<>();
    private boolean setupCalled = false;
    private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
    private Function<SingleInputGateBuilder, SingleInputGateBuilder> modifyGateBuilder = Function.identity();
    private StreamPartitioner<?> partitioner = new BroadcastPartitioner();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder$SourceInputConfigPlaceHolder.class */
    public static class SourceInputConfigPlaceHolder<SourceOut> implements StreamConfig.InputConfig {
        private OperatorID operatorId;
        private SourceOperatorFactory<SourceOut> sourceOperatorFactory;
        private TypeSerializer<SourceOut> sourceSerializer;

        public SourceInputConfigPlaceHolder(OperatorID operatorID, SourceOperatorFactory<SourceOut> sourceOperatorFactory, TypeSerializer<SourceOut> typeSerializer) {
            this.operatorId = operatorID;
            this.sourceOperatorFactory = sourceOperatorFactory;
            this.sourceSerializer = typeSerializer;
        }

        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        public SourceOperatorFactory<SourceOut> getSourceOperatorFactory() {
            return this.sourceOperatorFactory;
        }

        public TypeSerializer<SourceOut> getSourceSerializer() {
            return this.sourceSerializer;
        }
    }

    public StreamTaskMailboxTestHarnessBuilder(FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> functionWithException, TypeInformation<OUT> typeInformation) {
        this.taskFactory = (FunctionWithException) Preconditions.checkNotNull(functionWithException);
        this.outputSerializer = typeInformation.createSerializer(this.executionConfig.getSerializerConfig());
    }

    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> modifyExecutionConfig(Consumer<ExecutionConfig> consumer) {
        consumer.accept(this.executionConfig);
        return this;
    }

    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> modifyStreamConfig(Consumer<StreamConfig> consumer) {
        consumer.accept(this.streamConfig);
        return this;
    }

    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setCheckpointResponder(CheckpointResponder checkpointResponder) {
        this.checkpointResponder = checkpointResponder;
        return this;
    }

    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setTaskManagerRuntimeInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo) {
        this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
        return this;
    }

    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setCollectNetworkEvents() {
        this.collectNetworkEvents = true;
        return this;
    }

    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> modifyGateBuilder(Function<SingleInputGateBuilder, SingleInputGateBuilder> function) {
        this.modifyGateBuilder = function;
        return this;
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> typeInformation) {
        return addInput(typeInformation, 1);
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> typeInformation, int i) {
        return addInput(typeInformation, i, null);
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> typeInformation, int i, @Nullable KeySelector<?, ?> keySelector) {
        this.streamConfig.setStatePartitioner(this.inputs.size(), keySelector);
        this.inputs.add(new StreamConfig.NetworkInputConfig(typeInformation.createSerializer(this.executionConfig.getSerializerConfig()), this.inputChannelsPerGate.size()));
        this.inputChannelsPerGate.add(Integer.valueOf(i));
        return this;
    }

    public <SourceType> StreamTaskMailboxTestHarnessBuilder<OUT> addSourceInput(SourceOperatorFactory<SourceType> sourceOperatorFactory, TypeInformation<SourceType> typeInformation) {
        return addSourceInput(new OperatorID(), sourceOperatorFactory, typeInformation);
    }

    public <SourceType> StreamTaskMailboxTestHarnessBuilder<OUT> addSourceInput(OperatorID operatorID, SourceOperatorFactory<SourceType> sourceOperatorFactory, TypeInformation<SourceType> typeInformation) {
        return addSourceInput(operatorID, sourceOperatorFactory, typeInformation.createSerializer(this.executionConfig.getSerializerConfig()));
    }

    public <SourceType> StreamTaskMailboxTestHarnessBuilder<OUT> addSourceInput(SourceOperatorFactory<SourceType> sourceOperatorFactory, TypeSerializer<SourceType> typeSerializer) {
        return addSourceInput(new OperatorID(), sourceOperatorFactory, typeSerializer);
    }

    public <SourceType> StreamTaskMailboxTestHarnessBuilder<OUT> addSourceInput(OperatorID operatorID, SourceOperatorFactory<SourceType> sourceOperatorFactory, TypeSerializer<SourceType> typeSerializer) {
        this.inputs.add(new SourceInputConfigPlaceHolder(operatorID, sourceOperatorFactory, typeSerializer));
        return this;
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> addAdditionalOutput(ResultPartitionWriter... resultPartitionWriterArr) {
        Preconditions.checkState(!this.setupCalled, "Additional outputs must be added before harness was setup.");
        this.additionalOutputs.addAll(Arrays.asList(resultPartitionWriterArr));
        return this;
    }

    public StreamTaskMailboxTestHarness<OUT> buildUnrestored() throws Exception {
        TestTaskStateManagerBuilder checkpointResponder = TestTaskStateManager.builder().setLocalRecoveryConfig(this.localRecoveryConfig).setCheckpointResponder(this.checkpointResponder);
        if (this.taskStateSnapshots != null) {
            checkpointResponder.setReportedCheckpointId(this.taskStateSnapshots.keySet().iterator().next().longValue()).setJobManagerTaskStateSnapshotsByCheckpointId(this.taskStateSnapshots);
        }
        TestTaskStateManager build = checkpointResponder.build();
        StreamMockEnvironment streamMockEnvironment = new StreamMockEnvironment(new JobID(), ExecutionGraphTestUtils.createExecutionAttemptId(), this.jobConfig, this.streamConfig.getConfiguration(), this.executionConfig, this.memorySize, new MockInputSplitProvider(), this.bufferSize, build, this.collectNetworkEvents);
        streamMockEnvironment.setCheckpointResponder(build.getCheckpointResponder());
        streamMockEnvironment.setTaskManagerInfo(this.taskManagerRuntimeInfo);
        initializeInputs(streamMockEnvironment);
        Preconditions.checkState(this.inputGates != null, "InputGates hasn't been initialised");
        StreamElementSerializer streamElementSerializer = new StreamElementSerializer(this.outputSerializer);
        ArrayDeque arrayDeque = new ArrayDeque();
        streamMockEnvironment.addOutput(arrayDeque, streamElementSerializer);
        streamMockEnvironment.setTaskMetricGroup(this.taskMetricGroup);
        streamMockEnvironment.setCheckpointStorageAccess(new JobManagerCheckpointStorage().createCheckpointStorage(streamMockEnvironment.getJobID()));
        Iterator<ResultPartitionWriter> it = this.additionalOutputs.iterator();
        while (it.hasNext()) {
            streamMockEnvironment.addOutput(it.next());
        }
        return new StreamTaskMailboxTestHarness<>((StreamTask) this.taskFactory.apply(streamMockEnvironment), arrayDeque, this.inputGates, streamMockEnvironment);
    }

    public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
        StreamTaskMailboxTestHarness<OUT> buildUnrestored = buildUnrestored();
        buildUnrestored.streamTask.restore();
        return buildUnrestored;
    }

    protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
        this.inputGates = new StreamTestSingleInputGate[this.inputChannelsPerGate.size()];
        LinkedList linkedList = new LinkedList();
        StreamNode streamNode = new StreamNode(0, (String) null, (String) null, (StreamOperator) null, (String) null, (Class) null);
        for (int i = 0; i < this.inputs.size(); i++) {
            if (this.inputs.get(i) instanceof StreamConfig.NetworkInputConfig) {
                initializeNetworkInput(this.inputs.get(i), streamNode, streamMockEnvironment, linkedList);
            } else {
                if (!(this.inputs.get(i) instanceof SourceInputConfigPlaceHolder)) {
                    throw new UnsupportedOperationException("Unknown input type " + this.inputs.get(i));
                }
                this.inputs.set(i, initializeSourceInput(i, (SourceInputConfigPlaceHolder) this.inputs.get(i), streamNode));
            }
        }
        this.streamConfig.setInPhysicalEdges(linkedList);
        this.streamConfig.setNumberOfNetworkInputs(this.inputGates.length);
        this.streamConfig.setInputs((StreamConfig.InputConfig[]) this.inputs.toArray(new StreamConfig.InputConfig[this.inputs.size()]));
        this.streamConfig.serializeAllConfigs();
    }

    private void initializeNetworkInput(StreamConfig.NetworkInputConfig networkInputConfig, StreamNode streamNode, StreamMockEnvironment streamMockEnvironment, List<StreamEdge> list) {
        int inputGateIndex = networkInputConfig.getInputGateIndex();
        this.inputGates[inputGateIndex] = new StreamTestSingleInputGate(this.inputChannelsPerGate.get(inputGateIndex).intValue(), inputGateIndex, networkInputConfig.getTypeSerializer(), this.bufferSize, this.modifyGateBuilder.apply(new SingleInputGateBuilder().setBufferDebloatConfiguration(BufferDebloatConfiguration.fromConfiguration(streamMockEnvironment.getTaskManagerInfo().getConfiguration()))));
        list.add(new StreamEdge(new StreamNode(0, (String) null, (String) null, (StreamOperator) null, (String) null, SourceStreamTask.class), streamNode, inputGateIndex + 1, this.partitioner, (OutputTag) null));
        streamMockEnvironment.addInputGate(this.inputGates[inputGateIndex].getInputGate());
    }

    private StreamConfig.SourceInputConfig initializeSourceInput(int i, SourceInputConfigPlaceHolder sourceInputConfigPlaceHolder, StreamNode streamNode) {
        Map transitiveChainedTaskConfigs = this.streamConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
        Integer num = transitiveChainedTaskConfigs.isEmpty() ? 0 : (Integer) Collections.max(transitiveChainedTaskConfigs.keySet());
        LinkedList linkedList = new LinkedList();
        StreamEdge streamEdge = new StreamEdge(new StreamNode(Integer.valueOf(num.intValue() + i + 1337), (String) null, (String) null, (StreamOperator) null, (String) null, (Class) null), streamNode, 0, new ForwardPartitioner(), (OutputTag) null);
        linkedList.add(streamEdge);
        LinkedList linkedList2 = new LinkedList();
        linkedList2.add(new NonChainedOutput(true, streamEdge.getSourceId(), 1, 1, 100L, false, new IntermediateDataSetID(), (OutputTag) null, new ForwardPartitioner(), ResultPartitionType.PIPELINED_BOUNDED));
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setVertexNonChainedOutputs(linkedList2);
        streamConfig.setChainedOutputs(linkedList);
        streamConfig.setTypeSerializerOut(sourceInputConfigPlaceHolder.getSourceSerializer());
        streamConfig.setOperatorID(sourceInputConfigPlaceHolder.getOperatorId());
        streamConfig.setStreamOperatorFactory(sourceInputConfigPlaceHolder.getSourceOperatorFactory());
        streamConfig.serializeAllConfigs();
        transitiveChainedTaskConfigs.put(Integer.valueOf(streamEdge.getSourceId()), streamConfig);
        this.streamConfig.setAndSerializeTransitiveChainedTaskConfigs(transitiveChainedTaskConfigs);
        return new StreamConfig.SourceInputConfig(streamEdge);
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> streamOperator) {
        return setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) SimpleOperatorFactory.of(streamOperator), new OperatorID());
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> streamOperator, OperatorID operatorID) {
        return setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) SimpleOperatorFactory.of(streamOperator), operatorID);
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperatorFactory<?> streamOperatorFactory) {
        return setupOutputForSingletonOperatorChain(streamOperatorFactory, new OperatorID());
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperatorFactory<?> streamOperatorFactory, OperatorID operatorID) {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        return (StreamTaskMailboxTestHarnessBuilder) setupOperatorChain(operatorID, streamOperatorFactory).finishForSingletonOperatorChain(this.outputSerializer, this.partitioner);
    }

    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(StreamOperator<?> streamOperator) {
        return setupOperatorChain(new OperatorID(), streamOperator);
    }

    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(OperatorID operatorID, StreamOperator<?> streamOperator) {
        return setupOperatorChain(operatorID, (StreamOperatorFactory<?>) SimpleOperatorFactory.of(streamOperator));
    }

    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(StreamOperatorFactory<?> streamOperatorFactory) {
        return setupOperatorChain(new OperatorID(), streamOperatorFactory);
    }

    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(OperatorID operatorID, StreamOperatorFactory<?> streamOperatorFactory) {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        this.setupCalled = true;
        this.streamConfig.setStreamOperatorFactory(streamOperatorFactory);
        this.streamConfig.serializeAllConfigs();
        return new StreamConfigChainer<>(operatorID, this.streamConfig, this, 1 + this.additionalOutputs.size());
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> setTaskMetricGroup(TaskMetricGroup taskMetricGroup) {
        this.taskMetricGroup = taskMetricGroup;
        return this;
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> setKeyType(TypeInformation<?> typeInformation) {
        this.streamConfig.setStateKeySerializer(typeInformation.createSerializer(this.executionConfig.getSerializerConfig()));
        return this;
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> setTaskStateSnapshot(long j, TaskStateSnapshot taskStateSnapshot) {
        this.taskStateSnapshots = Collections.singletonMap(Long.valueOf(j), taskStateSnapshot);
        return this;
    }

    public StreamTaskMailboxTestHarnessBuilder<OUT> setOutputPartitioner(StreamPartitioner streamPartitioner) {
        this.partitioner = streamPartitioner;
        return this;
    }
}
