package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.InputFormatOperatorFactory;
import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.SelectTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.SplitTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGenerator.class */
public class StreamGraphGenerator {
    public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128;
    public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
    public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
    private final List<Transformation<?>> transformations;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;
    private StateBackend stateBackend;
    private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts;
    private StreamGraph streamGraph;
    private Map<Transformation<?>, Collection<Integer>> alreadyTransformed;
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
    public static final ScheduleMode DEFAULT_SCHEDULE_MODE = ScheduleMode.EAGER;
    public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
    protected static Integer iterationIdCounter = 0;
    private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
    private boolean chaining = true;
    private ScheduleMode scheduleMode = DEFAULT_SCHEDULE_MODE;
    private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
    private long defaultBufferTimeout = -1;
    private String jobName = "Flink Streaming Job";
    private GlobalDataExchangeMode globalDataExchangeMode = GlobalDataExchangeMode.ALL_EDGES_PIPELINED;

    public static int getNewIterationNodeId() {
        Integer num = iterationIdCounter;
        iterationIdCounter = Integer.valueOf(iterationIdCounter.intValue() - 1);
        return iterationIdCounter.intValue();
    }

    public StreamGraphGenerator(List<Transformation<?>> list, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
        this.transformations = (List) Preconditions.checkNotNull(list);
        this.executionConfig = (ExecutionConfig) Preconditions.checkNotNull(executionConfig);
        this.checkpointConfig = (CheckpointConfig) Preconditions.checkNotNull(checkpointConfig);
    }

    public StreamGraphGenerator setStateBackend(StateBackend stateBackend) {
        this.stateBackend = stateBackend;
        return this;
    }

    public StreamGraphGenerator setChaining(boolean z) {
        this.chaining = z;
        return this;
    }

    public StreamGraphGenerator setScheduleMode(ScheduleMode scheduleMode) {
        this.scheduleMode = scheduleMode;
        return this;
    }

