package org.apache.flink.table.runtime.operators.multipleinput;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper;
import org.apache.flink.table.runtime.operators.multipleinput.input.FirstInputOfTwoInput;
import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
import org.apache.flink.table.runtime.operators.multipleinput.input.OneInput;
import org.apache.flink.table.runtime.operators.multipleinput.input.SecondInputOfTwoInput;
import org.apache.flink.table.runtime.operators.multipleinput.output.BroadcastingOutput;
import org.apache.flink.table.runtime.operators.multipleinput.output.CopyingBroadcastingOutput;
import org.apache.flink.table.runtime.operators.multipleinput.output.CopyingFirstInputOfTwoInputStreamOperatorOutput;
import org.apache.flink.table.runtime.operators.multipleinput.output.CopyingOneInputStreamOperatorOutput;
import org.apache.flink.table.runtime.operators.multipleinput.output.CopyingSecondInputOfTwoInputStreamOperatorOutput;
import org.apache.flink.table.runtime.operators.multipleinput.output.FirstInputOfTwoInputStreamOperatorOutput;
import org.apache.flink.table.runtime.operators.multipleinput.output.OneInputStreamOperatorOutput;
import org.apache.flink.table.runtime.operators.multipleinput.output.SecondInputOfTwoInputStreamOperatorOutput;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.class */
public abstract class MultipleInputStreamOperatorBase extends AbstractStreamOperatorV2<RowData> implements MultipleInputStreamOperator<RowData> {
    private final List<InputSpec> inputSpecs;
    protected final Map<Integer, InputSpec> inputSpecMap;
    private final List<TableOperatorWrapper<?>> headWrappers;
    private final TableOperatorWrapper<?> tailWrapper;
    protected final Deque<TableOperatorWrapper<?>> topologicalOrderingOperators;

