/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.attribute.Attribute;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.LogicalVertex;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphHasher;
import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
import org.apache.flink.streaming.api.graph.StreamGraphUserHashHasher;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.util.ChainedOperatorHashInfo;
import org.apache.flink.streaming.api.graph.util.ChainedSourceInfo;
import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
import org.apache.flink.streaming.api.graph.util.OperatorInfo;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.legacy.YieldingOperatorFactory;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamingJobGraphGenerator {
    /** SLF4J logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
    /** Class loader passed to {@code preValidate}, where it is used to resolve operator classes. */
    private final ClassLoader userClassloader;
    /** The stream graph that is translated into a job graph. */
    private final StreamGraph streamGraph;
    /** The job graph under construction; created in the constructor and returned by {@code createJobGraph()}. */
    private final JobGraph jobGraph;
    /** Hasher used in the constructor to compute the primary per-node hashes ({@link StreamGraphHasherV2}). */
    private final StreamGraphHasher defaultStreamGraphHasher;
    /** Additional hashers whose results are handed to the build context as legacy hashes. */
    private final List<StreamGraphHasher> legacyStreamGraphHashers;
    /** Executor on which operator config serialization is triggered; never {@code null}. */
    private final Executor serializationExecutor;
    /** Shared mutable state (job vertices, chain infos, hashes, ...) used by the build steps. */
    private final JobVertexBuildContext jobVertexBuildContext;

    /**
     * Test-oriented entry point that builds a {@link JobGraph} from the given stream graph.
     *
     * <p>Uses the calling thread's context class loader and the job ID stored in the stream
     * graph. Serialization work runs inline on the calling thread.
     *
     * @param streamGraph the stream graph to translate
     * @return the generated job graph
     */
    @VisibleForTesting
    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        JobID jobId = streamGraph.getJobID();
        // Runnable::run acts as a direct executor: each task runs synchronously in the caller.
        Executor directExecutor = Runnable::run;
        StreamingJobGraphGenerator generator =
                new StreamingJobGraphGenerator(contextClassLoader, streamGraph, jobId, directExecutor);
        return generator.createJobGraph();
    }

    /**
     * Builds a {@link JobGraph} from the given stream graph using a temporary thread pool for
     * operator serialization.
     *
     * <p>The pool size is the smaller of the number of CPU cores and the configured parallelism,
     * but at least one thread. The pool is shut down once generation finishes, whether it
     * succeeded or failed.
     *
     * @param userClassLoader class loader used during validation of the stream graph
     * @param streamGraph the stream graph to translate
     * @param jobID the ID for the resulting job graph, may be {@code null}
     * @return the generated job graph
     */
    public static JobGraph createJobGraph(ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable JobID jobID) {
        int poolSize = Math.max(1, Math.min(Hardware.getNumberCPUCores(), streamGraph.getExecutionConfig().getParallelism()));
        ThreadFactory threadFactory = new ExecutorThreadFactory("flink-operator-serialization-io");
        ExecutorService serializationExecutor = Executors.newFixedThreadPool(poolSize, threadFactory);
        try {
            return new StreamingJobGraphGenerator(userClassLoader, streamGraph, jobID, serializationExecutor).createJobGraph();
        } finally {
            // shutdown() lets already submitted tasks finish and only rejects new ones.
            serializationExecutor.shutdown();
        }
    }

    /**
     * Creates a generator, initializes the target job graph and precomputes all node hashes.
     *
     * @param userClassloader class loader used later for validation
     * @param streamGraph the stream graph to translate
     * @param jobID the ID for the job graph, may be {@code null}
     * @param serializationExecutor executor for serialization work, must not be {@code null}
     */
    private StreamingJobGraphGenerator(ClassLoader userClassloader, StreamGraph streamGraph, @Nullable JobID jobID, Executor serializationExecutor) {
        this.userClassloader = userClassloader;
        this.streamGraph = streamGraph;
        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
        this.serializationExecutor = Preconditions.checkNotNull(serializationExecutor);
        this.jobGraph = createAndInitializeJobGraph(streamGraph, jobID);

        // Primary hashes first, then one hash map per legacy hasher in list order.
        StreamGraphHasher primaryHasher = this.defaultStreamGraphHasher;
        List<StreamGraphHasher> legacyHashers = this.legacyStreamGraphHashers;
        Map<Integer, byte[]> primaryHashes = primaryHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyHashers.size());
        legacyHashers.forEach(legacyHasher -> legacyHashes.add(legacyHasher.traverseStreamGraphAndGenerateHashes(streamGraph)));

        this.jobVertexBuildContext =
                new JobVertexBuildContext(this.jobGraph, streamGraph, new AtomicBoolean(false), primaryHashes, legacyHashes, new SlotSharingGroup());
    }

    /**
     * Runs the full translation pipeline and returns the populated job graph.
     *
     * <p>The steps run in a fixed order: validation, chaining, optional parallelism decisions for
     * dynamic graphs, output and edge configuration, scheduling-related settings, naming, and
     * finally serialization of the operator configs.
     *
     * @return the job graph held by this generator
     */
    private JobGraph createJobGraph() {
        preValidate(streamGraph, userClassloader);

        setChaining();
        if (jobGraph.isDynamic()) {
            setVertexParallelismsForDynamicGraphIfNecessary();
        }

        // Filled by the operator-level step and consumed by the vertex-level step.
        Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs = new HashMap<>();
        setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs, jobVertexBuildContext);
        setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);

        setPhysicalEdges(jobVertexBuildContext);
        markSupportingConcurrentExecutionAttempts(jobVertexBuildContext);
        validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);
        setSlotSharingAndCoLocation(jobVertexBuildContext);
        setManagedMemoryFraction(jobVertexBuildContext);

        AtomicInteger vertexIndexCounter = new AtomicInteger(0);
        addVertexIndexPrefixInVertexName(jobVertexBuildContext, vertexIndexCounter);
        setVertexDescription(jobVertexBuildContext);

        serializeOperatorCoordinatorsAndStreamConfig(serializationExecutor, jobVertexBuildContext);
        return jobGraph;
    }

    /**
     * Triggers serialization of every operator's {@link StreamConfig}, waits for completion and
     * then attaches the serialized operator coordinators to their job vertices.
     *
     * @param serializationExecutor executor on which the serialization is triggered
     * @param jobVertexBuildContext build context holding the chain infos and coordinator futures
     * @throws FlinkRuntimeException if serialization fails or the waiting thread is interrupted;
     *     on interruption the thread's interrupt flag is restored before throwing
     */
    public static void serializeOperatorCoordinatorsAndStreamConfig(Executor serializationExecutor, JobVertexBuildContext jobVertexBuildContext) {
        try {
            List<CompletableFuture<StreamConfig>> configFutures = jobVertexBuildContext.getChainInfosInOrder().values().stream()
                    .flatMap(chainInfo -> StreamingJobGraphGenerator.serializationOperatorConfigs(chainInfo, serializationExecutor))
                    .collect(Collectors.toList());
            FutureUtils.combineAll(configFutures).get();
            StreamingJobGraphGenerator.waitForSerializationFuturesAndUpdateJobVertices(jobVertexBuildContext);
        } catch (InterruptedException e) {
            // Re-assert the interrupt so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException("Error in serialization.", e);
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error in serialization.", e);
        }
    }

    /**
     * Lazily triggers serialization for the vertex config of every operator in the chain.
     *
     * @param chainInfo chain whose operator configs are serialized
     * @param serializationExecutor executor handed to each config's serialization trigger
     * @return a stream of futures, one per operator in the chain
     */
    private static Stream<CompletableFuture<StreamConfig>> serializationOperatorConfigs(OperatorChainInfo chainInfo, Executor serializationExecutor) {
        Collection<OperatorInfo> operatorInfos = chainInfo.getOperatorInfos().values();
        return operatorInfos.stream()
                .map(OperatorInfo::getVertexConfig)
                .map(vertexConfig -> vertexConfig.triggerSerializationAndReturnFuture(serializationExecutor));
    }

    /**
     * Derives an {@link OperatorID} from a user-specified operator uid.
     *
     * @param operatorUid the uid assigned to the operator by the user
     * @return the operator ID built from the hash of {@code operatorUid}
     */
    public static OperatorID generateOperatorID(String operatorUid) {
        byte[] userSpecifiedHash = StreamGraphHasherV2.generateUserSpecifiedHash(operatorUid);
        return new OperatorID(userSpecifiedHash);
    }

    /**
     * Blocks until all operator coordinator providers are serialized and registers each of them
     * on its owning job vertex.
     *
     * @param jobVertexBuildContext build context holding the per-vertex serialization futures
     * @throws ExecutionException if any serialization future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void waitForSerializationFuturesAndUpdateJobVertices(JobVertexBuildContext jobVertexBuildContext) throws ExecutionException, InterruptedException {
        Map<JobVertexID, List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>> futuresByVertex =
                jobVertexBuildContext.getCoordinatorSerializationFuturesPerJobVertex();
        for (Map.Entry<JobVertexID, List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>> entry : futuresByVertex.entrySet()) {
            JobVertexID jobVertexId = entry.getKey();
            JobVertex jobVertex = jobVertexBuildContext.getJobGraph().findVertexByID(jobVertexId);
            Preconditions.checkState(
                    jobVertex != null,
                    "OperatorCoordinator providers were registered for JobVertexID '%s' but no corresponding JobVertex can be found.",
                    jobVertexId);
            // Wait for this vertex's providers, then attach them one by one.
            FutureUtils.combineAll(entry.getValue())
                    .get()
                    .forEach(jobVertex::addOperatorCoordinator);
        }
    }

    /**
     * Prefixes the name of each job vertex known to the build context with
     * {@code [vertex-<index>]}, if the stream graph requests index prefixes.
     *
     * <p>Indices are drawn from {@code vertexIndexId} while walking the job graph's vertices in
     * topological order from the sources. Vertices not present in the build context are skipped
     * and do not consume an index.
     *
     * @param jobVertexBuildContext build context providing the stream graph, job graph and vertices
     * @param vertexIndexId counter supplying the next index; incremented once per renamed vertex
     */
    public static void addVertexIndexPrefixInVertexName(JobVertexBuildContext jobVertexBuildContext, AtomicInteger vertexIndexId) {
        boolean prefixRequested = jobVertexBuildContext.getStreamGraph().isVertexNameIncludeIndexPrefix();
        if (!prefixRequested) {
            return;
        }
        Set<JobVertexID> knownVertexIds = jobVertexBuildContext.getJobVerticesInOrder().values().stream()
                .map(JobVertex::getID)
                .collect(Collectors.toSet());
        for (JobVertex vertex : jobVertexBuildContext.getJobGraph().getVerticesSortedTopologicallyFromSources()) {
            if (!knownVertexIds.contains(vertex.getID())) {
                continue;
            }
            String indexedName = String.format("[vertex-%d]%s", vertexIndexId.getAndIncrement(), vertex.getName());
            vertex.setName(indexedName);
        }
    }

    /**
     * Computes and sets the operator pretty name of every job vertex in the build context.
     *
     * <p>The text is rendered in either cascading or tree form, depending on the stream graph's
     * vertex description mode.
     *
     * @param jobVertexBuildContext build context providing the job vertices and the stream graph
     * @throws IllegalArgumentException if the description mode is neither cascading nor tree
     */
    public static void setVertexDescription(JobVertexBuildContext jobVertexBuildContext) {
        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
        jobVertexBuildContext.getJobVerticesInOrder().forEach((headOpId, vertex) -> {
            StringBuilder description = new StringBuilder();
            switch (streamGraph.getVertexDescriptionMode()) {
                case CASCADING:
                    buildCascadingDescription(description, headOpId, headOpId, jobVertexBuildContext);
                    break;
                case TREE:
                    buildTreeDescription(description, headOpId, headOpId, "", true, jobVertexBuildContext);
                    break;
                default:
                    throw new IllegalArgumentException(String.format("Description mode %s not supported", streamGraph.getVertexDescriptionMode()));
            }
            vertex.setOperatorPrettyName(description.toString());
        });
    }

    /**
     * Appends a single-line description of the chain rooted at {@code currentOpId}.
     *
     * <p>An operator is followed by {@code " -> "} and its chained successors. Several successors
     * are separated by {@code " , "} and wrapped in parentheses.
     *
     * @param builder target the description is appended to
     * @param headOpId ID of the chain's head operator
     * @param currentOpId ID of the operator currently being described
     * @param jobVertexBuildContext build context providing the stream graph and chained configs
     */
    private static void buildCascadingDescription(StringBuilder builder, int headOpId, int currentOpId, JobVertexBuildContext jobVertexBuildContext) {
        StreamNode currentNode = jobVertexBuildContext.getStreamGraph().getStreamNode(currentOpId);
        builder.append(getDescriptionWithChainedSourcesInfo(currentNode, jobVertexBuildContext));

        LinkedList<Integer> successors = getChainedOutputNodes(headOpId, currentNode, jobVertexBuildContext);
        if (successors.isEmpty()) {
            return;
        }

        builder.append(" -> ");
        boolean needsParentheses = successors.size() > 1;
        if (needsParentheses) {
            builder.append("(");
        }
        Iterator<Integer> successorIds = successors.iterator();
        while (successorIds.hasNext()) {
            buildCascadingDescription(builder, headOpId, successorIds.next(), jobVertexBuildContext);
            if (successorIds.hasNext()) {
                builder.append(" , ");
            }
        }
        if (needsParentheses) {
            builder.append(")");
        }
    }

    /**
     * Collects the targets of {@code node}'s out edges that are chained into the chain headed by
     * {@code headOpId}.
     *
     * @param headOpId ID of the chain's head operator
     * @param node the node whose out edges are inspected
     * @param jobVertexBuildContext build context providing the chained configs
     * @return target node IDs in out-edge order; empty if the head has no chained configs
     */
    private static LinkedList<Integer> getChainedOutputNodes(int headOpId, StreamNode node, JobVertexBuildContext jobVertexBuildContext) {
        LinkedList<Integer> chainedTargets = new LinkedList<>();
        Map<Integer, Map<Integer, StreamConfig>> allChainedConfigs = jobVertexBuildContext.getChainedConfigs();
        if (!allChainedConfigs.containsKey(headOpId)) {
            return chainedTargets;
        }
        Map<Integer, StreamConfig> configsOfChain = allChainedConfigs.get(headOpId);
        for (StreamEdge outEdge : node.getOutEdges()) {
            int targetId = outEdge.getTargetId();
            if (configsOfChain.containsKey(targetId)) {
                chainedTargets.add(targetId);
            }
        }
        return chainedTargets;
    }

    /**
     * Appends a multi-line, tree-shaped description of the chain rooted at {@code currentOpId}.
     *
     * <p>Each operator takes one line. Non-head operators are introduced by {@code "+- "} when
     * they are the last child of their parent and by {@code ":- "} otherwise. Their own children
     * are indented with three spaces or {@code ":  "} respectively.
     *
     * @param builder target the description is appended to
     * @param headOpId ID of the chain's head operator
     * @param currentOpId ID of the operator currently being described
     * @param prefix indentation inherited from the parent
     * @param isLast whether the current operator is the last child of its parent
     * @param jobVertexBuildContext build context providing the stream graph and chained configs
     */
    private static void buildTreeDescription(StringBuilder builder, int headOpId, int currentOpId, String prefix, boolean isLast, JobVertexBuildContext jobVertexBuildContext) {
        String ownPrefix;
        String prefixForChildren;
        if (currentOpId == headOpId) {
            // The head operator is printed flush left and adds no indentation for its children.
            ownPrefix = "";
            prefixForChildren = "";
        } else if (isLast) {
            ownPrefix = prefix + "+- ";
            prefixForChildren = prefix + "   ";
        } else {
            ownPrefix = prefix + ":- ";
            prefixForChildren = prefix + ":  ";
        }

        StreamNode currentNode = jobVertexBuildContext.getStreamGraph().getStreamNode(currentOpId);
        builder.append(ownPrefix);
        builder.append(getDescriptionWithChainedSourcesInfo(currentNode, jobVertexBuildContext));
        builder.append("\n");

        Iterator<Integer> children = getChainedOutputNodes(headOpId, currentNode, jobVertexBuildContext).iterator();
        while (children.hasNext()) {
            Integer childId = children.next();
            boolean childIsLast = !children.hasNext();
            buildTreeDescription(builder, headOpId, childId, prefixForChildren, childIsLast, jobVertexBuildContext);
        }
    }

    /**
     * Returns the operator description of {@code node}, followed in square brackets by the
     * descriptions of any sources chained directly into it.
     *
     * <p>A source counts as chained when it is an input of {@code node}, is one of the stream
     * graph's source IDs, and has an entry in the chained configs keyed by {@code node}'s ID.
     *
     * @param node the node to describe
     * @param jobVertexBuildContext build context providing the stream graph and chained configs
     * @return the plain description, or {@code "<description> [<source>, ...]"} if sources are chained
     */
    private static String getDescriptionWithChainedSourcesInfo(StreamNode node, JobVertexBuildContext jobVertexBuildContext) {
        Map<Integer, Map<Integer, StreamConfig>> chainedConfigs = jobVertexBuildContext.getChainedConfigs();
        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();

        List<StreamNode> chainedSources;
        if (chainedConfigs.containsKey(node.getId())) {
            chainedSources = node.getInEdges().stream()
                    .map(StreamEdge::getSourceId)
                    .filter(sourceId -> streamGraph.getSourceIDs().contains(sourceId)
                            && chainedConfigs.get(node.getId()).containsKey(sourceId))
                    .map(streamGraph::getStreamNode)
                    .collect(Collectors.toList());
        } else {
            chainedSources = Collections.emptyList();
        }

        if (chainedSources.isEmpty()) {
            return node.getOperatorDescription();
        }
        String ownDescription = node.getOperatorDescription();
        String sourceDescriptions = chainedSources.stream()
                .map(StreamNode::getOperatorDescription)
                .collect(Collectors.joining(", "));
        return String.format("%s [%s]", ownDescription, sourceDescriptions);
    }

    /**
     * Rejects stream graphs whose configuration is incompatible with checkpointing.
     *
     * <p>Nothing is checked when checkpointing is disabled. Otherwise the method fails for
     * non-forced unaligned checkpoints combined with iterations or custom partitioners, and for
     * any operator that implements {@link InputSelectable}.
     *
     * @param streamGraph the stream graph to validate
     * @param userClassloader class loader used to resolve the operator classes
     * @throws UnsupportedOperationException if one of the unsupported combinations is found
     */
    public static void preValidate(StreamGraph streamGraph, ClassLoader userClassloader) {
        CheckpointConfig checkpointConfig = streamGraph.getCheckpointConfig();
        if (!checkpointConfig.isCheckpointingEnabled()) {
            return;
        }

        if (streamGraph.isIterative() && checkpointConfig.isUnalignedCheckpointsEnabled() && !checkpointConfig.isForceUnalignedCheckpoints()) {
            throw new UnsupportedOperationException("Unaligned Checkpoints are currently not supported for iterative jobs, as rescaling would require alignment (in addition to the reduced checkpointing guarantees).\nThe user can force Unaligned Checkpoints by using 'execution.checkpointing.unaligned.forced'");
        }

        if (checkpointConfig.isUnalignedCheckpointsEnabled()
                && !checkpointConfig.isForceUnalignedCheckpoints()
                && streamGraph.getStreamNodes().stream().anyMatch(StreamingJobGraphGenerator::hasCustomPartitioner)) {
            throw new UnsupportedOperationException("Unaligned checkpoints are currently not supported for custom partitioners, as rescaling is not guaranteed to work correctly.\nThe user can force Unaligned Checkpoints by using 'execution.checkpointing.unaligned.forced'");
        }

        for (StreamNode streamNode : streamGraph.getStreamNodes()) {
            StreamOperatorFactory<?> factory = streamNode.getOperatorFactory();
            if (factory == null) {
                continue;
            }
            Class<?> operatorClass = factory.getStreamOperatorClass(userClassloader);
            if (InputSelectable.class.isAssignableFrom(operatorClass)) {
                throw new UnsupportedOperationException("Checkpointing is currently not supported for operators that implement InputSelectable:" + operatorClass.getName());
            }
        }
    }

    /**
     * Tells whether any out edge of {@code node} uses a {@link CustomPartitionerWrapper}.
     *
     * @param node the node whose out edges are inspected
     * @return {@code true} if at least one out edge has a custom partitioner
     */
    private static boolean hasCustomPartitioner(StreamNode node) {
        for (StreamEdge outEdge : node.getOutEdges()) {
            if (outEdge.getPartitioner() instanceof CustomPartitionerWrapper) {
                return true;
            }
        }
        return false;
    }

    /**
     * Groups the physical edges by target node and stores each group as the physical in-edges of
     * the target's vertex config.
     *
     * <p>Within a group the edges keep the order in which the build context returns them.
     *
     * @param jobVertexBuildContext build context providing the physical edges and chain infos
     */
    public static void setPhysicalEdges(JobVertexBuildContext jobVertexBuildContext) {
        Map<Integer, List<StreamEdge>> inEdgesByTarget = new HashMap<>();
        for (StreamEdge physicalEdge : jobVertexBuildContext.getPhysicalEdgesInOrder()) {
            inEdgesByTarget
                    .computeIfAbsent(physicalEdge.getTargetId(), ignored -> new ArrayList<>())
                    .add(physicalEdge);
        }

        inEdgesByTarget.forEach((targetId, inEdges) ->
                jobVertexBuildContext.getChainInfo(targetId)
                        .getOperatorInfo(targetId)
                        .getVertexConfig()
                        .setInPhysicalEdges(inEdges));
    }

    /**
     * Determines the initial chain entry points from the stream graph's sources.
     *
     * <p>A chainable source is registered through {@code createSourceChainInfo}. Every other
     * source becomes the start of its own chain.
     *
     * @return mutable map from start node ID to the chain info starting there
     */
    private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs() {
        Map<Integer, OperatorChainInfo> entryPoints = new HashMap<>();
        for (Integer sourceId : streamGraph.getSourceIDs()) {
            StreamNode source = streamGraph.getStreamNode(sourceId);
            if (isChainableSource(source, streamGraph)) {
                createSourceChainInfo(source, entryPoints, jobVertexBuildContext);
            } else {
                entryPoints.put(sourceId, new OperatorChainInfo(sourceId));
            }
        }
        return entryPoints;
    }

    /**
     * Builds all operator chains, starting from the initial entry points in ascending order of
     * their start node ID.
     */
    private void setChaining() {
        Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs();

        // Snapshot the initial entry points first: the map is also passed to createChain,
        // which may register further entry points while chains are being built.
        List<OperatorChainInfo> initialEntryPoints = chainEntryPoints.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());

        for (OperatorChainInfo entryPoint : initialEntryPoints) {
            createChain(entryPoint.getStartNodeId(), 1, entryPoint, chainEntryPoints, true, serializationExecutor, jobVertexBuildContext, null);
        }
    }

    /**
     * Recursively adds the node {@code currentNodeId} and its chainable successors to the chain
     * described by {@code chainInfo}, creating the operator configs along the way.
     *
     * <p>If a job vertex already exists for the chain's start node, nothing is done and an empty
     * list is returned.
     *
     * @param currentNodeId ID of the stream node to process
     * @param chainIndex index of the node within its chain; successors receive {@code chainIndex + 1}
     * @param chainInfo the chain the node is added to
     * @param chainEntryPoints known chain entry points by start node ID; new entries may be added
     * @param canCreateNewChain whether targets of non-chainable edges should start new chains
     * @param serializationExecutor executor passed on when a job vertex is created
     * @param jobVertexBuildContext build context holding the shared build state
     * @param visitedStreamNodeConsumer optional callback invoked with each processed node ID
     * @return the non-chainable out edges reachable from this node within the chain
     */
    public static List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo, Map<Integer, OperatorChainInfo> chainEntryPoints, boolean canCreateNewChain, Executor serializationExecutor, JobVertexBuildContext jobVertexBuildContext, @Nullable Consumer<Integer> visitedStreamNodeConsumer) {
        Integer startNodeId = chainInfo.getStartNodeId();
        // Only build the chain if no job vertex has been registered for its start node yet.
        if (!jobVertexBuildContext.getJobVerticesInOrder().containsKey(startNodeId)) {
            StreamConfig config;
            boolean isNoOutputUntilEndOfInput;
            StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
            if (visitedStreamNodeConsumer != null) {
                visitedStreamNodeConsumer.accept(currentNodeId);
            }
            ArrayList<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
            ArrayList<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            ArrayList<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
            Attribute currentNodeAttribute = currentNode.getAttribute();
            boolean bl = isNoOutputUntilEndOfInput = currentNode.isOutputOnlyAfterEndOfStream() || currentNodeAttribute.isNoOutputUntilEndOfInput();
            if (isNoOutputUntilEndOfInput) {
                currentNodeAttribute.setNoOutputUntilEndOfInput(true);
            }
            // Split the out edges into those that stay inside the chain and those that leave it.
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (StreamingJobGraphGenerator.isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                    continue;
                }
                nonChainableOutputs.add(outEdge);
            }
            // Chainable targets join the same chain; the no-output-until-end-of-input flag is
            // pushed down to the target before recursing and pulled back up afterwards.
            for (StreamEdge chainable : chainableOutputs) {
                StreamNode targetNode = streamGraph.getStreamNode(chainable.getTargetId());
                Attribute targetNodeAttribute = targetNode.getAttribute();
                if (isNoOutputUntilEndOfInput && targetNodeAttribute != null) {
                    targetNodeAttribute.setNoOutputUntilEndOfInput(true);
                }
                transitiveOutEdges.addAll(StreamingJobGraphGenerator.createChain(chainable.getTargetId(), chainIndex + 1, chainInfo, chainEntryPoints, canCreateNewChain, serializationExecutor, jobVertexBuildContext, visitedStreamNodeConsumer));
                if (targetNodeAttribute == null || !targetNodeAttribute.isNoOutputUntilEndOfInput()) continue;
                currentNodeAttribute.setNoOutputUntilEndOfInput(true);
            }
            // Non-chainable edges leave the chain; their targets start (or reuse) a chain of
            // their own at index 1 when new chains may be created.
            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                if (!canCreateNewChain) continue;
                StreamingJobGraphGenerator.createChain(nonChainable.getTargetId(), 1, chainEntryPoints.computeIfAbsent(nonChainable.getTargetId(), k -> chainInfo.newChain(nonChainable.getTargetId())), chainEntryPoints, canCreateNewChain, serializationExecutor, jobVertexBuildContext, visitedStreamNodeConsumer);
            }
            // Record the name and resources for this node; these calls run after the chainable
            // successors above have been processed.
            chainInfo.addChainedName(currentNodeId, StreamingJobGraphGenerator.createChainedName(currentNodeId, chainableOutputs, Optional.ofNullable(chainEntryPoints.get(currentNodeId)), chainInfo.getChainedNames(), jobVertexBuildContext));
            chainInfo.addChainedMinResources(currentNodeId, StreamingJobGraphGenerator.createChainedMinResources(currentNodeId, chainableOutputs, chainInfo, jobVertexBuildContext));
            chainInfo.addChainedPreferredResources(currentNodeId, StreamingJobGraphGenerator.createChainedPreferredResources(currentNodeId, chainableOutputs, chainInfo, jobVertexBuildContext));
            OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, streamGraph.getStreamNode(currentNodeId).getOperatorName(), jobVertexBuildContext);
            if (currentNode.getInputFormat() != null) {
                chainInfo.getOrCreateFormatContainer().addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }
            if (currentNode.getOutputFormat() != null) {
                chainInfo.getOrCreateFormatContainer().addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }
            OperatorInfo operatorInfo = chainInfo.createAndGetOperatorInfo(currentNodeId, currentOperatorId);
            // The chain head's config is backed by the job vertex configuration (the vertex is
            // created on demand); every other node gets a fresh, standalone configuration.
            if (currentNodeId.equals(startNodeId)) {
                JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId);
                if (jobVertex == null) {
                    jobVertex = StreamingJobGraphGenerator.createJobVertex(chainInfo, serializationExecutor, jobVertexBuildContext);
                }
                config = new StreamConfig(jobVertex.getConfiguration());
            } else {
                config = new StreamConfig(new Configuration());
            }
            StreamingJobGraphGenerator.tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs, jobVertexBuildContext);
            config.setAttribute(currentNodeAttribute);
            config.setWatermarkDeclarations(streamGraph.getSerializedWatermarkDeclarations());
            StreamingJobGraphGenerator.setOperatorConfig(currentNodeId, config, chainInfo, jobVertexBuildContext);
            StreamingJobGraphGenerator.setOperatorChainedOutputsConfig(config, chainableOutputs, jobVertexBuildContext);
            operatorInfo.addNonChainableOutputs(nonChainableOutputs);
            if (currentNodeId.equals(startNodeId)) {
                // Head of the chain: publish the chain and attach the configs of all chained nodes.
                chainInfo.setTransitiveOutEdges(transitiveOutEdges);
                jobVertexBuildContext.addChainInfo(startNodeId, chainInfo);
                config.setChainStart();
                config.setChainIndex(chainIndex);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setTransitiveChainedTaskConfigs(jobVertexBuildContext.getChainedConfigs().get(startNodeId));
            } else {
                // Chained node: register its config under the chain's start node.
                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                jobVertexBuildContext.getOrCreateChainedConfig(startNodeId).put(currentNodeId, config);
            }
            config.setOperatorID(currentOperatorId);
            // A node without chainable outputs terminates the chain.
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            return transitiveOutEdges;
        }
        return new ArrayList<StreamEdge>();
    }

    /**
     * Adjusts the parallelism settings of job vertices and their chained stream nodes for
     * dynamic graphs.
     *
     * <p>It works in three passes. Vertices without configured parallelism are reset to
     * {@code -1} when auto parallelism is enabled. Forward groups are then computed from the
     * forward-partitioned edges between vertices. Finally, any parallelism or max parallelism
     * decided by a forward group is applied to its vertices and their chained nodes.
     */
    private void setVertexParallelismsForDynamicGraphIfNecessary() {
        Map<Integer, JobVertex> jobVertices = jobVertexBuildContext.getJobVerticesInOrder();

        // The forward-group computation receives the vertices in the reverse of the context's order.
        List<JobVertex> topologicalOrderVertices = new ArrayList<>(jobVertices.values());
        Collections.reverse(topologicalOrderVertices);

        // Pass 1: mark vertices whose parallelism is still open.
        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
            JobVertex jobVertex = entry.getValue();
            OperatorChainInfo chainInfo = jobVertexBuildContext.getChainInfo(entry.getKey());
            if (jobVertex.isParallelismConfigured() || !streamGraph.isAutoParallelismEnabled()) {
                continue;
            }
            jobVertex.setParallelism(-1);
            chainInfo.getAllChainedNodes().forEach(chainedNode -> chainedNode.setParallelism(-1, false));
        }

        // Pass 2: for every vertex, collect the vertices feeding it through forward edges.
        Map<JobVertex, Set<JobVertex>> forwardProducersByJobVertex = new HashMap<>();
        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
            JobVertex producer = entry.getValue();
            Set<JobVertex> forwardConsumers = jobVertexBuildContext.getChainInfo(entry.getKey()).getTransitiveOutEdges().stream()
                    .filter(outEdge -> outEdge.getPartitioner() instanceof ForwardPartitioner)
                    .map(StreamEdge::getTargetId)
                    .map(jobVertices::get)
                    .collect(Collectors.toSet());
            for (JobVertex consumer : forwardConsumers) {
                forwardProducersByJobVertex.computeIfAbsent(consumer, ignored -> new HashSet<>()).add(producer);
            }
        }

        Map<JobVertexID, JobVertexForwardGroup> forwardGroupsByJobVertexId = ForwardGroupComputeUtil.computeForwardGroups(
                topologicalOrderVertices,
                vertex -> forwardProducersByJobVertex.getOrDefault(vertex, Collections.emptySet()));

        // Pass 3: propagate decisions made at forward-group level to vertices and stream nodes.
        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
            Integer startNodeId = entry.getKey();
            JobVertex jobVertex = entry.getValue();
            ForwardGroup forwardGroup = forwardGroupsByJobVertexId.get(jobVertex.getID());
            if (forwardGroup == null) {
                continue;
            }
            if (forwardGroup.isParallelismDecided()) {
                jobVertex.setParallelism(forwardGroup.getParallelism());
                jobVertex.setParallelismConfigured(true);
                jobVertexBuildContext.getChainInfo(startNodeId).getAllChainedNodes()
                        .forEach(chainedNode -> chainedNode.setParallelism(forwardGroup.getParallelism(), true));
            }
            if (forwardGroup.isMaxParallelismDecided()) {
                jobVertex.setMaxParallelism(forwardGroup.getMaxParallelism());
                jobVertexBuildContext.getChainInfo(startNodeId).getAllChainedNodes()
                        .forEach(chainedNode -> chainedNode.setMaxParallelism(forwardGroup.getMaxParallelism()));
            }
        }
    }

    /**
     * Creates a new {@link JobGraph} and copies the job-level settings of the stream graph into it.
     *
     * <p>Copied settings are: job type, dynamic flag, approximate local recovery, checkpointing
     * and savepoint restore settings, user artifacts, user jar blob keys, classpaths, the
     * execution config, the job configuration and any job status hooks.
     *
     * @param streamGraph the stream graph supplying the settings
     * @param jobId the ID for the job graph, may be {@code null}
     * @return the initialized job graph, without any vertices
     * @throws IllegalConfigurationException if the execution config cannot be serialized; the
     *     underlying {@link IOException} is attached as the cause
     */
    public static JobGraph createAndInitializeJobGraph(StreamGraph streamGraph, @Nullable JobID jobId) {
        JobGraph jobGraph = new JobGraph(jobId, streamGraph.getJobName());
        jobGraph.setJobType(streamGraph.getJobType());
        jobGraph.setDynamic(streamGraph.isDynamic());
        jobGraph.enableApproximateLocalRecovery(streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
        jobGraph.setSnapshotSettings(streamGraph.getCheckpointingSettings());
        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : streamGraph.getUserArtifacts().entrySet()) {
            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }
        streamGraph.getUserJarBlobKeys().forEach(jobGraph::addUserJarBlobKey);
        jobGraph.setClasspaths(streamGraph.getClasspath());

        ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
        if (executionConfig == null) {
            // No live config object available: fall back to the already serialized form.
            jobGraph.setSerializedExecutionConfig(streamGraph.getSerializedExecutionConfig());
        } else {
            try {
                jobGraph.setExecutionConfig(executionConfig);
            } catch (IOException e) {
                // Keep the original exception as cause so the offending type can be diagnosed.
                throw new IllegalConfigurationException("Could not serialize the ExecutionConfig.This indicates that non-serializable types (like custom serializers) were registered", e);
            }
        }

        jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());
        if (!streamGraph.getJobStatusHooks().isEmpty()) {
            jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
        }
        return jobGraph;
    }

    /**
     * Registers a source node as a chained source of the chain that starts at the target of the
     * source's first out edge.
     *
     * <p>The chain info for that target is looked up in {@code chainEntryPoints} and created if
     * absent. A fresh {@link StreamConfig} is built for the source (chain index 0, no chained and
     * no non-chainable outputs), and the source plus its coordinator provider are added to the
     * chain info.
     *
     * @param sourceNode the source node; its operator factory must be a non-null
     *     {@link SourceOperatorFactory}
     * @param chainEntryPoints chain infos keyed by chain start node id; may gain a new entry
     * @param jobVertexBuildContext build context supplying the node hash and the stream graph
     */
    public static void createSourceChainInfo(StreamNode sourceNode, Map<Integer, OperatorChainInfo> chainEntryPoints, JobVertexBuildContext jobVertexBuildContext) {
        final Integer sourceId = sourceNode.getId();
        final StreamEdge firstOutEdge = sourceNode.getOutEdges().get(0);

        // Find (or lazily create) the chain whose entry point is the operator fed by the source.
        final OperatorChainInfo targetChain = chainEntryPoints.computeIfAbsent(firstOutEdge.getTargetId(), unused -> new OperatorChainInfo(firstOutEdge.getTargetId()));

        final OperatorID sourceOperatorId = new OperatorID(jobVertexBuildContext.getHash(sourceId));
        final OperatorInfo sourceOperatorInfo = targetChain.createAndGetOperatorInfo(sourceId, sourceOperatorId);
        final StreamConfig.SourceInputConfig sourceInput = new StreamConfig.SourceInputConfig(firstOutEdge);

        // Build the source's own stream config; it has neither chained nor non-chainable outputs.
        final StreamConfig sourceConfig = new StreamConfig(new Configuration());
        StreamingJobGraphGenerator.setOperatorConfig(sourceId, sourceConfig, targetChain, jobVertexBuildContext);
        StreamingJobGraphGenerator.setOperatorChainedOutputsConfig(sourceConfig, Collections.emptyList(), jobVertexBuildContext);
        sourceOperatorInfo.addNonChainableOutputs(Collections.emptyList());
        sourceConfig.setChainIndex(0);
        sourceConfig.setOperatorID(sourceOperatorId);
        sourceConfig.setOperatorName(sourceNode.getOperatorName());

        final SourceOperatorFactory sourceFactory = (SourceOperatorFactory)Preconditions.checkNotNull(sourceNode.getOperatorFactory());
        final OperatorCoordinator.Provider coordinatorProvider = sourceFactory.getCoordinatorProvider(sourceNode.getOperatorName(), sourceOperatorId);

        targetChain.addChainedSource(sourceNode, new ChainedSourceInfo(sourceConfig, sourceInput));
        targetChain.addCoordinatorProvider(coordinatorProvider);
    }

    /**
     * Switches a reused output from {@code HYBRID_SELECTIVE} to {@code HYBRID_FULL} and logs the
     * replacement. Outputs with any other partition type are left untouched.
     *
     * @param reusableOutput the output that is about to be shared by several consumer edges
     */
    private static void checkAndReplaceReusableHybridPartitionType(NonChainedOutput reusableOutput) {
        // Capture the type before overwriting it so the log message names the type that was replaced.
        ResultPartitionType previousType = reusableOutput.getPartitionType();
        if (previousType != ResultPartitionType.HYBRID_SELECTIVE) {
            return;
        }
        reusableOutput.setPartitionType(ResultPartitionType.HYBRID_FULL);
        LOG.info("{} result partition has been replaced by {} result partition to support partition reuse, which will reduce shuffle data writing cost.", previousType.name(), ResultPartitionType.HYBRID_FULL.name());
    }

    /**
     * Builds the display name of the chain rooted at {@code vertexID}.
     *
     * <p>The vertex's operator name (decorated with its chained sources, if a chain info is
     * given) comes first. One chained output is appended as {@code " -> name"}; several are
     * appended as {@code " -> (a, b, ...)"}; none leaves the name as is.
     *
     * @param vertexID id of the stream node the name is built for
     * @param chainedOutputs edges chained to the vertex
     * @param operatorChainInfo chain info used to look up chained sources, if any
     * @param chainedNames already computed chained names, keyed by stream node id
     * @param jobVertexBuildContext build context supplying the stream graph
     * @return the chained name
     */
    public static String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs, Optional<OperatorChainInfo> operatorChainInfo, Map<Integer, String> chainedNames, JobVertexBuildContext jobVertexBuildContext) {
        StreamGraph graph = jobVertexBuildContext.getStreamGraph();

        List<ChainedSourceInfo> sources = Collections.emptyList();
        if (operatorChainInfo.isPresent()) {
            sources = StreamingJobGraphGenerator.getChainedSourcesByVertexId(vertexID, operatorChainInfo.get(), graph);
        }
        String headName = StreamingJobGraphGenerator.nameWithChainedSourcesInfo(graph.getStreamNode(vertexID).getOperatorName(), sources);

        int outputCount = chainedOutputs.size();
        if (outputCount == 0) {
            return headName;
        }
        if (outputCount == 1) {
            return headName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
        }
        List<String> downstreamNames = chainedOutputs.stream()
                .map(chainedEdge -> chainedNames.get(chainedEdge.getTargetId()))
                .collect(Collectors.toList());
        return headName + " -> (" + StringUtils.join(downstreamNames, ", ") + ")";
    }

    /**
     * Collects, in in-edge order, the chained-source infos registered in {@code chainInfo} for the
     * producers of the given vertex's input edges. Edges whose producer is not a chained source
     * contribute nothing.
     */
    private static List<ChainedSourceInfo> getChainedSourcesByVertexId(Integer vertexId, OperatorChainInfo chainInfo, StreamGraph streamGraph) {
        List<ChainedSourceInfo> found = new ArrayList<>();
        for (StreamEdge inEdge : streamGraph.getStreamNode(vertexId).getInEdges()) {
            ChainedSourceInfo candidate = chainInfo.getChainedSources().get(inEdge.getSourceId());
            if (candidate != null) {
                found.add(candidate);
            }
        }
        return found;
    }

    /**
     * Merges the minimum resources of the given vertex with the chained minimum resources of
     * every chained output target.
     *
     * @param vertexID id of the stream node whose own minimum resources seed the merge
     * @param chainedOutputs edges whose targets contribute their chained minimum resources
     * @param operatorChainInfo source of the per-target chained minimum resources
     * @param jobVertexBuildContext build context supplying the stream graph
     * @return the merged minimum resources
     */
    public static ResourceSpec createChainedMinResources(Integer vertexID, List<StreamEdge> chainedOutputs, OperatorChainInfo operatorChainInfo, JobVertexBuildContext jobVertexBuildContext) {
        ResourceSpec ownMinResources = jobVertexBuildContext.getStreamGraph().getStreamNode(vertexID).getMinResources();
        return chainedOutputs.stream()
                .map(chainedEdge -> operatorChainInfo.getChainedMinResources(chainedEdge.getTargetId()))
                .reduce(ownMinResources, ResourceSpec::merge);
    }

    /**
     * Merges the preferred resources of the given vertex with the chained preferred resources of
     * every chained output target.
     *
     * @param vertexID id of the stream node whose own preferred resources seed the merge
     * @param chainedOutputs edges whose targets contribute their chained preferred resources
     * @param operatorChainInfo source of the per-target chained preferred resources
     * @param jobVertexBuildContext build context supplying the stream graph
     * @return the merged preferred resources
     */
    public static ResourceSpec createChainedPreferredResources(Integer vertexID, List<StreamEdge> chainedOutputs, OperatorChainInfo operatorChainInfo, JobVertexBuildContext jobVertexBuildContext) {
        ResourceSpec ownPreferredResources = jobVertexBuildContext.getStreamGraph().getStreamNode(vertexID).getPreferredResources();
        return chainedOutputs.stream()
                .map(chainedEdge -> operatorChainInfo.getChainedPreferredResources(chainedEdge.getTargetId()))
                .reduce(ownPreferredResources, ResourceSpec::merge);
    }

    /**
     * Creates the {@link JobVertex} for the operator chain described by {@code chainInfo},
     * registers it with the build context and adds it to the job graph.
     *
     * <p>The vertex id is derived from the hash of the chain's start node. Coordinator providers
     * of the chain are serialized asynchronously on {@code serializationExecutor}; the resulting
     * futures are stored in the build context under the vertex id.
     *
     * @param chainInfo the chain the vertex is created for
     * @param serializationExecutor executor that runs the coordinator-provider serialization
     * @param jobVertexBuildContext build context supplying the stream graph, hashes and job graph
     * @return the newly created job vertex
     * @throws IllegalStateException if no hash is available for the chain's start node
     */
    public static JobVertex createJobVertex(OperatorChainInfo chainInfo, Executor serializationExecutor, JobVertexBuildContext jobVertexBuildContext) {
        Integer streamNodeId = chainInfo.getStartNodeId();
        StreamNode streamNode = jobVertexBuildContext.getStreamGraph().getStreamNode(streamNodeId);
        byte[] hash = jobVertexBuildContext.getHash(streamNodeId);
        if (hash == null) {
            throw new IllegalStateException("Cannot find node hash. Did you generate them before calling this method?");
        }
        JobVertexID jobVertexId = new JobVertexID(hash);

        // One OperatorIDPair per chained operator: generated id plus the optional user-defined id.
        List<ChainedOperatorHashInfo> chainedOperators = chainInfo.getChainedOperatorHashes(streamNodeId);
        List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
        if (chainedOperators != null) {
            for (ChainedOperatorHashInfo chainedOperatorHashInfo : chainedOperators) {
                OperatorID userDefinedOperatorID = chainedOperatorHashInfo.getUserDefinedOperatorId() == null ? null : new OperatorID(chainedOperatorHashInfo.getUserDefinedOperatorId());
                operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperatorHashInfo.getGeneratedOperatorId()), userDefinedOperatorID, chainedOperatorHashInfo.getStreamNode().getOperatorName(), chainedOperatorHashInfo.getStreamNode().getTransformationUID()));
            }
        }

        JobVertex jobVertex;
        if (chainInfo.hasFormatContainer()) {
            jobVertex = new InputOutputFormatVertex(chainInfo.getChainedName(streamNodeId), jobVertexId, operatorIDPairs);
            chainInfo.getOrCreateFormatContainer().write(new TaskConfig(jobVertex.getConfiguration()));
        } else {
            jobVertex = new JobVertex(chainInfo.getChainedName(streamNodeId), jobVertexId, operatorIDPairs);
        }
        if (streamNode.getConsumeClusterDatasetId() != null) {
            jobVertex.addIntermediateDataSetIdToConsume(streamNode.getConsumeClusterDatasetId());
        }

        // Serialize coordinator providers off-thread; the futures are collected via the context.
        List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>> serializationFutures = new ArrayList<>();
        for (OperatorCoordinator.Provider coordinatorProvider : chainInfo.getCoordinatorProviders()) {
            serializationFutures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return new SerializedValue<>(coordinatorProvider);
                } catch (IOException e) {
                    throw new FlinkRuntimeException(String.format("Coordinator Provider for node %s is not serializable.", chainInfo.getChainedName(streamNodeId)), e);
                }
            }, serializationExecutor));
        }
        if (!serializationFutures.isEmpty()) {
            jobVertexBuildContext.putCoordinatorSerializationFutures(jobVertexId, serializationFutures);
        }

        jobVertex.setResources(chainInfo.getChainedMinResources(streamNodeId), chainInfo.getChainedPreferredResources(streamNodeId));
        jobVertex.setInvokableClass(streamNode.getJobVertexClass());

        // Use the node's parallelism when it is positive; otherwise keep (and report) the
        // parallelism the vertex already has. A single variable guarantees that the value
        // logged below is always assigned.
        int parallelism = streamNode.getParallelism();
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        } else {
            parallelism = jobVertex.getParallelism();
        }
        jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
        }

        jobVertexBuildContext.addJobVertex(streamNodeId, jobVertex);
        jobVertexBuildContext.getJobGraph().addVertex(jobVertex);
        // The vertex counts as parallelism-configured as soon as any chained node is.
        jobVertex.setParallelismConfigured(chainInfo.getAllChainedNodes().stream().anyMatch(StreamNode::isParallelismConfigured));
        return jobVertex;
    }

    /**
     * Populates the {@link StreamConfig} of one operator and stores it on the operator's
     * {@code OperatorInfo} inside {@code chainInfo}.
     *
     * <p>Written settings: vertex id, per-input configs, output serializer, operator factory,
     * state backend and checkpoint storage, loop/timer-service flags, state partitioners and key
     * serializer, additional metric variables and, for iteration head/tail vertices, the
     * iteration id and wait time.
     *
     * @param vertexId id of the stream node to configure
     * @param config the config object that is filled in
     * @param chainInfo chain info providing the operator info and the chained sources
     * @param jobVertexBuildContext build context supplying the stream graph and chained configs
     * @throws IllegalStateException if a chained source targets an input slot that is already set
     */
    public static void setOperatorConfig(Integer vertexId, StreamConfig config, OperatorChainInfo chainInfo, JobVertexBuildContext jobVertexBuildContext) {
        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
        OperatorInfo operatorInfo = chainInfo.getOperatorInfo(vertexId);
        Map<Integer, ChainedSourceInfo> chainedSources = chainInfo.getChainedSources();
        StreamNode vertex = streamGraph.getStreamNode(vertexId);
        config.setVertexID(vertexId);
        List<StreamEdge> inEdges = vertex.getInEdges();
        TypeSerializer<?>[] inputSerializers = vertex.getTypeSerializersIn();
        // One slot per input serializer; slots are filled while walking the in-edges below.
        StreamConfig.InputConfig[] inputConfigs = new StreamConfig.InputConfig[inputSerializers.length];
        // Running index handed to each newly created network input.
        int inputGateCount = 0;
        for (StreamEdge inEdge : inEdges) {
            ChainedSourceInfo chainedSource = chainedSources.get(inEdge.getSourceId());
            // Type number 0 maps to slot 0; any other type number n maps to slot n - 1.
            int inputIndex = inEdge.getTypeNumber() == 0 ? 0 : inEdge.getTypeNumber() - 1;
            Preconditions.checkState((inputIndex < inputSerializers.length ? 1 : 0) != 0, (String)"Could not find valid input serializers when creating job graph for edge: %s", (Object[])new Object[]{inEdge});
            if (chainedSource != null) {
                // A chained source must own its input slot exclusively.
                if (inputConfigs[inputIndex] != null) {
                    throw new IllegalStateException("Trying to union a chained source with another input.");
                }
                inputConfigs[inputIndex] = chainedSource.getInputConfig();
                // Record the source's operator config among the chained configs of this vertex.
                jobVertexBuildContext.getOrCreateChainedConfig(vertexId).put(inEdge.getSourceId(), chainedSource.getOperatorConfig());
                continue;
            }
            // Only the first non-chained edge for a slot creates the network input config.
            if (inputConfigs[inputIndex] != null) continue;
            StreamConfig.InputRequirement inputRequirement = vertex.getInputRequirements().getOrDefault(inputIndex, StreamConfig.InputRequirement.PASS_THROUGH);
            inputConfigs[inputIndex] = new StreamConfig.NetworkInputConfig(inputSerializers[inputIndex], inputGateCount++, inputRequirement);
        }
        // A vertex that consumes a cluster dataset gets exactly one network input in slot 0,
        // overwriting whatever the loop above placed there.
        if (vertex.getConsumeClusterDatasetId() != null) {
            config.setNumberOfNetworkInputs(1);
            inputConfigs[0] = new StreamConfig.NetworkInputConfig(inputSerializers[0], 0);
        }
        config.setInputs(inputConfigs);
        config.setTypeSerializerOut(vertex.getTypeSerializerOut());
        config.setStreamOperatorFactory(vertex.getOperatorFactory());
        // NOTE(review): checkpointCfg is not read anywhere below in this method.
        CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig();
        config.setSerializedStateBackend(streamGraph.getCheckpointingSettings().getDefaultStateBackend(), Boolean.TRUE.equals(streamGraph.getCheckpointingSettings().isStateBackendUseManagedMemory().getAsBoolean()));
        config.setSerializedCheckpointStorage(streamGraph.getCheckpointingSettings().getDefaultCheckpointStorage());
        config.setGraphContainingLoops(streamGraph.isIterative());
        config.setTimerServiceProvider(streamGraph.getTimerServiceProvider());
        config.getConfiguration().set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, (Object)streamGraph.isEnableCheckpointsAfterTasksFinish());
        for (int i = 0; i < vertex.getStatePartitioners().length; ++i) {
            config.setStatePartitioner(i, vertex.getStatePartitioners()[i]);
        }
        config.setStateKeySerializer(vertex.getStateKeySerializer());
        config.setAdditionalMetricVariables(vertex.getAdditionalMetricVariables());
        Class<? extends TaskInvokable> vertexClass = vertex.getJobVertexClass();
        // Iteration head/tail vertices additionally carry the broker id and loop timeout.
        if (vertexClass.equals(StreamIterationHead.class) || vertexClass.equals(StreamIterationTail.class)) {
            config.setIterationId(streamGraph.getBrokerID(vertexId));
            config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexId));
        }
        operatorInfo.setVertexConfig(config);
    }

    /**
     * Registers a side-output serializer for every chainable edge that carries an output tag and
     * then stores the chainable edges on the config as its chained outputs.
     *
     * @param config the operator config to update
     * @param chainableOutputs edges chained to the operator
     * @param jobVertexBuildContext build context supplying the serializer config
     */
    public static void setOperatorChainedOutputsConfig(StreamConfig config, List<StreamEdge> chainableOutputs, JobVertexBuildContext jobVertexBuildContext) {
        chainableOutputs.stream()
                .filter(taggedEdge -> taggedEdge.getOutputTag() != null)
                .forEach(taggedEdge -> config.setTypeSerializerSideOut(taggedEdge.getOutputTag(), taggedEdge.getOutputTag().getTypeInfo().createSerializer(jobVertexBuildContext.getStreamGraph().getExecutionConfig().getSerializerConfig())));
        config.setChainedOutputs(chainableOutputs);
    }

    /**
     * Configures the non-chained outputs of one operator: registers side-output serializers for
     * tagged edges, then computes the (possibly shared) outputs for the edges and stores their
     * count and list on the config.
     */
    private static void setOperatorNonChainedOutputsConfig(JobVertexID jobVertexId, Integer streamNodeId, StreamConfig config, List<StreamEdge> nonChainableOutputs, Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge, JobVertexBuildContext jobVertexBuildContext) {
        for (StreamEdge outEdge : nonChainableOutputs) {
            if (outEdge.getOutputTag() != null) {
                config.setTypeSerializerSideOut(outEdge.getOutputTag(), outEdge.getOutputTag().getTypeInfo().createSerializer(jobVertexBuildContext.getStreamGraph().getExecutionConfig().getSerializerConfig()));
            }
        }

        List<NonChainedOutput> distinctOutputs = StreamingJobGraphGenerator.mayReuseNonChainedOutputs(jobVertexId, streamNodeId, nonChainableOutputs, outputsConsumedByEdge, jobVertexBuildContext);
        config.setNumberOfOutputs(distinctOutputs.size());
        config.setOperatorNonChainedOutputs(distinctOutputs);
    }

    /**
     * Connects the vertex starting at {@code startNodeId} to the targets of its transitive out
     * edges and stores the distinct outputs, in first-seen order, on the config.
     *
     * <p>For every edge the output is looked up in {@code opIntermediateOutputs}, the job edge is
     * created via {@code connect}, and the job-edge counter of the returned data set is increased.
     */
    private void setVertexNonChainedOutputsConfig(Integer startNodeId, StreamConfig config, List<StreamEdge> transitiveOutEdges, Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
        Set<NonChainedOutput> distinctOutputs = new LinkedHashSet<>();
        for (StreamEdge outEdge : transitiveOutEdges) {
            Map<StreamEdge, NonChainedOutput> outputsOfProducer = opIntermediateOutputs.get(outEdge.getSourceId());
            NonChainedOutput producedOutput = outputsOfProducer.get(outEdge);
            distinctOutputs.add(producedOutput);

            IntermediateDataSet dataSet = StreamingJobGraphGenerator.connect(startNodeId, outEdge, producedOutput, this.jobVertexBuildContext.getJobVerticesInOrder(), this.jobVertexBuildContext);
            dataSet.increaseNumJobEdgesToCreate();
        }
        config.setVertexNonChainedOutputs(new ArrayList<>(distinctOutputs));
    }

    /**
     * Configures the non-chained outputs of every operator of every chain known to the build
     * context, visiting the chains in the context's order.
     *
     * @param opIntermediateOutputs per-operator map from consumer edge to the output serving it
     * @param jobVertexBuildContext build context supplying the chains and their job vertices
     */
    public static void setAllOperatorNonChainedOutputsConfigs(Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs, JobVertexBuildContext jobVertexBuildContext) {
        for (Map.Entry<Integer, OperatorChainInfo> chainEntry : jobVertexBuildContext.getChainInfosInOrder().entrySet()) {
            JobVertexID vertexIdOfChain = jobVertexBuildContext.getJobVertex(chainEntry.getKey()).getID();
            StreamingJobGraphGenerator.setOperatorNonChainedOutputsConfigs(opIntermediateOutputs, jobVertexBuildContext, chainEntry.getValue(), vertexIdOfChain);
        }
    }

    /**
     * Configures the non-chained outputs of every operator in one chain. The per-operator
     * edge-to-output map is created in {@code opIntermediateOutputs} if it does not exist yet.
     */
    private static void setOperatorNonChainedOutputsConfigs(Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs, JobVertexBuildContext jobVertexBuildContext, OperatorChainInfo chainInfo, JobVertexID jobVertexId) {
        for (Map.Entry<Integer, OperatorInfo> operatorEntry : chainInfo.getOperatorInfos().entrySet()) {
            Integer operatorNodeId = operatorEntry.getKey();
            OperatorInfo info = operatorEntry.getValue();
            Map<StreamEdge, NonChainedOutput> consumedOutputs = opIntermediateOutputs.computeIfAbsent(operatorNodeId, unused -> new HashMap<>());
            StreamingJobGraphGenerator.setOperatorNonChainedOutputsConfig(jobVertexId, operatorNodeId, info.getVertexConfig(), info.getNonChainableOutputs(), consumedOutputs, jobVertexBuildContext);
        }
    }

    /**
     * Configures the vertex-level non-chained outputs of every job vertex, using the config of
     * each chain's start operator and the chain's transitive out edges.
     */
    private void setAllVertexNonChainedOutputsConfigs(Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
        for (Integer startNodeId : this.jobVertexBuildContext.getJobVerticesInOrder().keySet()) {
            OperatorChainInfo chainOfVertex = this.jobVertexBuildContext.getChainInfo(startNodeId);
            StreamConfig headConfig = chainOfVertex.getOperatorInfo(startNodeId).getVertexConfig();
            this.setVertexNonChainedOutputsConfig(startNodeId, headConfig, chainOfVertex.getTransitiveOutEdges(), opIntermediateOutputs);
        }
    }

    /**
     * Determines the outputs needed to serve the given consumer edges of one stream node,
     * sharing an existing output between edges where {@code createOrReuseOutput} allows it.
     *
     * <p>Per edge, the partition type and data set id are derived first. Persistent data sets
     * switch to {@code BLOCKING_PERSISTENT} with the edge's own data set id. Hybrid partitions
     * are flagged on the build context, and a broadcast {@code HYBRID_SELECTIVE} edge is upgraded
     * to {@code HYBRID_FULL}.
     *
     * @return the newly created outputs (reused ones are not repeated); empty if there are no
     *     consumer edges
     * @throws IllegalStateException if an edge does not originate from {@code streamNodeId}
     */
    private static List<NonChainedOutput> mayReuseNonChainedOutputs(JobVertexID jobVertexId, int streamNodeId, List<StreamEdge> consumerEdges, Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge, JobVertexBuildContext jobVertexBuildContext) {
        List<NonChainedOutput> createdOutputs = new ArrayList<>(consumerEdges.size());
        for (StreamEdge consumerEdge : consumerEdges) {
            Preconditions.checkState(streamNodeId == consumerEdge.getSourceId(), (Object)"stream node id must be the same.");

            ResultPartitionType effectiveType = StreamingJobGraphGenerator.getResultPartitionType(consumerEdge, jobVertexBuildContext);
            IntermediateDataSetID effectiveDataSetId = new IntermediateDataSetID(jobVertexId, consumerEdge.getEdgeId().hashCode());

            final boolean persistent = StreamingJobGraphGenerator.isPersistentIntermediateDataset(effectiveType, consumerEdge);
            if (persistent) {
                effectiveType = ResultPartitionType.BLOCKING_PERSISTENT;
                effectiveDataSetId = consumerEdge.getIntermediateDatasetIdToProduce();
            }

            if (effectiveType.isHybridResultPartition()) {
                jobVertexBuildContext.setHasHybridResultPartition(true);
                boolean selectiveBroadcast = consumerEdge.getPartitioner().isBroadcast() && effectiveType == ResultPartitionType.HYBRID_SELECTIVE;
                if (selectiveBroadcast) {
                    LOG.info("{} result partition has been replaced by {} result partition to support broadcast optimization, which will reduce shuffle data writing cost.", effectiveType.name(), ResultPartitionType.HYBRID_FULL.name());
                    effectiveType = ResultPartitionType.HYBRID_FULL;
                }
            }

            StreamingJobGraphGenerator.createOrReuseOutput(createdOutputs, outputsConsumedByEdge, consumerEdge, persistent, effectiveDataSetId, effectiveType, jobVertexBuildContext.getStreamGraph());
        }
        return createdOutputs;
    }

    /**
     * Maps {@code consumerEdge} to an output: either an already registered output that is
     * compatible with the edge, or a newly created one.
     *
     * <p>Reuse is only attempted for partition types accepted by
     * {@code isPartitionTypeCanBeReuse}. A candidate is compatible when partition types match
     * (per {@code allHybridOrSameReconsumablePartitionType}) and consumer parallelism, consumer
     * max parallelism, persistent data set id, output tag and partitioner are all equal. A reused
     * output is not added to {@code outputs}; a new one is.
     */
    private static void createOrReuseOutput(List<NonChainedOutput> outputs, Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge, StreamEdge consumerEdge, boolean isPersistentDataSet, IntermediateDataSetID dataSetId, ResultPartitionType partitionType, StreamGraph streamGraph) {
        final int consumerParallelism = streamGraph.getStreamNode(consumerEdge.getTargetId()).getParallelism();
        final int consumerMaxParallelism = streamGraph.getStreamNode(consumerEdge.getTargetId()).getMaxParallelism();

        NonChainedOutput match = null;
        if (StreamingJobGraphGenerator.isPartitionTypeCanBeReuse(partitionType)) {
            for (NonChainedOutput candidate : outputsConsumedByEdge.values()) {
                boolean compatible = StreamingJobGraphGenerator.allHybridOrSameReconsumablePartitionType(candidate.getPartitionType(), partitionType)
                        && consumerParallelism == candidate.getConsumerParallelism()
                        && consumerMaxParallelism == candidate.getConsumerMaxParallelism()
                        && Objects.equals(candidate.getPersistentDataSetId(), consumerEdge.getIntermediateDatasetIdToProduce())
                        && Objects.equals(candidate.getOutputTag(), consumerEdge.getOutputTag())
                        && Objects.equals(consumerEdge.getPartitioner(), candidate.getPartitioner());
                if (compatible) {
                    match = candidate;
                    break;
                }
            }
        }

        if (match != null) {
            // Share the existing output with this edge.
            outputsConsumedByEdge.put(consumerEdge, match);
            StreamingJobGraphGenerator.checkAndReplaceReusableHybridPartitionType(match);
            return;
        }

        NonChainedOutput created = new NonChainedOutput(consumerEdge.supportsUnalignedCheckpoints(), consumerEdge.getSourceId(), consumerParallelism, consumerMaxParallelism, consumerEdge.getBufferTimeout(), isPersistentDataSet, dataSetId, consumerEdge.getOutputTag(), consumerEdge.getPartitioner(), partitionType);
        outputs.add(created);
        outputsConsumedByEdge.put(consumerEdge, created);
    }

    /** Returns whether the partition type is reconsumable or a hybrid result partition. */
    private static boolean isPartitionTypeCanBeReuse(ResultPartitionType partitionType) {
        if (partitionType.isReconsumable()) {
            return true;
        }
        return partitionType.isHybridResultPartition();
    }

    /**
     * Returns whether the two partition types are the same reconsumable type, or are both hybrid
     * result partitions.
     */
    private static boolean allHybridOrSameReconsumablePartitionType(ResultPartitionType partitionType1, ResultPartitionType partitionType2) {
        boolean sameReconsumable = partitionType1.isReconsumable() && partitionType1 == partitionType2;
        if (sameReconsumable) {
            return true;
        }
        return partitionType1.isHybridResultPartition() && partitionType2.isHybridResultPartition();
    }

    /**
     * Replaces the dynamic-graph-only forward partitioners on the given edges.
     *
     * <p>On chainable edges both {@code ForwardForConsecutiveHashPartitioner} and
     * {@code ForwardForUnspecifiedPartitioner} become a {@code ForwardPartitioner}. On
     * non-chainable edges the former becomes its wrapped hash partitioner and the latter a
     * {@code RescalePartitioner}. Other partitioners are left alone.
     *
     * @param chainableOutputs edges that will be chained
     * @param nonChainableOutputs edges that will not be chained
     * @param jobVertexBuildContext build context supplying the stream graph
     * @throws IllegalStateException if such a partitioner is found while the graph is not dynamic
     */
    public static void tryConvertPartitionerForDynamicGraph(List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs, JobVertexBuildContext jobVertexBuildContext) {
        StreamGraph graph = jobVertexBuildContext.getStreamGraph();

        for (StreamEdge chainedEdge : chainableOutputs) {
            StreamPartitioner<?> current = chainedEdge.getPartitioner();
            boolean dynamicForward = current instanceof ForwardForConsecutiveHashPartitioner || current instanceof ForwardForUnspecifiedPartitioner;
            if (dynamicForward) {
                Preconditions.checkState((boolean)graph.isDynamic(), (Object)String.format("%s should only be used in dynamic graph.", current.getClass().getSimpleName()));
                chainedEdge.setPartitioner(new ForwardPartitioner());
            }
        }

        for (StreamEdge unchainedEdge : nonChainableOutputs) {
            StreamPartitioner<?> current = unchainedEdge.getPartitioner();
            if (current instanceof ForwardForConsecutiveHashPartitioner) {
                Preconditions.checkState((boolean)graph.isDynamic(), (Object)"ForwardForConsecutiveHashPartitioner should only be used in dynamic graph.");
                unchainedEdge.setPartitioner(((ForwardForConsecutiveHashPartitioner)current).getHashPartitioner());
            } else if (current instanceof ForwardForUnspecifiedPartitioner) {
                Preconditions.checkState((boolean)graph.isDynamic(), (Object)"ForwardForUnspecifiedPartitioner should only be used in dynamic graph.");
                unchainedEdge.setPartitioner(new RescalePartitioner());
            }
        }
    }

    /**
     * Creates the job edge for a stream edge between two job vertices and returns the data set
     * the edge reads from.
     *
     * <p>The edge is recorded as a physical edge, the downstream vertex's network-input count is
     * incremented, and the buffer timeout is validated against the partition type. The
     * distribution pattern is {@code POINTWISE} for pointwise partitioners and {@code ALL_TO_ALL}
     * otherwise.
     *
     * @param headOfChain start node id of the producing chain
     * @param edge the stream edge being connected
     * @param output the output that serves the edge
     * @param jobVertices job vertices keyed by chain start node id
     * @param jobVertexBuildContext build context that records the physical edge
     * @return the source data set of the created job edge
     * @throws UnsupportedOperationException if the edge's buffer timeout is not allowed for the
     *     partition type
     */
    public static IntermediateDataSet connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output, Map<Integer, JobVertex> jobVertices, JobVertexBuildContext jobVertexBuildContext) {
        jobVertexBuildContext.addPhysicalEdgesInOrder(edge);

        Integer targetNodeId = edge.getTargetId();
        JobVertex producerVertex = jobVertices.get(headOfChain);
        JobVertex consumerVertex = jobVertices.get(targetNodeId);

        // The consumer gains one more network input.
        StreamConfig consumerConfig = new StreamConfig(consumerVertex.getConfiguration());
        consumerConfig.setNumberOfNetworkInputs(consumerConfig.getNumberOfNetworkInputs() + 1);

        StreamPartitioner<?> outputPartitioner = output.getPartitioner();
        ResultPartitionType outputPartitionType = output.getPartitionType();
        StreamingJobGraphGenerator.checkBufferTimeout(outputPartitionType, edge);

        DistributionPattern pattern = outputPartitioner.isPointwise() ? DistributionPattern.POINTWISE : DistributionPattern.ALL_TO_ALL;
        JobEdge createdEdge = consumerVertex.connectNewDataSetAsInput(producerVertex, pattern, outputPartitionType, output.getDataSetId(), outputPartitioner.isBroadcast(), outputPartitioner.getClass().equals(ForwardPartitioner.class), edge.getTypeNumber(), edge.areInterInputsKeysCorrelated(), edge.isIntraInputKeyCorrelated());

        createdEdge.setShipStrategyName(outputPartitioner.toString());
        createdEdge.setDownstreamSubtaskStateMapper(outputPartitioner.getDownstreamSubtaskStateMapper());
        createdEdge.setUpstreamSubtaskStateMapper(outputPartitioner.getUpstreamSubtaskStateMapper());

        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", outputPartitioner.getClass().getSimpleName(), headOfChain, targetNodeId);
        }
        return createdEdge.getSource();
    }

    /**
     * Returns whether the edge produces a persistent intermediate data set: the partition type is
     * blocking (or blocking-persistent) and the edge names a data set id to produce.
     */
    private static boolean isPersistentIntermediateDataset(ResultPartitionType resultPartitionType, StreamEdge edge) {
        if (!resultPartitionType.isBlockingOrBlockingPersistentResultPartition()) {
            return false;
        }
        return edge.getIntermediateDatasetIdToProduce() != null;
    }

    /**
     * Rejects a buffer timeout other than -1 on an edge whose partition type cannot be consumed
     * in a pipelined fashion.
     *
     * @throws UnsupportedOperationException if the combination is not supported
     */
    private static void checkBufferTimeout(ResultPartitionType type, StreamEdge edge) {
        final long timeout = edge.getBufferTimeout();
        if (type.canBePipelinedConsumed() || timeout == -1L) {
            return;
        }
        throw new UnsupportedOperationException("only canBePipelinedConsumed partition support buffer timeout " + timeout + " for src operator in edge " + edge + ". \nPlease either disable buffer timeout (via -1) or use the canBePipelinedConsumed partition.");
    }

    /**
     * Maps the exchange mode of an edge to a result partition type. {@code UNDEFINED} is resolved
     * through {@code determineUndefinedResultPartitionType}.
     *
     * @throws UnsupportedOperationException for an exchange mode without a mapping
     */
    private static ResultPartitionType getResultPartitionType(StreamEdge edge, JobVertexBuildContext jobVertexBuildContext) {
        switch (edge.getExchangeMode()) {
            case PIPELINED:
                return ResultPartitionType.PIPELINED_BOUNDED;
            case BATCH:
                return ResultPartitionType.BLOCKING;
            case HYBRID_FULL:
                return ResultPartitionType.HYBRID_FULL;
            case HYBRID_SELECTIVE:
                return ResultPartitionType.HYBRID_SELECTIVE;
            case UNDEFINED:
                return StreamingJobGraphGenerator.determineUndefinedResultPartitionType(edge, jobVertexBuildContext);
            default:
                throw new UnsupportedOperationException("Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
        }
    }

    /**
     * Chooses the result partition type for an edge whose exchange mode is undefined.
     *
     * <p>If the producing node emits no output until end of input, the edge's buffer timeout is
     * set to -1 and the result is {@code BLOCKING}. Otherwise the graph's global stream exchange
     * mode decides, for some modes depending on the edge's partitioner.
     *
     * @param edge the edge to decide for; its buffer timeout may be modified
     * @param jobVertexBuildContext build context supplying the stream graph
     * @return the chosen partition type
     * @throws RuntimeException if the global exchange mode is not recognized
     */
    public static ResultPartitionType determineUndefinedResultPartitionType(StreamEdge edge, JobVertexBuildContext jobVertexBuildContext) {
        StreamGraph graph = jobVertexBuildContext.getStreamGraph();
        Attribute producerAttribute = graph.getStreamNode(edge.getSourceId()).getAttribute();
        if (producerAttribute.isNoOutputUntilEndOfInput()) {
            edge.setBufferTimeout(-1L);
            return ResultPartitionType.BLOCKING;
        }

        StreamPartitioner<?> edgePartitioner = edge.getPartitioner();
        switch (graph.getGlobalStreamExchangeMode()) {
            case ALL_EDGES_BLOCKING:
                return ResultPartitionType.BLOCKING;
            case FORWARD_EDGES_PIPELINED:
                return edgePartitioner instanceof ForwardPartitioner ? ResultPartitionType.PIPELINED_BOUNDED : ResultPartitionType.BLOCKING;
            case POINTWISE_EDGES_PIPELINED:
                return edgePartitioner.isPointwise() ? ResultPartitionType.PIPELINED_BOUNDED : ResultPartitionType.BLOCKING;
            case ALL_EDGES_PIPELINED:
                return ResultPartitionType.PIPELINED_BOUNDED;
            case ALL_EDGES_PIPELINED_APPROXIMATE:
                return ResultPartitionType.PIPELINED_APPROXIMATE;
            case ALL_EDGES_HYBRID_FULL:
                return ResultPartitionType.HYBRID_FULL;
            case ALL_EDGES_HYBRID_SELECTIVE:
                return ResultPartitionType.HYBRID_SELECTIVE;
            default:
                throw new RuntimeException("Unrecognized global data exchange mode " + graph.getGlobalStreamExchangeMode());
        }
    }

    /**
     * Returns whether the edge can be chained, without allowing chaining across default
     * parallelism.
     *
     * @param edge the edge to test
     * @param streamGraph the graph the edge belongs to
     */
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        final boolean allowChainWithDefaultParallelism = false;
        return StreamingJobGraphGenerator.isChainable(edge, streamGraph, allowChainWithDefaultParallelism);
    }

    /**
     * Returns whether the edge can be chained: its target must have exactly one input edge and
     * the edge must pass {@code isChainableInput}.
     *
     * @param edge the edge to test
     * @param streamGraph the graph the edge belongs to
     * @param allowChainWithDefaultParallelism forwarded to {@code isChainableInput}
     */
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph, boolean allowChainWithDefaultParallelism) {
        StreamNode target = streamGraph.getTargetVertex(edge);
        if (target.getInEdges().size() != 1) {
            return false;
        }
        return StreamingJobGraphGenerator.isChainableInput(edge, streamGraph, allowChainWithDefaultParallelism);
    }

    /**
     * Returns whether the node is a source that can be chained into its consumer.
     *
     * <p>The node must use a {@link SourceOperatorFactory} and have exactly one out edge. The
     * target of that edge must use chaining strategy {@code HEAD_WITH_SOURCES}, and the edge must
     * pass {@code isChainableInput} (without default-parallelism chaining).
     *
     * @param streamNode the candidate source node
     * @param streamGraph the graph the node belongs to
     */
    public static boolean isChainableSource(StreamNode streamNode, StreamGraph streamGraph) {
        // instanceof is false for null, so this also covers a missing operator factory.
        if (!(streamNode.getOperatorFactory() instanceof SourceOperatorFactory)) {
            return false;
        }
        if (streamNode.getOutEdges().size() != 1) {
            return false;
        }

        StreamEdge singleOutEdge = streamNode.getOutEdges().get(0);
        StreamNode consumer = streamGraph.getStreamNode(singleOutEdge.getTargetId());
        ChainingStrategy consumerStrategy = ((StreamOperatorFactory)Preconditions.checkNotNull(consumer.getOperatorFactory())).getChainingStrategy();
        if (consumerStrategy != ChainingStrategy.HEAD_WITH_SOURCES) {
            return false;
        }
        return StreamingJobGraphGenerator.isChainableInput(singleOutEdge, streamGraph, false);
    }

    /**
     * Returns whether the edge qualifies as a chainable input of its target.
     *
     * <p>Chaining must be enabled, both ends must share a slot sharing group, the operators and
     * the partitioner/exchange mode must be chainable, and no other input edge of the target may
     * carry the same type number.
     */
    private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph, boolean allowChainWithDefaultParallelism) {
        StreamNode producer = streamGraph.getSourceVertex(edge);
        StreamNode consumer = streamGraph.getTargetVertex(edge);

        if (!streamGraph.isChainingEnabled()) {
            return false;
        }
        if (!producer.isSameSlotSharingGroup(consumer)) {
            return false;
        }
        if (!StreamingJobGraphGenerator.areOperatorsChainable(producer, consumer, streamGraph, allowChainWithDefaultParallelism)) {
            return false;
        }
        if (!StreamingJobGraphGenerator.arePartitionerAndExchangeModeChainable(edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic())) {
            return false;
        }

        // Another edge feeding the same input slot rules out chaining.
        return consumer.getInEdges().stream()
                .noneMatch(other -> other != edge && other.getTypeNumber() == edge.getTypeNumber());
    }

    /**
     * Returns whether an edge with the given partitioner and exchange mode may be chained.
     *
     * <p>A {@code ForwardForConsecutiveHashPartitioner} is always chainable but requires a dynamic
     * graph. Otherwise the partitioner must be a {@code ForwardPartitioner} and the exchange mode
     * must not be {@code BATCH}.
     *
     * @throws IllegalStateException if a {@code ForwardForConsecutiveHashPartitioner} is used in
     *     a non-dynamic graph
     */
    @VisibleForTesting
    static boolean arePartitionerAndExchangeModeChainable(StreamPartitioner<?> partitioner, StreamExchangeMode exchangeMode, boolean isDynamicGraph) {
        if (!(partitioner instanceof ForwardForConsecutiveHashPartitioner)) {
            return exchangeMode != StreamExchangeMode.BATCH && partitioner instanceof ForwardPartitioner;
        }
        Preconditions.checkState((boolean)isDynamicGraph);
        return true;
    }

    /**
     * Determines whether two adjacent operators may be placed into the same chain.
     *
     * <p>Returns {@code false} right away when either node has no operator factory, or when the
     * downstream factory is a {@code YieldingOperatorFactory} and the head operator of the
     * upstream chain is a legacy source. Otherwise the result is the conjunction of:
     *
     * <ul>
     *   <li>the upstream chaining strategy is anything but {@code NEVER};
     *   <li>the downstream chaining strategy is {@code ALWAYS}, or it is
     *       {@code HEAD_WITH_SOURCES} and the upstream factory is a
     *       {@code SourceOperatorFactory};
     *   <li>both nodes have the same parallelism (or, when
     *       {@code allowChainWithDefaultParallelism} is set, either side reports {@code -1});
     *   <li>both nodes have the same max parallelism, unless the graph allows chaining operators
     *       with different max parallelism.
     * </ul>
     *
     * @param upStreamVertex the producing node
     * @param downStreamVertex the consuming node
     * @param streamGraph the graph both nodes belong to
     * @param allowChainWithDefaultParallelism whether a parallelism of {@code -1} on either side
     *     counts as compatible (presumably the "not yet decided" marker; confirm against callers)
     * @return {@code true} if the operators can be chained
     * @throws RuntimeException if either factory reports a chaining strategy not handled here
     */
    @VisibleForTesting
    static boolean areOperatorsChainable(StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph, boolean allowChainWithDefaultParallelism) {
        StreamOperatorFactory<?> upstreamFactory = upStreamVertex.getOperatorFactory();
        StreamOperatorFactory<?> downstreamFactory = downStreamVertex.getOperatorFactory();
        if (upstreamFactory == null || downstreamFactory == null) {
            return false;
        }
        if (downstreamFactory instanceof YieldingOperatorFactory
                && StreamingJobGraphGenerator.getHeadOperator(upStreamVertex, streamGraph).isLegacySource()) {
            return false;
        }

        // Both strategy switches always run, so an unknown strategy on either side still throws
        // even when the other side has already ruled chaining out.
        boolean upstreamPermits;
        switch (upstreamFactory.getChainingStrategy()) {
            case ALWAYS:
            case HEAD:
            case HEAD_WITH_SOURCES:
                upstreamPermits = true;
                break;
            case NEVER:
                upstreamPermits = false;
                break;
            default:
                throw new RuntimeException("Unknown chaining strategy: " + upstreamFactory.getChainingStrategy());
        }

        boolean downstreamPermits;
        switch (downstreamFactory.getChainingStrategy()) {
            case ALWAYS:
                downstreamPermits = true;
                break;
            case HEAD_WITH_SOURCES:
                downstreamPermits = upstreamFactory instanceof SourceOperatorFactory;
                break;
            case NEVER:
            case HEAD:
                downstreamPermits = false;
                break;
            default:
                throw new RuntimeException("Unknown chaining strategy: " + downstreamFactory.getChainingStrategy());
        }

        int upstreamParallelism = upStreamVertex.getParallelism();
        int downstreamParallelism = downStreamVertex.getParallelism();
        boolean parallelismCompatible = upstreamParallelism == downstreamParallelism;
        if (allowChainWithDefaultParallelism) {
            parallelismCompatible = parallelismCompatible || upstreamParallelism == -1 || downstreamParallelism == -1;
        }

        boolean chainable = upstreamPermits && downstreamPermits && parallelismCompatible;
        if (!streamGraph.isChainingOfOperatorsWithDifferentMaxParallelismEnabled()) {
            chainable &= upStreamVertex.getMaxParallelism() == downStreamVertex.getMaxParallelism();
        }
        return chainable;
    }

    /**
     * Resolves the operator factory at the head of the chain that {@code upStreamVertex} belongs
     * to, memoizing the answer in the stream graph's head-operator cache.
     *
     * <p>If the node has exactly one input edge and that edge is chainable, the head is the head
     * of the edge's source vertex (resolved recursively). Otherwise the node is its own head and
     * its operator factory must be non-null.
     *
     * @param upStreamVertex the node whose chain head is requested
     * @param streamGraph the graph holding the nodes and the cache
     * @return the cached head operator factory for the node
     */
    private static StreamOperatorFactory<?> getHeadOperator(StreamNode upStreamVertex, StreamGraph streamGraph) {
        StreamOperatorFactory<?> cachedHead = streamGraph.getHeadOperatorForNodeFromCache(upStreamVertex);
        if (cachedHead != null) {
            return cachedHead;
        }

        boolean extendsUpstreamChain = upStreamVertex.getInEdges().size() == 1
                && StreamingJobGraphGenerator.isChainable(upStreamVertex.getInEdges().get(0), streamGraph);

        StreamOperatorFactory<?> resolvedHead;
        if (extendsUpstreamChain) {
            StreamNode predecessor = streamGraph.getSourceVertex(upStreamVertex.getInEdges().get(0));
            resolvedHead = StreamingJobGraphGenerator.getHeadOperator(predecessor, streamGraph);
        } else {
            Preconditions.checkNotNull(upStreamVertex.getOperatorFactory());
            resolvedHead = upStreamVertex.getOperatorFactory();
        }
        streamGraph.cacheHeadOperatorForNode(upStreamVertex, resolvedHead);

        // Read back through the cache so the returned value is whatever the cache now holds.
        return streamGraph.getHeadOperatorForNodeFromCache(upStreamVertex);
    }

    /**
     * Flags every job vertex with whether it supports concurrent execution attempts.
     *
     * <p>A job vertex supports them only if every stream node it contains does: the node keyed by
     * the vertex's entry in {@code getJobVerticesInOrder()} plus all nodes listed in the chained
     * configs registered under that same key.
     *
     * @param jobVertexBuildContext supplies the job vertices, the chained configs and the stream
     *     graph; each job vertex in it is updated in place
     */
    public static void markSupportingConcurrentExecutionAttempts(JobVertexBuildContext jobVertexBuildContext) {
        Map<Integer, JobVertex> jobVertices = jobVertexBuildContext.getJobVerticesInOrder();
        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
            JobVertex jobVertex = entry.getValue();

            // Node ids making up this vertex: the entry's own key plus any chained operators.
            Set<Integer> vertexOperators = new HashSet<>();
            vertexOperators.add(entry.getKey());
            Map<Integer, StreamConfig> vertexChainedConfigs = jobVertexBuildContext.getChainedConfigs().get(entry.getKey());
            if (vertexChainedConfigs != null) {
                vertexOperators.addAll(vertexChainedConfigs.keySet());
            }

            // Typed iteration replaces the raw Iterator and unchecked (Integer) cast.
            boolean supportConcurrentExecutionAttempts = true;
            for (int nodeId : vertexOperators) {
                if (!streamGraph.getStreamNode(nodeId).isSupportsConcurrentExecutionAttempts()) {
                    // One unsupported operator is enough to disable it for the whole vertex.
                    supportConcurrentExecutionAttempts = false;
                    break;
                }
            }
            jobVertex.setSupportsConcurrentExecutionAttempts(supportConcurrentExecutionAttempts);
        }
    }

    /**
     * Assigns slot sharing groups to all job vertices and then applies co-location constraints.
     *
     * @param jobVertexBuildContext the build context whose job vertices are updated
     */
    public static void setSlotSharingAndCoLocation(JobVertexBuildContext jobVertexBuildContext) {
        // Order matters: setCoLocation reads each vertex's slot sharing group, which
        // setSlotSharing assigns.
        StreamingJobGraphGenerator.setSlotSharing(jobVertexBuildContext);
        StreamingJobGraphGenerator.setCoLocation(jobVertexBuildContext);
    }

    /**
     * Assigns a {@code SlotSharingGroup} to every job vertex in the build context.
     *
     * <p>Vertices whose stream node uses the group name {@code "default"} receive the group
     * computed for them by {@code buildVertexRegionSlotSharingGroups}. Vertices with any other
     * group name share one lazily created {@code SlotSharingGroup} per name, initialised with the
     * resource profile the stream graph registers for that name, if any. Non-default group names
     * are rejected when the context contains a hybrid result partition.
     *
     * @param jobVertexBuildContext supplies the stream graph and the job vertices to update
     */
    private static void setSlotSharing(JobVertexBuildContext jobVertexBuildContext) {
        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
        Map<String, SlotSharingGroup> groupsByExplicitName = new HashMap<>();
        Map<JobVertexID, SlotSharingGroup> groupsByVertexRegion = StreamingJobGraphGenerator.buildVertexRegionSlotSharingGroups(jobVertexBuildContext);

        for (Map.Entry<Integer, JobVertex> entry : jobVertexBuildContext.getJobVerticesInOrder().entrySet()) {
            JobVertex vertex = entry.getValue();
            String groupName = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
            Preconditions.checkNotNull(groupName, "StreamNode slot sharing group must not be null");

            SlotSharingGroup assignedGroup;
            if (!"default".equals(groupName)) {
                Preconditions.checkState(!jobVertexBuildContext.hasHybridResultPartition(), "hybrid shuffle mode currently does not support setting non-default slot sharing group.");
                // One shared group instance per explicitly named group.
                assignedGroup = groupsByExplicitName.computeIfAbsent(groupName, name -> {
                    SlotSharingGroup created = new SlotSharingGroup();
                    streamGraph.getSlotSharingGroupResource(name).ifPresent(created::setResourceProfile);
                    return created;
                });
            } else {
                assignedGroup = Preconditions.checkNotNull(groupsByVertexRegion.get(vertex.getID()));
            }
            vertex.setSlotSharingGroup(assignedGroup);
        }
    }

    /**
     * Verifies that hybrid result partitions are only used by batch jobs.
     *
     * <p>Does nothing when the context has no hybrid result partition. Otherwise the stream
     * graph's job type must be {@code JobType.BATCH}; if it is not, {@code checkState} fails with
     * a message naming the runtime-mode option to set.
     *
     * @param jobVertexBuildContext the build context to validate
     */
    public static void validateHybridShuffleExecuteInBatchMode(JobVertexBuildContext jobVertexBuildContext) {
        if (!jobVertexBuildContext.hasHybridResultPartition()) {
            return;
        }
        boolean isBatchJob = jobVertexBuildContext.getStreamGraph().getJobType() == JobType.BATCH;
        Preconditions.checkState(
                isBatchJob,
                "hybrid shuffle mode only supports batch job, please set %s to %s",
                ExecutionOptions.RUNTIME_MODE.key(),
                RuntimeExecutionMode.BATCH.name());
    }

    /**
     * Computes the slot sharing group to use for each job vertex whose stream node is in the
     * {@code "default"} group, based on the logical pipelined regions of the job graph.
     *
     * <p>The context's default slot sharing group first receives the resource profile registered
     * for {@code "default"}, if any. When the stream graph places all vertices in the same slot
     * sharing group by default, every region maps to that default group. Otherwise each region
     * gets its own fresh group, initialised with the same {@code "default"} resource profile.
     *
     * @param jobVertexBuildContext supplies the stream graph, job graph and default group
     * @return a map from job vertex id to the slot sharing group of the vertex's region
     */
    private static Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups(JobVertexBuildContext jobVertexBuildContext) {
        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
        Map<JobVertexID, SlotSharingGroup> groupByVertexId = new HashMap<>();

        SlotSharingGroup sharedDefaultGroup = jobVertexBuildContext.getDefaultSlotSharingGroup();
        streamGraph.getSlotSharingGroupResource("default").ifPresent(sharedDefaultGroup::setResourceProfile);

        boolean singleGroupForAllRegions = streamGraph.isAllVerticesInSameSlotSharingGroupByDefault();
        Iterable<DefaultLogicalPipelinedRegion> pipelinedRegions =
                DefaultLogicalTopology.fromJobGraph(jobVertexBuildContext.getJobGraph()).getAllPipelinedRegions();

        for (DefaultLogicalPipelinedRegion pipelinedRegion : pipelinedRegions) {
            SlotSharingGroup groupForRegion;
            if (!singleGroupForAllRegions) {
                // Each region gets a dedicated group carrying the "default" resource profile.
                groupForRegion = new SlotSharingGroup();
                streamGraph.getSlotSharingGroupResource("default").ifPresent(groupForRegion::setResourceProfile);
            } else {
                groupForRegion = sharedDefaultGroup;
            }
            for (LogicalVertex regionVertex : pipelinedRegion.getVertices()) {
                groupByVertexId.put((JobVertexID) regionVertex.getId(), groupForRegion);
            }
        }
        return groupByVertexId;
    }

    /**
     * Applies co-location constraints to the job vertices whose stream node declares a
     * co-location group key.
     *
     * <p>All vertices sharing a key are added to one {@code CoLocationGroupImpl}. The first vertex
     * seen for a key fixes the slot sharing group for that key; every later vertex with the same
     * key must use the very same slot sharing group instance.
     *
     * @param jobVertexBuildContext supplies the stream graph and the job vertices to update
     * @throws IllegalStateException if a vertex with a co-location key has no slot sharing group,
     *     or if vertices with the same key belong to different slot sharing groups
     */
    private static void setCoLocation(JobVertexBuildContext jobVertexBuildContext) {
        // Typed tuple (slot sharing group, co-location group) instead of a raw Tuple2 with casts.
        Map<String, Tuple2<SlotSharingGroup, CoLocationGroupImpl>> coLocationGroups = new HashMap<>();
        Map<Integer, JobVertex> jobVertices = jobVertexBuildContext.getJobVerticesInOrder();
        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
            StreamNode node = streamGraph.getStreamNode(entry.getKey());
            JobVertex vertex = entry.getValue();
            SlotSharingGroup sharingGroup = vertex.getSlotSharingGroup();
            String coLocationGroupKey = node.getCoLocationGroup();
            if (coLocationGroupKey == null) {
                // No co-location requested for this vertex.
                continue;
            }
            if (sharingGroup == null) {
                throw new IllegalStateException("Cannot use a co-location constraint without a slot sharing group");
            }

            Tuple2<SlotSharingGroup, CoLocationGroupImpl> constraint = coLocationGroups.computeIfAbsent(
                    coLocationGroupKey,
                    k -> new Tuple2<>(sharingGroup, new CoLocationGroupImpl(new JobVertex[0])));

            // Identity comparison on purpose: the vertices must share the same group instance.
            if (constraint.f0 != sharingGroup) {
                throw new IllegalStateException("Cannot co-locate operators from different slot sharing groups");
            }
            vertex.updateCoLocationGroup(constraint.f1);
            constraint.f1.addVertex(vertex);
        }
    }

    /**
     * Computes and stores managed memory fractions for every operator of every job vertex.
     *
     * <p>First pass: for each job vertex, record its slot sharing group (deduplicated by
     * identity), its head operator id (the key of its entry in {@code getJobVerticesInOrder()})
     * and the ids of all its operators (head plus chained). Second pass: the fractions are set
     * once per distinct slot sharing group.
     *
     * @param jobVertexBuildContext supplies the job vertices and chained configs; operator configs
     *     reachable from it are updated
     */
    public static void setManagedMemoryFraction(JobVertexBuildContext jobVertexBuildContext) {
        Map<Integer, JobVertex> jobVertices = jobVertexBuildContext.getJobVerticesInOrder();
        // Identity-based set: groups are distinguished by instance, not by equals().
        // The diamond replaces a raw IdentityHashMap and removes the unchecked conversion.
        Set<SlotSharingGroup> slotSharingGroups = Collections.newSetFromMap(new IdentityHashMap<>());
        Map<JobVertexID, Integer> vertexHeadOperators = new HashMap<>();
        Map<JobVertexID, Set<Integer>> vertexOperators = new HashMap<>();

        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
            int headOperatorId = entry.getKey();
            JobVertex jobVertex = entry.getValue();

            SlotSharingGroup jobVertexSlotSharingGroup = jobVertex.getSlotSharingGroup();
            Preconditions.checkState(jobVertexSlotSharingGroup != null, "JobVertex slot sharing group must not be null");
            slotSharingGroups.add(jobVertexSlotSharingGroup);

            vertexHeadOperators.put(jobVertex.getID(), headOperatorId);

            Set<Integer> operatorIds = new HashSet<>();
            operatorIds.add(headOperatorId);
            operatorIds.addAll(jobVertexBuildContext.getChainedConfigs().getOrDefault(headOperatorId, Collections.emptyMap()).keySet());
            vertexOperators.put(jobVertex.getID(), operatorIds);
        }

        for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
            StreamingJobGraphGenerator.setManagedMemoryFractionForSlotSharingGroup(slotSharingGroup, vertexHeadOperators, vertexOperators, jobVertexBuildContext);
        }
    }

    /**
     * Sets the managed memory fractions for all operators that run in one slot sharing group.
     *
     * <p>Only job vertices of the group that are present in {@code vertexOperators} are
     * considered. Across their operators the method sums the operator-scope use case weights per
     * use case and collects the union of slot-scope use cases. Each operator's config then
     * receives its fractions relative to those group-wide totals.
     *
     * @param slotSharingGroup the group being processed
     * @param vertexHeadOperators head operator node id per job vertex
     * @param vertexOperators all operator node ids (head and chained) per job vertex
     * @param jobVertexBuildContext supplies the stream graph, chain infos and chained configs
     */
    private static void setManagedMemoryFractionForSlotSharingGroup(SlotSharingGroup slotSharingGroup, Map<JobVertexID, Integer> vertexHeadOperators, Map<JobVertexID, Set<Integer>> vertexOperators, JobVertexBuildContext jobVertexBuildContext) {
        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
        Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs = jobVertexBuildContext.getChainedConfigs();

        // Element types are spelled out; raw Sets here would erase the types of the stream
        // pipelines and of the for-each loop below.
        Set<JobVertexID> jobVertexIds = slotSharingGroup.getJobVertexIds().stream()
                .filter(vertexOperators::containsKey)
                .collect(Collectors.toSet());

        Set<Integer> groupOperatorIds = jobVertexIds.stream()
                .flatMap(vid -> vertexOperators.get(vid).stream())
                .collect(Collectors.toSet());

        // Total weight per operator-scope use case over every operator in the group.
        Map<ManagedMemoryUseCase, Integer> groupOperatorScopeUseCaseWeights = groupOperatorIds.stream()
                .flatMap(oid -> streamGraph.getStreamNode(oid).getManagedMemoryOperatorScopeUseCaseWeights().entrySet().stream())
                .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.summingInt(Map.Entry::getValue)));

        // Union of slot-scope use cases declared by any operator in the group.
        Set<ManagedMemoryUseCase> groupSlotScopeUseCases = groupOperatorIds.stream()
                .flatMap(oid -> streamGraph.getStreamNode(oid).getManagedMemorySlotScopeUseCases().stream())
                .collect(Collectors.toSet());

        for (JobVertexID jobVertexID : jobVertexIds) {
            int headOperatorNodeId = vertexHeadOperators.get(jobVertexID);
            for (int operatorNodeId : vertexOperators.get(jobVertexID)) {
                StreamConfig operatorConfig = jobVertexBuildContext.getChainInfo(headOperatorNodeId).getOperatorInfo(operatorNodeId).getVertexConfig();
                Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights = streamGraph.getStreamNode(operatorNodeId).getManagedMemoryOperatorScopeUseCaseWeights();
                Set<ManagedMemoryUseCase> slotScopeUseCases = streamGraph.getStreamNode(operatorNodeId).getManagedMemorySlotScopeUseCases();
                StreamingJobGraphGenerator.setManagedMemoryFractionForOperator(operatorScopeUseCaseWeights, slotScopeUseCases, groupOperatorScopeUseCaseWeights, groupSlotScopeUseCases, operatorConfig);
            }

            // Re-apply the chained task configs to the head operator's config once the fractions
            // are written. NOTE(review): presumably the head config stores a serialized snapshot
            // of them that would otherwise be stale; confirm against StreamConfig.
            StreamConfig vertexConfig = jobVertexBuildContext.getChainInfo(headOperatorNodeId).getOperatorInfo(headOperatorNodeId).getVertexConfig();
            vertexConfig.setTransitiveChainedTaskConfigs(vertexChainedConfigs.get(headOperatorNodeId));
        }
    }

    /**
     * Writes the managed memory fractions of a single operator into its config.
     *
     * <p>For every operator-scope use case present in the group, the fraction is the operator's
     * weight divided by the group's total weight (rounded down by
     * {@code ManagedMemoryUtils.getFractionRoundedDown}), or {@code 0.0} if the operator declares
     * no positive weight for it. For every slot-scope use case present in the group, the fraction
     * is {@code 1.0} if the operator declares that use case and {@code 0.0} otherwise.
     *
     * @param operatorScopeUseCaseWeights this operator's weight per operator-scope use case
     * @param slotScopeUseCases the slot-scope use cases this operator declares
     * @param groupManagedMemoryWeights summed operator-scope weights of the whole group
     * @param groupSlotScopeUseCases all slot-scope use cases declared within the group
     * @param operatorConfig the operator's config, updated in place
     */
    private static void setManagedMemoryFractionForOperator(Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights, Set<ManagedMemoryUseCase> slotScopeUseCases, Map<ManagedMemoryUseCase, Integer> groupManagedMemoryWeights, Set<ManagedMemoryUseCase> groupSlotScopeUseCases, StreamConfig operatorConfig) {
        for (Map.Entry<ManagedMemoryUseCase, Integer> entry : groupManagedMemoryWeights.entrySet()) {
            ManagedMemoryUseCase useCase = entry.getKey();
            int groupWeight = entry.getValue();
            int operatorWeight = operatorScopeUseCaseWeights.getOrDefault(useCase, 0);
            operatorConfig.setManagedMemoryFractionOperatorOfUseCase(useCase, operatorWeight > 0 ? ManagedMemoryUtils.getFractionRoundedDown(operatorWeight, groupWeight) : 0.0);
        }

        // The set holds use cases, not map entries, so iterate it with its real element type.
        for (ManagedMemoryUseCase useCase : groupSlotScopeUseCases) {
            operatorConfig.setManagedMemoryFractionOperatorOfUseCase(useCase, slotScopeUseCases.contains(useCase) ? 1.0 : 0.0);
        }
    }

    /**
     * Builds a display name that lists the chained sources of an operator.
     *
     * @param operatorName the base operator name
     * @param chainedSourceInfos the sources chained to the operator; may be empty
     * @return {@code operatorName} unchanged when there are no chained sources, otherwise
     *     {@code "operatorName [src1, src2, ...]"} using each source's configured operator name
     */
    private static String nameWithChainedSourcesInfo(String operatorName, Collection<ChainedSourceInfo> chainedSourceInfos) {
        if (chainedSourceInfos.isEmpty()) {
            return operatorName;
        }
        String chainedSourceNames = chainedSourceInfos.stream()
                .map(sourceInfo -> sourceInfo.getOperatorConfig().getOperatorName())
                .collect(Collectors.joining(", "));
        return String.format("%s [%s]", operatorName, chainedSourceNames);
    }
}