    public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> collection) {
        this.userArtifacts = collection;
        return this;
    }

    public StreamGraphGenerator setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.timeCharacteristic = timeCharacteristic;
        return this;
    }

    public StreamGraphGenerator setDefaultBufferTimeout(long j) {
        this.defaultBufferTimeout = j;
        return this;
    }

    public StreamGraphGenerator setJobName(String str) {
        this.jobName = str;
        return this;
    }

    public StreamGraphGenerator setGlobalDataExchangeMode(GlobalDataExchangeMode globalDataExchangeMode) {
        this.globalDataExchangeMode = globalDataExchangeMode;
        return this;
    }

    public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
        this.savepointRestoreSettings = savepointRestoreSettings;
    }

    public StreamGraph generate() {
        this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
        this.streamGraph.setStateBackend(this.stateBackend);
        this.streamGraph.setChaining(this.chaining);
        this.streamGraph.setScheduleMode(this.scheduleMode);
        this.streamGraph.setUserArtifacts(this.userArtifacts);
        this.streamGraph.setTimeCharacteristic(this.timeCharacteristic);
        this.streamGraph.setJobName(this.jobName);
        this.streamGraph.setGlobalDataExchangeMode(this.globalDataExchangeMode);
        this.alreadyTransformed = new HashMap();
        Iterator<Transformation<?>> it = this.transformations.iterator();
        while (it.hasNext()) {
            transform(it.next());
        }
        for (StreamNode streamNode : this.streamGraph.getStreamNodes()) {
            if (streamNode.getInEdges().stream().anyMatch(streamEdge -> {
                return streamEdge.getPartitioner().isBroadcast();
            })) {
                Iterator<StreamEdge> it2 = streamNode.getInEdges().iterator();
                while (it2.hasNext()) {
                    it2.next().setSupportsUnalignedCheckpoints(false);
                }
            }
        }
        StreamGraph streamGraph = this.streamGraph;
        this.alreadyTransformed.clear();
        this.alreadyTransformed = null;
        this.streamGraph = null;
        return streamGraph;
    }

    private Collection<Integer> transform(Transformation<?> transformation) {
        Collection<Integer> transformSideOutput;
        int maxParallelism;
        if (this.alreadyTransformed.containsKey(transformation)) {
            return this.alreadyTransformed.get(transformation);
        }
        LOG.debug("Transforming " + transformation);
        if (transformation.getMaxParallelism() <= 0 && (maxParallelism = this.executionConfig.getMaxParallelism()) > 0) {
            transformation.setMaxParallelism(maxParallelism);
        }
        transformation.getOutputType();
        if (transformation instanceof OneInputTransformation) {
            transformSideOutput = transformOneInputTransform((OneInputTransformation) transformation);
        } else if (transformation instanceof TwoInputTransformation) {
            transformSideOutput = transformTwoInputTransform((TwoInputTransformation) transformation);
        } else if (transformation instanceof AbstractMultipleInputTransformation) {
            transformSideOutput = transformMultipleInputTransform((AbstractMultipleInputTransformation) transformation);
        } else if (transformation instanceof SourceTransformation) {
            transformSideOutput = transformSource((SourceTransformation) transformation);
        } else if (transformation instanceof LegacySourceTransformation) {
            transformSideOutput = transformLegacySource((LegacySourceTransformation) transformation);
        } else if (transformation instanceof SinkTransformation) {
            transformSideOutput = transformSink((SinkTransformation) transformation);
        } else if (transformation instanceof UnionTransformation) {
            transformSideOutput = transformUnion((UnionTransformation) transformation);
        } else if (transformation instanceof SplitTransformation) {
            transformSideOutput = transformSplit((SplitTransformation) transformation);
        } else if (transformation instanceof SelectTransformation) {
            transformSideOutput = transformSelect((SelectTransformation) transformation);
        } else if (transformation instanceof FeedbackTransformation) {
            transformSideOutput = transformFeedback((FeedbackTransformation) transformation);
        } else if (transformation instanceof CoFeedbackTransformation) {
            transformSideOutput = transformCoFeedback((CoFeedbackTransformation) transformation);
        } else if (transformation instanceof PartitionTransformation) {
            transformSideOutput = transformPartition((PartitionTransformation) transformation);
        } else {
            if (!(transformation instanceof SideOutputTransformation)) {
                throw new IllegalStateException("Unknown transformation: " + transformation);
            }
            transformSideOutput = transformSideOutput((SideOutputTransformation) transformation);
        }
        if (!this.alreadyTransformed.containsKey(transformation)) {
            this.alreadyTransformed.put(transformation, transformSideOutput);
        }
        if (transformation.getBufferTimeout() >= 0) {
            this.streamGraph.setBufferTimeout(Integer.valueOf(transformation.getId()), transformation.getBufferTimeout());
        } else {
            this.streamGraph.setBufferTimeout(Integer.valueOf(transformation.getId()), this.defaultBufferTimeout);
        }
        if (transformation.getUid() != null) {
            this.streamGraph.setTransformationUID(Integer.valueOf(transformation.getId()), transformation.getUid());
        }
        if (transformation.getUserProvidedNodeHash() != null) {
            this.streamGraph.setTransformationUserHash(Integer.valueOf(transformation.getId()), transformation.getUserProvidedNodeHash());
        }
        if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled() && (transformation instanceof PhysicalTransformation) && transformation.getUserProvidedNodeHash() == null && transformation.getUid() == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transformation.getName());
        }
        if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
            this.streamGraph.setResources(transformation.getId(), transformation.getMinResources(), transformation.getPreferredResources());
        }
        this.streamGraph.setManagedMemoryWeight(transformation.getId(), transformation.getManagedMemoryWeight());
        return transformSideOutput;
    }

    private <T> Collection<Integer> transformUnion(UnionTransformation<T> unionTransformation) {
        List<Transformation<T>> inputs = unionTransformation.getInputs();
        ArrayList arrayList = new ArrayList();
        Iterator<Transformation<T>> it = inputs.iterator();
        while (it.hasNext()) {
            arrayList.addAll(transform(it.next()));
        }
        return arrayList;
    }

    private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partitionTransformation) {
        Transformation<?> input = partitionTransformation.getInput();
        ArrayList arrayList = new ArrayList();
        for (Integer num : transform(input)) {
            int newNodeId = Transformation.getNewNodeId();
            this.streamGraph.addVirtualPartitionNode(num, Integer.valueOf(newNodeId), partitionTransformation.getPartitioner(), partitionTransformation.getShuffleMode());
            arrayList.add(Integer.valueOf(newNodeId));
        }
        return arrayList;
    }

    private <T> Collection<Integer> transformSplit(SplitTransformation<T> splitTransformation) {
        Transformation<?> input = splitTransformation.getInput();
        Collection<Integer> transform = transform(input);
        validateSplitTransformation(input);
        if (this.alreadyTransformed.containsKey(splitTransformation)) {
            return this.alreadyTransformed.get(splitTransformation);
        }
        Iterator<Integer> it = transform.iterator();
        while (it.hasNext()) {
            this.streamGraph.addOutputSelector(Integer.valueOf(it.next().intValue()), splitTransformation.getOutputSelector());
        }
        return transform;
    }

    private <T> Collection<Integer> transformSelect(SelectTransformation<T> selectTransformation) {
        Collection<Integer> transform = transform(selectTransformation.getInput());
        if (this.alreadyTransformed.containsKey(selectTransformation)) {
            return this.alreadyTransformed.get(selectTransformation);
        }
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = transform.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            int newNodeId = Transformation.getNewNodeId();
            this.streamGraph.addVirtualSelectNode(Integer.valueOf(intValue), Integer.valueOf(newNodeId), selectTransformation.getSelectedNames());
            arrayList.add(Integer.valueOf(newNodeId));
        }
        return arrayList;
    }

    private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutputTransformation) {
        Collection<Integer> transform = transform(sideOutputTransformation.getInput());
        if (this.alreadyTransformed.containsKey(sideOutputTransformation)) {
            return this.alreadyTransformed.get(sideOutputTransformation);
        }
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = transform.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            int newNodeId = Transformation.getNewNodeId();
            this.streamGraph.addVirtualSideOutputNode(Integer.valueOf(intValue), Integer.valueOf(newNodeId), sideOutputTransformation.getOutputTag());
            arrayList.add(Integer.valueOf(newNodeId));
        }
        return arrayList;
    }

    private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> feedbackTransformation) {
        if (feedbackTransformation.getFeedbackEdges().size() <= 0) {
            throw new IllegalStateException("Iteration " + feedbackTransformation + " does not have any feedback edges.");
        }
        Transformation<T> input = feedbackTransformation.getInput();
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(transform(input));
        if (this.alreadyTransformed.containsKey(feedbackTransformation)) {
            return this.alreadyTransformed.get(feedbackTransformation);
        }
        Tuple2<StreamNode, StreamNode> createIterationSourceAndSink = this.streamGraph.createIterationSourceAndSink(feedbackTransformation.getId(), getNewIterationNodeId(), getNewIterationNodeId(), feedbackTransformation.getWaitTime().longValue(), feedbackTransformation.getParallelism(), feedbackTransformation.getMaxParallelism(), feedbackTransformation.getMinResources(), feedbackTransformation.getPreferredResources());
        StreamNode streamNode = (StreamNode) createIterationSourceAndSink.f0;
        StreamNode streamNode2 = (StreamNode) createIterationSourceAndSink.f1;
        this.streamGraph.setSerializers(Integer.valueOf(streamNode.getId()), null, null, feedbackTransformation.getOutputType().createSerializer(this.executionConfig));
        this.streamGraph.setSerializers(Integer.valueOf(streamNode2.getId()), feedbackTransformation.getOutputType().createSerializer(this.executionConfig), null, null);
        arrayList.add(Integer.valueOf(streamNode.getId()));
        this.alreadyTransformed.put(feedbackTransformation, arrayList);
        ArrayList arrayList2 = new ArrayList();
        Iterator<Transformation<T>> it = feedbackTransformation.getFeedbackEdges().iterator();
        while (it.hasNext()) {
            Collection<Integer> transform = transform(it.next());
            arrayList2.addAll(transform);
            Iterator<Integer> it2 = transform.iterator();
            while (it2.hasNext()) {
                this.streamGraph.addEdge(it2.next(), Integer.valueOf(streamNode2.getId()), 0);
            }
        }
        String determineSlotSharingGroup = determineSlotSharingGroup(null, arrayList2);
        if (determineSlotSharingGroup == null) {
            determineSlotSharingGroup = "SlotSharingGroup-" + feedbackTransformation.getId();
        }
        streamNode2.setSlotSharingGroup(determineSlotSharingGroup);
        streamNode.setSlotSharingGroup(determineSlotSharingGroup);
        return arrayList;
    }

    private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coFeedbackTransformation) {
        Tuple2<StreamNode, StreamNode> createIterationSourceAndSink = this.streamGraph.createIterationSourceAndSink(coFeedbackTransformation.getId(), getNewIterationNodeId(), getNewIterationNodeId(), coFeedbackTransformation.getWaitTime().longValue(), coFeedbackTransformation.getParallelism(), coFeedbackTransformation.getMaxParallelism(), coFeedbackTransformation.getMinResources(), coFeedbackTransformation.getPreferredResources());
        StreamNode streamNode = (StreamNode) createIterationSourceAndSink.f0;
        StreamNode streamNode2 = (StreamNode) createIterationSourceAndSink.f1;
        this.streamGraph.setSerializers(Integer.valueOf(streamNode.getId()), null, null, coFeedbackTransformation.getOutputType().createSerializer(this.executionConfig));
        this.streamGraph.setSerializers(Integer.valueOf(streamNode2.getId()), coFeedbackTransformation.getOutputType().createSerializer(this.executionConfig), null, null);
        this.alreadyTransformed.put(coFeedbackTransformation, Collections.singleton(Integer.valueOf(streamNode.getId())));
        ArrayList arrayList = new ArrayList();
        Iterator<Transformation<F>> it = coFeedbackTransformation.getFeedbackEdges().iterator();
        while (it.hasNext()) {
            Collection<Integer> transform = transform(it.next());
            arrayList.addAll(transform);
            Iterator<Integer> it2 = transform.iterator();
            while (it2.hasNext()) {
                this.streamGraph.addEdge(it2.next(), Integer.valueOf(streamNode2.getId()), 0);
            }
        }
        String determineSlotSharingGroup = determineSlotSharingGroup(null, arrayList);
        streamNode2.setSlotSharingGroup(determineSlotSharingGroup);
        streamNode.setSlotSharingGroup(determineSlotSharingGroup);
        return Collections.singleton(Integer.valueOf(streamNode.getId()));
    }

    private <T> Collection<Integer> transformSource(SourceTransformation<T> sourceTransformation) {
        this.streamGraph.addSource(Integer.valueOf(sourceTransformation.getId()), determineSlotSharingGroup(sourceTransformation.getSlotSharingGroup(), Collections.emptyList()), sourceTransformation.getCoLocationGroupKey(), sourceTransformation.getOperatorFactory(), null, sourceTransformation.getOutputType(), "Source: " + sourceTransformation.getName());
        this.streamGraph.setParallelism(Integer.valueOf(sourceTransformation.getId()), sourceTransformation.getParallelism() != -1 ? sourceTransformation.getParallelism() : this.executionConfig.getParallelism());
        this.streamGraph.setMaxParallelism(sourceTransformation.getId(), sourceTransformation.getMaxParallelism());
        return Collections.singleton(Integer.valueOf(sourceTransformation.getId()));
    }

    private <T> Collection<Integer> transformLegacySource(LegacySourceTransformation<T> legacySourceTransformation) {
        this.streamGraph.addLegacySource(Integer.valueOf(legacySourceTransformation.getId()), determineSlotSharingGroup(legacySourceTransformation.getSlotSharingGroup(), Collections.emptyList()), legacySourceTransformation.getCoLocationGroupKey(), legacySourceTransformation.getOperatorFactory(), null, legacySourceTransformation.getOutputType(), "Source: " + legacySourceTransformation.getName());
        if (legacySourceTransformation.getOperatorFactory() instanceof InputFormatOperatorFactory) {
            this.streamGraph.setInputFormat(Integer.valueOf(legacySourceTransformation.getId()), ((InputFormatOperatorFactory) legacySourceTransformation.getOperatorFactory()).getInputFormat());
        }
        this.streamGraph.setParallelism(Integer.valueOf(legacySourceTransformation.getId()), legacySourceTransformation.getParallelism() != -1 ? legacySourceTransformation.getParallelism() : this.executionConfig.getParallelism());
        this.streamGraph.setMaxParallelism(legacySourceTransformation.getId(), legacySourceTransformation.getMaxParallelism());
        return Collections.singleton(Integer.valueOf(legacySourceTransformation.getId()));
    }

    private <T> Collection<Integer> transformSink(SinkTransformation<T> sinkTransformation) {
        Collection<Integer> transform = transform(sinkTransformation.getInput());
        this.streamGraph.addSink(Integer.valueOf(sinkTransformation.getId()), determineSlotSharingGroup(sinkTransformation.getSlotSharingGroup(), transform), sinkTransformation.getCoLocationGroupKey(), sinkTransformation.getOperatorFactory(), sinkTransformation.getInput().getOutputType(), null, "Sink: " + sinkTransformation.getName());
        StreamOperatorFactory<Object> operatorFactory = sinkTransformation.getOperatorFactory();
        if (operatorFactory instanceof OutputFormatOperatorFactory) {
            this.streamGraph.setOutputFormat(Integer.valueOf(sinkTransformation.getId()), ((OutputFormatOperatorFactory) operatorFactory).getOutputFormat());
        }
        this.streamGraph.setParallelism(Integer.valueOf(sinkTransformation.getId()), sinkTransformation.getParallelism() != -1 ? sinkTransformation.getParallelism() : this.executionConfig.getParallelism());
        this.streamGraph.setMaxParallelism(sinkTransformation.getId(), sinkTransformation.getMaxParallelism());
        Iterator<Integer> it = transform.iterator();
        while (it.hasNext()) {
            this.streamGraph.addEdge(it.next(), Integer.valueOf(sinkTransformation.getId()), 0);
        }
        if (sinkTransformation.getStateKeySelector() != null) {
            this.streamGraph.setOneInputStateKey(Integer.valueOf(sinkTransformation.getId()), sinkTransformation.getStateKeySelector(), sinkTransformation.getStateKeyType().createSerializer(this.executionConfig));
        }
        return Collections.emptyList();
    }

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> oneInputTransformation) {
        Collection<Integer> transform = transform(oneInputTransformation.getInput());
        if (this.alreadyTransformed.containsKey(oneInputTransformation)) {
            return this.alreadyTransformed.get(oneInputTransformation);
        }
        this.streamGraph.addOperator(Integer.valueOf(oneInputTransformation.getId()), determineSlotSharingGroup(oneInputTransformation.getSlotSharingGroup(), transform), oneInputTransformation.getCoLocationGroupKey(), oneInputTransformation.getOperatorFactory(), oneInputTransformation.getInputType(), oneInputTransformation.getOutputType(), oneInputTransformation.getName());
        if (oneInputTransformation.getStateKeySelector() != null) {
            this.streamGraph.setOneInputStateKey(Integer.valueOf(oneInputTransformation.getId()), oneInputTransformation.getStateKeySelector(), oneInputTransformation.getStateKeyType().createSerializer(this.executionConfig));
        }
        this.streamGraph.setParallelism(Integer.valueOf(oneInputTransformation.getId()), oneInputTransformation.getParallelism() != -1 ? oneInputTransformation.getParallelism() : this.executionConfig.getParallelism());
        this.streamGraph.setMaxParallelism(oneInputTransformation.getId(), oneInputTransformation.getMaxParallelism());
        Iterator<Integer> it = transform.iterator();
        while (it.hasNext()) {
            this.streamGraph.addEdge(it.next(), Integer.valueOf(oneInputTransformation.getId()), 0);
        }
        return Collections.singleton(Integer.valueOf(oneInputTransformation.getId()));
    }

    private <IN1, IN2, OUT> Collection<Integer> transformTwoInputTransform(TwoInputTransformation<IN1, IN2, OUT> twoInputTransformation) {
        Collection<Integer> transform = transform(twoInputTransformation.getInput1());
        Collection<Integer> transform2 = transform(twoInputTransformation.getInput2());
        if (this.alreadyTransformed.containsKey(twoInputTransformation)) {
            return this.alreadyTransformed.get(twoInputTransformation);
        }
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(transform);
        arrayList.addAll(transform2);
        this.streamGraph.addCoOperator(Integer.valueOf(twoInputTransformation.getId()), determineSlotSharingGroup(twoInputTransformation.getSlotSharingGroup(), arrayList), twoInputTransformation.getCoLocationGroupKey(), twoInputTransformation.getOperatorFactory(), twoInputTransformation.getInputType1(), twoInputTransformation.getInputType2(), twoInputTransformation.getOutputType(), twoInputTransformation.getName());
        if (twoInputTransformation.getStateKeySelector1() != null || twoInputTransformation.getStateKeySelector2() != null) {
            this.streamGraph.setTwoInputStateKey(Integer.valueOf(twoInputTransformation.getId()), twoInputTransformation.getStateKeySelector1(), twoInputTransformation.getStateKeySelector2(), twoInputTransformation.getStateKeyType().createSerializer(this.executionConfig));
        }
        this.streamGraph.setParallelism(Integer.valueOf(twoInputTransformation.getId()), twoInputTransformation.getParallelism() != -1 ? twoInputTransformation.getParallelism() : this.executionConfig.getParallelism());
        this.streamGraph.setMaxParallelism(twoInputTransformation.getId(), twoInputTransformation.getMaxParallelism());
        Iterator<Integer> it = transform.iterator();
        while (it.hasNext()) {
            this.streamGraph.addEdge(it.next(), Integer.valueOf(twoInputTransformation.getId()), 1);
        }
        Iterator<Integer> it2 = transform2.iterator();
        while (it2.hasNext()) {
            this.streamGraph.addEdge(it2.next(), Integer.valueOf(twoInputTransformation.getId()), 2);
        }
        return Collections.singleton(Integer.valueOf(twoInputTransformation.getId()));
    }

    private <OUT> Collection<Integer> transformMultipleInputTransform(AbstractMultipleInputTransformation<OUT> abstractMultipleInputTransformation) {
        Preconditions.checkArgument(!abstractMultipleInputTransformation.getInputs().isEmpty(), "Empty inputs for MultipleInputTransformation. Did you forget to add inputs?");
        MultipleInputSelectionHandler.checkSupportedInputCount(abstractMultipleInputTransformation.getInputs().size());
        ArrayList arrayList = new ArrayList();
        Iterator<Transformation<?>> it = abstractMultipleInputTransformation.getInputs().iterator();
        while (it.hasNext()) {
            arrayList.add(transform(it.next()));
        }
        if (this.alreadyTransformed.containsKey(abstractMultipleInputTransformation)) {
            return this.alreadyTransformed.get(abstractMultipleInputTransformation);
        }
        this.streamGraph.addMultipleInputOperator(Integer.valueOf(abstractMultipleInputTransformation.getId()), determineSlotSharingGroup(abstractMultipleInputTransformation.getSlotSharingGroup(), (Collection) arrayList.stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList())), abstractMultipleInputTransformation.getCoLocationGroupKey(), abstractMultipleInputTransformation.getOperatorFactory(), abstractMultipleInputTransformation.getInputTypes(), abstractMultipleInputTransformation.getOutputType(), abstractMultipleInputTransformation.getName());
        this.streamGraph.setParallelism(Integer.valueOf(abstractMultipleInputTransformation.getId()), abstractMultipleInputTransformation.getParallelism() != -1 ? abstractMultipleInputTransformation.getParallelism() : this.executionConfig.getParallelism());
        this.streamGraph.setMaxParallelism(abstractMultipleInputTransformation.getId(), abstractMultipleInputTransformation.getMaxParallelism());
        if (abstractMultipleInputTransformation instanceof KeyedMultipleInputTransformation) {
            KeyedMultipleInputTransformation keyedMultipleInputTransformation = (KeyedMultipleInputTransformation) abstractMultipleInputTransformation;
            this.streamGraph.setMultipleInputStateKey(Integer.valueOf(abstractMultipleInputTransformation.getId()), keyedMultipleInputTransformation.getStateKeySelectors(), keyedMultipleInputTransformation.getStateKeyType().createSerializer(this.executionConfig));
        }
        for (int i = 0; i < arrayList.size(); i++) {
            Iterator it2 = ((Collection) arrayList.get(i)).iterator();
            while (it2.hasNext()) {
                this.streamGraph.addEdge((Integer) it2.next(), Integer.valueOf(abstractMultipleInputTransformation.getId()), i + 1);
            }
        }
        return Collections.singleton(Integer.valueOf(abstractMultipleInputTransformation.getId()));
    }

    private String determineSlotSharingGroup(String str, Collection<Integer> collection) {
        if (str != null) {
            return str;
        }
        String str2 = null;
        Iterator<Integer> it = collection.iterator();
        while (it.hasNext()) {
            String slotSharingGroup = this.streamGraph.getSlotSharingGroup(Integer.valueOf(it.next().intValue()));
            if (str2 == null) {
                str2 = slotSharingGroup;
            } else if (!str2.equals(slotSharingGroup)) {
                return DEFAULT_SLOT_SHARING_GROUP;
            }
        }
        return str2 == null ? DEFAULT_SLOT_SHARING_GROUP : str2;
    }

    private <T> void validateSplitTransformation(Transformation<T> transformation) {
        if ((transformation instanceof SelectTransformation) || (transformation instanceof SplitTransformation)) {
            throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
        }
        if (transformation instanceof SideOutputTransformation) {
            throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
        }
        if (transformation instanceof UnionTransformation) {
            Iterator<Transformation<T>> it = ((UnionTransformation) transformation).getInputs().iterator();
            while (it.hasNext()) {
                validateSplitTransformation(it.next());
            }
        } else if (transformation instanceof PartitionTransformation) {
            validateSplitTransformation(((PartitionTransformation) transformation).getInput());
        }
    }
}
