/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManagerBuilder;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamConfigChainer;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;

/**
 * Fluent builder that assembles a {@link StreamTaskMailboxTestHarness} around a {@link StreamTask}.
 *
 * <p>Typical usage: register inputs via {@code addInput}/{@code addSourceInput}, configure the
 * operator chain via {@code setupOperatorChain}/{@code setupOutputForSingletonOperatorChain}, then
 * call {@link #build()} (or {@link #buildUnrestored()} to skip {@code StreamTask#restore()}).
 *
 * <p>The builder is stateful: {@link #initializeInputs(StreamMockEnvironment)} replaces source
 * input placeholders inside {@link #inputs}, so an instance is meant to be built once.
 *
 * @param <OUT> type of the records emitted by the task under test
 */
public class StreamTaskMailboxTestHarnessBuilder<OUT> {
    /** Creates the task under test from the mock environment prepared by this builder. */
    protected final FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory;
    /** Serializer for the task's output records, derived from the output type information. */
    protected final TypeSerializer<OUT> outputSerializer;
    protected final ExecutionConfig executionConfig = new ExecutionConfig();
    /** Memory size handed to the {@link StreamMockEnvironment} (1 MiB). */
    protected long memorySize = 1024L * 1024L;
    /** Buffer size handed to the {@link StreamMockEnvironment} and to every test input gate. */
    protected int bufferSize = 1024;
    protected Configuration jobConfig = new Configuration();
    protected StreamConfig streamConfig = new StreamConfig(new Configuration());
    protected LocalRecoveryConfig localRecoveryConfig = TestLocalRecoveryConfig.disabled();
    /** Created by {@link #initializeInputs(StreamMockEnvironment)}; {@code null} before that. */
    @Nullable
    protected StreamTestSingleInputGate[] inputGates;
    protected TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
    /** Snapshots to restore from, keyed by checkpoint id; {@code null} when none were set. */
    protected Map<Long, TaskStateSnapshot> taskStateSnapshots;
    protected CheckpointResponder checkpointResponder = new TestCheckpointResponder();
    protected boolean collectNetworkEvents;
    /** All inputs (network inputs and source placeholders) in registration order. */
    protected final ArrayList<StreamConfig.InputConfig> inputs = new ArrayList<>();
    /** Number of channels per network input gate, indexed by gate index. */
    protected final ArrayList<Integer> inputChannelsPerGate = new ArrayList<>();
    protected final ArrayList<ResultPartitionWriter> additionalOutputs = new ArrayList<>();
    /** Set once an operator chain has been configured; guards against a second setup. */
    private boolean setupCalled = false;
    private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
    private Function<SingleInputGateBuilder, SingleInputGateBuilder> modifyGateBuilder = Function.identity();
    private StreamPartitioner<?> partitioner = new BroadcastPartitioner<>();

    /**
     * @param taskFactory factory creating the task under test; must not be {@code null}
     * @param outputType type information used to create the output serializer
     */
    public StreamTaskMailboxTestHarnessBuilder(FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory, TypeInformation<OUT> outputType) {
        this.taskFactory = Preconditions.checkNotNull(taskFactory);
        this.outputSerializer = outputType.createSerializer(this.executionConfig.getSerializerConfig());
    }

    /** Applies {@code action} to the builder's {@link ExecutionConfig} immediately. */
    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> modifyExecutionConfig(Consumer<ExecutionConfig> action) {
        action.accept(this.executionConfig);
        return this;
    }

    /** Applies {@code action} to the builder's head {@link StreamConfig} immediately. */
    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> modifyStreamConfig(Consumer<StreamConfig> action) {
        action.accept(this.streamConfig);
        return this;
    }

    /** Sets a single option in the job configuration passed to the mock environment. */
    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> addJobConfig(ConfigOption<T> option, T value) {
        this.jobConfig.set(option, value);
        return this;
    }

    /** Replaces the checkpoint responder handed to the task state manager. */
    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setCheckpointResponder(CheckpointResponder checkpointResponder) {
        this.checkpointResponder = checkpointResponder;
        return this;
    }