    public MultipleInputStreamOperatorBase(StreamOperatorParameters<RowData> streamOperatorParameters, List<InputSpec> list, List<TableOperatorWrapper<?>> list2, TableOperatorWrapper<?> tableOperatorWrapper) {
        super(streamOperatorParameters, list.size());
        this.inputSpecs = list;
        this.headWrappers = list2;
        this.tailWrapper = tableOperatorWrapper;
        this.topologicalOrderingOperators = getAllOperatorsAsTopologicalOrdering();
        createAllOperators(streamOperatorParameters);
        this.inputSpecMap = (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.getMultipleInputId();
        }, inputSpec -> {
            return inputSpec;
        }));
        Preconditions.checkArgument(this.inputSpecMap.size() == list.size());
    }

    public List<Input> getInputs() {
        return (List) this.inputSpecs.stream().map(this::createInput).collect(Collectors.toList());
    }

    private Input createInput(InputSpec inputSpec) {
        Object streamOperator = inputSpec.getOutput().getStreamOperator();
        if (streamOperator instanceof OneInputStreamOperator) {
            return new OneInput((OneInputStreamOperator) streamOperator);
        }
        if (!(streamOperator instanceof TwoInputStreamOperator)) {
            throw new RuntimeException("Unsupported StreamOperator: " + streamOperator);
        }
        TwoInputStreamOperator twoInputStreamOperator = (TwoInputStreamOperator) streamOperator;
        return inputSpec.getOutputOpInputId() == 1 ? new FirstInputOfTwoInput(twoInputStreamOperator) : new SecondInputOfTwoInput(twoInputStreamOperator);
    }

    /* JADX WARN: Type inference failed for: r0v9, types: [org.apache.flink.streaming.api.operators.StreamOperator] */
    public void open() throws Exception {
        super.open();
        Iterator<TableOperatorWrapper<?>> descendingIterator = this.topologicalOrderingOperators.descendingIterator();
        while (descendingIterator.hasNext()) {
            descendingIterator.next().getStreamOperator().open();
        }
    }

    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.flink.streaming.api.operators.StreamOperator] */
    public void finish() throws Exception {
        super.finish();
        Iterator<TableOperatorWrapper<?>> it = this.topologicalOrderingOperators.iterator();
        while (it.hasNext()) {
            it.next().getStreamOperator().finish();
        }
    }

    public void close() throws Exception {
        super.close();
        Iterator<TableOperatorWrapper<?>> it = this.topologicalOrderingOperators.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    private Deque<TableOperatorWrapper<?>> getAllOperatorsAsTopologicalOrdering() {
        ArrayDeque arrayDeque = new ArrayDeque();
        LinkedList linkedList = new LinkedList();
        Map<TableOperatorWrapper<?>, Integer> buildOperatorToInputCountMap = buildOperatorToInputCountMap();
        for (TableOperatorWrapper<?> tableOperatorWrapper : this.headWrappers) {
            if (buildOperatorToInputCountMap.get(tableOperatorWrapper).intValue() == 0) {
                linkedList.add(tableOperatorWrapper);
            }
        }
        Preconditions.checkArgument(!linkedList.isEmpty(), "This should not happen.");
        while (!linkedList.isEmpty()) {
            TableOperatorWrapper tableOperatorWrapper2 = (TableOperatorWrapper) linkedList.poll();
            arrayDeque.add(tableOperatorWrapper2);
            for (TableOperatorWrapper<?> tableOperatorWrapper3 : tableOperatorWrapper2.getOutputWrappers()) {
                int intValue = buildOperatorToInputCountMap.get(tableOperatorWrapper3).intValue() - 1;
                buildOperatorToInputCountMap.put(tableOperatorWrapper3, Integer.valueOf(intValue));
                if (intValue == 0) {
                    linkedList.add(tableOperatorWrapper3);
                }
            }
        }
        return arrayDeque;
    }

    private Map<TableOperatorWrapper<?>, Integer> buildOperatorToInputCountMap() {
        IdentityHashMap identityHashMap = new IdentityHashMap();
        LinkedList linkedList = new LinkedList();
        linkedList.add(this.tailWrapper);
        while (!linkedList.isEmpty()) {
            TableOperatorWrapper tableOperatorWrapper = (TableOperatorWrapper) linkedList.poll();
            List<TableOperatorWrapper<?>> inputWrappers = tableOperatorWrapper.getInputWrappers();
            identityHashMap.put(tableOperatorWrapper, Integer.valueOf(inputWrappers.size()));
            linkedList.addAll(inputWrappers);
        }
        return identityHashMap;
    }

    private void createAllOperators(StreamOperatorParameters<RowData> streamOperatorParameters) {
        Output<StreamRecord<RowData>> copyingBroadcastingOutput;
        boolean isObjectReuseEnabled = streamOperatorParameters.getContainingTask().getExecutionConfig().isObjectReuseEnabled();
        ExecutionConfig executionConfig = streamOperatorParameters.getContainingTask().getExecutionConfig();
        Iterator<TableOperatorWrapper<?>> descendingIterator = this.topologicalOrderingOperators.descendingIterator();
        while (descendingIterator.hasNext()) {
            TableOperatorWrapper<?> next = descendingIterator.next();
            if (next == this.tailWrapper) {
                copyingBroadcastingOutput = this.output;
            } else {
                int size = next.getOutputEdges().size();
                Output<StreamRecord<RowData>>[] outputArr = new Output[size];
                for (int i = 0; i < size; i++) {
                    TableOperatorWrapper.Edge edge = next.getOutputEdges().get(i);
                    int inputId = edge.getInputId();
                    StreamOperator<RowData> streamOperator = edge.getTarget().getStreamOperator();
                    if (isObjectReuseEnabled) {
                        outputArr[i] = createOutput(streamOperator, inputId);
                    } else {
                        outputArr[i] = createCopyingOutput(edge.getSource().getOutputType().createSerializer(executionConfig.getSerializerConfig()), streamOperator, inputId);
                    }
                }
                copyingBroadcastingOutput = outputArr.length == 1 ? outputArr[0] : isObjectReuseEnabled ? new CopyingBroadcastingOutput(outputArr) : new BroadcastingOutput(outputArr);
            }
            next.createOperator(createSubOperatorParameters(streamOperatorParameters, copyingBroadcastingOutput, next));
        }
    }

    private StreamOperatorParameters<RowData> createSubOperatorParameters(StreamOperatorParameters<RowData> streamOperatorParameters, Output<StreamRecord<RowData>> output, TableOperatorWrapper<?> tableOperatorWrapper) {
        StreamConfig createStreamConfig = createStreamConfig(streamOperatorParameters, tableOperatorWrapper);
        StreamTask containingTask = streamOperatorParameters.getContainingTask();
        Objects.requireNonNull(streamOperatorParameters);
        return new StreamOperatorParameters<>(containingTask, createStreamConfig, output, streamOperatorParameters::getProcessingTimeService, streamOperatorParameters.getOperatorEventDispatcher());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamConfig createStreamConfig(StreamOperatorParameters<RowData> streamOperatorParameters, TableOperatorWrapper<?> tableOperatorWrapper) {
        ExecutionConfig executionConfig = getExecutionConfig();
        StreamConfig streamConfig = new StreamConfig(streamOperatorParameters.getStreamConfig().getConfiguration().clone());
        streamConfig.setOperatorName(tableOperatorWrapper.getOperatorName());
        streamConfig.setNumberOfNetworkInputs(tableOperatorWrapper.getAllInputTypes().size());
        streamConfig.setNumberOfOutputs(tableOperatorWrapper.getOutputEdges().size());
        streamConfig.setupNetworkInputs((TypeSerializer[]) tableOperatorWrapper.getAllInputTypes().stream().map(typeInformation -> {
            return typeInformation.createSerializer(executionConfig.getSerializerConfig());
        }).toArray(i -> {
            return new TypeSerializer[i];
        }));
        streamConfig.setTypeSerializerOut(tableOperatorWrapper.getOutputType().createSerializer(executionConfig.getSerializerConfig()));
        streamConfig.serializeAllConfigs();
        return streamConfig;
    }

    private Output<StreamRecord<RowData>> createOutput(StreamOperator<RowData> streamOperator, int i) {
        if (streamOperator instanceof OneInputStreamOperator) {
            return new OneInputStreamOperatorOutput((OneInputStreamOperator) streamOperator);
        }
        if (!(streamOperator instanceof TwoInputStreamOperator)) {
            throw new RuntimeException("Unsupported StreamOperator: " + streamOperator);
        }
        TwoInputStreamOperator twoInputStreamOperator = (TwoInputStreamOperator) streamOperator;
        return i == 1 ? new FirstInputOfTwoInputStreamOperatorOutput(twoInputStreamOperator) : new SecondInputOfTwoInputStreamOperatorOutput(twoInputStreamOperator);
    }

    private Output<StreamRecord<RowData>> createCopyingOutput(TypeSerializer<RowData> typeSerializer, StreamOperator<RowData> streamOperator, int i) {
        if (streamOperator instanceof OneInputStreamOperator) {
            return new CopyingOneInputStreamOperatorOutput((OneInputStreamOperator) streamOperator, typeSerializer);
        }
        if (!(streamOperator instanceof TwoInputStreamOperator)) {
            throw new RuntimeException("Unsupported StreamOperator: " + streamOperator);
        }
        TwoInputStreamOperator twoInputStreamOperator = (TwoInputStreamOperator) streamOperator;
        return i == 1 ? new CopyingFirstInputOfTwoInputStreamOperatorOutput(twoInputStreamOperator, typeSerializer) : new CopyingSecondInputOfTwoInputStreamOperatorOutput(twoInputStreamOperator, typeSerializer);
    }
}