    /** Replaces the task manager runtime info installed on the mock environment. */
    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setTaskManagerRuntimeInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo) {
        this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
        return this;
    }

    /** Sets the {@code collectNetworkEvents} flag passed to the mock environment. */
    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setCollectNetworkEvents() {
        this.collectNetworkEvents = true;
        return this;
    }

    /**
     * Installs a hook applied to the {@link SingleInputGateBuilder} of every network input gate.
     * The hook replaces any previously installed one.
     */
    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> modifyGateBuilder(Function<SingleInputGateBuilder, SingleInputGateBuilder> singleInputGateBuilder) {
        this.modifyGateBuilder = singleInputGateBuilder;
        return this;
    }

    /** Adds a network input with a single channel and no key selector. */
    public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType) {
        return this.addInput(inputType, 1);
    }

    /** Adds a network input with {@code inputChannels} channels and no key selector. */
    public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels) {
        return this.addInput(inputType, inputChannels, null);
    }

    /**
     * Adds a network input.
     *
     * @param inputType type of the records arriving on this input
     * @param inputChannels number of channels of the input gate created for this input
     * @param keySelector state partitioner registered under this input's index, may be
     *     {@code null}
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels, @Nullable KeySelector<?, ?> keySelector) {
        // The partitioner is indexed over ALL inputs, the gate index only over network inputs.
        this.streamConfig.setStatePartitioner(this.inputs.size(), keySelector);
        TypeSerializer<?> inputSerializer = inputType.createSerializer(this.executionConfig.getSerializerConfig());
        this.inputs.add(new StreamConfig.NetworkInputConfig(inputSerializer, this.inputChannelsPerGate.size()));
        this.inputChannelsPerGate.add(inputChannels);
        return this;
    }

    /** Adds a chained source input with a fresh {@link OperatorID}. */
    public <SourceType> StreamTaskMailboxTestHarnessBuilder<OUT> addSourceInput(SourceOperatorFactory<SourceType> sourceOperatorFactory, TypeInformation<SourceType> sourceType) {
        return this.addSourceInput(new OperatorID(), sourceOperatorFactory, sourceType);
    }

    /** Adds a chained source input, deriving the serializer from {@code sourceType}. */
    public <SourceType> StreamTaskMailboxTestHarnessBuilder<OUT> addSourceInput(OperatorID operatorId, SourceOperatorFactory<SourceType> sourceOperatorFactory, TypeInformation<SourceType> sourceType) {
        return this.addSourceInput(operatorId, sourceOperatorFactory, sourceType.createSerializer(this.executionConfig.getSerializerConfig()));
    }

    /** Adds a chained source input with a fresh {@link OperatorID} and an explicit serializer. */
    public <SourceType> StreamTaskMailboxTestHarnessBuilder<OUT> addSourceInput(SourceOperatorFactory<SourceType> sourceOperatorFactory, TypeSerializer<SourceType> sourceSerializer) {
        return this.addSourceInput(new OperatorID(), sourceOperatorFactory, sourceSerializer);
    }

    /**
     * Adds a chained source input. Only a placeholder is recorded here; it is turned into a
     * {@link StreamConfig.SourceInputConfig} when the inputs are initialized during build.
     */
    public <SourceType> StreamTaskMailboxTestHarnessBuilder<OUT> addSourceInput(OperatorID operatorId, SourceOperatorFactory<SourceType> sourceOperatorFactory, TypeSerializer<SourceType> sourceSerializer) {
        this.inputs.add(new SourceInputConfigPlaceHolder<>(operatorId, sourceOperatorFactory, sourceSerializer));
        return this;
    }

    /**
     * Registers extra result partition writers as task outputs.
     *
     * @throws IllegalStateException if an operator chain has already been set up
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> addAdditionalOutput(ResultPartitionWriter ... writers) {
        Preconditions.checkState(!this.setupCalled, "Additional outputs must be added before harness was setup.");
        this.additionalOutputs.addAll(Arrays.asList(writers));
        return this;
    }

    /**
     * Builds the mock environment, inputs, outputs and the task, without restoring the task.
     *
     * @return harness wrapping the freshly created (not yet restored) task
     * @throws Exception if the task factory fails
     */
    public StreamTaskMailboxTestHarness<OUT> buildUnrestored() throws Exception {
        TestTaskStateManagerBuilder taskStateManagerBuilder = TestTaskStateManager.builder()
                .setLocalRecoveryConfig(this.localRecoveryConfig)
                .setCheckpointResponder(this.checkpointResponder);
        if (this.taskStateSnapshots != null) {
            // The first key of the snapshot map is reported as the restored checkpoint id.
            long reportedCheckpointId = this.taskStateSnapshots.keySet().iterator().next();
            taskStateManagerBuilder
                    .setReportedCheckpointId(reportedCheckpointId)
                    .setJobManagerTaskStateSnapshotsByCheckpointId(this.taskStateSnapshots);
        }
        TestTaskStateManager taskStateManager = taskStateManagerBuilder.build();

        StreamMockEnvironment streamMockEnvironment = new StreamMockEnvironment(
                new JobID(),
                ExecutionGraphTestUtils.createExecutionAttemptId(),
                this.jobConfig,
                this.streamConfig.getConfiguration(),
                this.executionConfig,
                this.memorySize,
                new MockInputSplitProvider(),
                this.bufferSize,
                taskStateManager,
                this.collectNetworkEvents);
        streamMockEnvironment.setCheckpointResponder(taskStateManager.getCheckpointResponder());
        streamMockEnvironment.setTaskManagerInfo(this.taskManagerRuntimeInfo);

        this.initializeInputs(streamMockEnvironment);
        // initializeInputs is overridable, so verify that it really created the gates.
        Preconditions.checkState(this.inputGates != null, "InputGates hasn't been initialised");

        StreamElementSerializer<OUT> outputStreamRecordSerializer = new StreamElementSerializer<>(this.outputSerializer);
        ArrayDeque<Object> outputList = new ArrayDeque<>();
        streamMockEnvironment.addOutput(outputList, outputStreamRecordSerializer);
        streamMockEnvironment.setTaskMetricGroup(this.taskMetricGroup);
        streamMockEnvironment.setCheckpointStorageAccess(
                new JobManagerCheckpointStorage().createCheckpointStorage(streamMockEnvironment.getJobID()));
        for (ResultPartitionWriter writer : this.additionalOutputs) {
            streamMockEnvironment.addOutput(writer);
        }

        StreamTask<OUT, ?> task = this.taskFactory.apply(streamMockEnvironment);
        return new StreamTaskMailboxTestHarness<>(task, outputList, this.inputGates, streamMockEnvironment);
    }

    /**
     * Builds the harness via {@link #buildUnrestored()} and then restores the task.
     *
     * @throws Exception if building or restoring the task fails
     */
    public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
        StreamTaskMailboxTestHarness<OUT> harness = this.buildUnrestored();
        harness.streamTask.restore();
        return harness;
    }

    /**
     * Creates the input gates and writes the input related settings into {@link #streamConfig}.
     *
     * <p>Network inputs get a test input gate and an incoming physical edge; source placeholders
     * are replaced in {@link #inputs} by the resulting {@link StreamConfig.SourceInputConfig}.
     *
     * @throws UnsupportedOperationException if an input is neither a network input nor a source
     *     placeholder
     */
    protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
        this.inputGates = new StreamTestSingleInputGate[this.inputChannelsPerGate.size()];
        List<StreamEdge> inPhysicalEdges = new LinkedList<>();
        // The cast selects the StreamOperator overload of the StreamNode constructor.
        StreamNode mainNode = new StreamNode(0, null, null, (StreamOperator<?>) null, null, null);

        for (int i = 0; i < this.inputs.size(); ++i) {
            StreamConfig.InputConfig input = this.inputs.get(i);
            if (input instanceof StreamConfig.NetworkInputConfig) {
                this.initializeNetworkInput((StreamConfig.NetworkInputConfig) input, mainNode, streamMockEnvironment, inPhysicalEdges);
            } else if (input instanceof SourceInputConfigPlaceHolder) {
                this.inputs.set(i, this.initializeSourceInput(i, (SourceInputConfigPlaceHolder<?>) input, mainNode));
            } else {
                throw new UnsupportedOperationException("Unknown input type " + input);
            }
        }

        this.streamConfig.setInPhysicalEdges(inPhysicalEdges);
        this.streamConfig.setNumberOfNetworkInputs(this.inputGates.length);
        this.streamConfig.setInputs(this.inputs.toArray(new StreamConfig.InputConfig[0]));
        this.streamConfig.serializeAllConfigs();
    }

    /**
     * Creates the test input gate for one network input, registers it with the environment and
     * records a dummy incoming edge for it.
     */
    private void initializeNetworkInput(StreamConfig.NetworkInputConfig networkInput, StreamNode targetVertexDummy, StreamMockEnvironment streamMockEnvironment, List<StreamEdge> inPhysicalEdges) {
        int gateIndex = networkInput.getInputGateIndex();
        TypeSerializer<?> inputSerializer = networkInput.getTypeSerializer();

        SingleInputGateBuilder gateBuilder = this.modifyGateBuilder.apply(
                new SingleInputGateBuilder().setBufferDebloatConfiguration(
                        BufferDebloatConfiguration.fromConfiguration(
                                streamMockEnvironment.getTaskManagerInfo().getConfiguration())));
        this.inputGates[gateIndex] = new StreamTestSingleInputGate<>(
                this.inputChannelsPerGate.get(gateIndex), gateIndex, inputSerializer, this.bufferSize, gateBuilder);

        StreamNode sourceVertexDummy = new StreamNode(0, null, null, (StreamOperator<?>) null, null, SourceStreamTask.class);
        // The edge's type number is the gate index shifted by one.
        StreamEdge streamEdge = new StreamEdge(sourceVertexDummy, targetVertexDummy, gateIndex + 1, this.partitioner, null);
        inPhysicalEdges.add(streamEdge);
        streamMockEnvironment.addInputGate(this.inputGates[gateIndex].getInputGate());
    }

    /**
     * Registers a chained source operator config for the given placeholder and returns the
     * {@link StreamConfig.SourceInputConfig} pointing at it.
     */
    private StreamConfig.SourceInputConfig initializeSourceInput(int inputId, SourceInputConfigPlaceHolder<?> sourceInput, StreamNode mainNode) {
        Map<Integer, StreamConfig> transitiveChainedTaskConfigs =
                this.streamConfig.getTransitiveChainedTaskConfigs(this.getClass().getClassLoader());
        int maxNodeId = transitiveChainedTaskConfigs.isEmpty() ? 0 : Collections.max(transitiveChainedTaskConfigs.keySet());

        // The synthetic source node id is placed above every chained node id already in use;
        // the 1337 offset presumably just keeps it clear of ids assigned later - TODO confirm.
        StreamNode sourceNode = new StreamNode(maxNodeId + inputId + 1337, null, null, (StreamOperator<?>) null, null, null);
        StreamEdge sourceToMainEdge = new StreamEdge(sourceNode, mainNode, 0, new ForwardPartitioner<>(), null);

        List<StreamEdge> chainedOutputs = new LinkedList<>();
        chainedOutputs.add(sourceToMainEdge);

        List<NonChainedOutput> streamOutputsInOrder = new LinkedList<>();
        streamOutputsInOrder.add(new NonChainedOutput(
                true,
                sourceToMainEdge.getSourceId(),
                1,
                1,
                100L,
                false,
                new IntermediateDataSetID(),
                null,
                new ForwardPartitioner<>(),
                ResultPartitionType.PIPELINED_BOUNDED));

        StreamConfig sourceConfig = new StreamConfig(new Configuration());
        sourceConfig.setVertexNonChainedOutputs(streamOutputsInOrder);
        sourceConfig.setChainedOutputs(chainedOutputs);
        sourceConfig.setTypeSerializerOut(sourceInput.getSourceSerializer());
        sourceConfig.setOperatorID(sourceInput.getOperatorId());
        sourceConfig.setStreamOperatorFactory(sourceInput.getSourceOperatorFactory());
        sourceConfig.serializeAllConfigs();

        transitiveChainedTaskConfigs.put(sourceToMainEdge.getSourceId(), sourceConfig);
        this.streamConfig.setAndSerializeTransitiveChainedTaskConfigs(transitiveChainedTaskConfigs);
        return new StreamConfig.SourceInputConfig(sourceToMainEdge);
    }

    /** Sets up a chain consisting only of {@code operator}, using a fresh {@link OperatorID}. */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> operator) {
        return this.setupOutputForSingletonOperatorChain(SimpleOperatorFactory.of(operator), new OperatorID());
    }

    /** Sets up a chain consisting only of {@code operator} with the given id. */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> operator, OperatorID operatorID) {
        return this.setupOutputForSingletonOperatorChain(SimpleOperatorFactory.of(operator), operatorID);
    }

    /** Sets up a chain consisting only of the operator created by {@code factory}. */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperatorFactory<?> factory) {
        return this.setupOutputForSingletonOperatorChain(factory, new OperatorID());
    }

    /**
     * Sets up a single-operator chain and finishes it with this builder's output serializer and
     * output partitioner.
     *
     * @throws IllegalStateException if an operator chain has already been set up
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperatorFactory<?> factory, OperatorID operatorID) {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        return this.setupOperatorChain(operatorID, factory)
                .finishForSingletonOperatorChain(this.outputSerializer, this.partitioner);
    }

    /** Starts an operator chain headed by {@code headOperator} with a fresh {@link OperatorID}. */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(StreamOperator<?> headOperator) {
        return this.setupOperatorChain(new OperatorID(), headOperator);
    }

    /** Starts an operator chain headed by {@code headOperator} with the given id. */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(OperatorID headOperatorId, StreamOperator<?> headOperator) {
        return this.setupOperatorChain(headOperatorId, SimpleOperatorFactory.of(headOperator));
    }

    /** Starts an operator chain headed by the factory's operator with a fresh {@link OperatorID}. */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(StreamOperatorFactory<?> headOperatorFactory) {
        return this.setupOperatorChain(new OperatorID(), headOperatorFactory);
    }

    /**
     * Starts an operator chain: stores the head operator factory in the stream config, marks the
     * builder as set up and returns a chainer owned by this builder.
     *
     * @throws IllegalStateException if an operator chain has already been set up
     */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(OperatorID headOperatorId, StreamOperatorFactory<?> headOperatorFactory) {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        this.setupCalled = true;
        this.streamConfig.setStreamOperatorFactory(headOperatorFactory);
        this.streamConfig.serializeAllConfigs();
        // One default output plus every additional output registered so far.
        int outputCount = 1 + this.additionalOutputs.size();
        return new StreamConfigChainer<>(headOperatorId, this.streamConfig, this, outputCount);
    }

    /** Replaces the task metric group installed on the mock environment. */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setTaskMetricGroup(TaskMetricGroup taskMetricGroup) {
        this.taskMetricGroup = taskMetricGroup;
        return this;
    }

    /** Sets the state key serializer of the stream config from the given key type. */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setKeyType(TypeInformation<?> keyType) {
        this.streamConfig.setStateKeySerializer(keyType.createSerializer(this.executionConfig.getSerializerConfig()));
        return this;
    }

    /**
     * Sets the single snapshot the task state manager reports for {@code checkpointId}. Any
     * previously set snapshot is replaced.
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setTaskStateSnapshot(long checkpointId, TaskStateSnapshot snapshot) {
        this.taskStateSnapshots = Collections.singletonMap(checkpointId, snapshot);
        return this;
    }

    /** Sets the partitioner used for network input edges and for singleton chain outputs. */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setOutputPartitioner(StreamPartitioner<?> partitioner) {
        this.partitioner = partitioner;
        return this;
    }

    /**
     * Placeholder recorded by {@code addSourceInput}; it carries what is needed to create the
     * real {@link StreamConfig.SourceInputConfig} once the inputs are initialized.
     *
     * @param <SourceOut> type of the records produced by the source
     */
    public static class SourceInputConfigPlaceHolder<SourceOut>
    implements StreamConfig.InputConfig {
        private final OperatorID operatorId;
        private final SourceOperatorFactory<SourceOut> sourceOperatorFactory;
        private final TypeSerializer<SourceOut> sourceSerializer;

        public SourceInputConfigPlaceHolder(OperatorID operatorId, SourceOperatorFactory<SourceOut> sourceOperatorFactory, TypeSerializer<SourceOut> sourceSerializer) {
            this.operatorId = operatorId;
            this.sourceOperatorFactory = sourceOperatorFactory;
            this.sourceSerializer = sourceSerializer;
        }

        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        public SourceOperatorFactory<SourceOut> getSourceOperatorFactory() {
            return this.sourceOperatorFactory;
        }

        public TypeSerializer<SourceOut> getSourceSerializer() {
            return this.sourceSerializer;
        }
    }
}

