package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.graph.StreamGraphContext;
import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Preconditions;

/**
 * Builds a {@link JobGraph} from a {@link StreamGraph} step by step instead of all at once.
 *
 * <p>On construction, job vertices are created for the chains that start at the stream graph's
 * source nodes. Every call to {@link #onJobVertexFinished(JobVertexID)} then tries to create job
 * vertices for the stream nodes consuming the finished vertex's output. A chain entry point only
 * becomes a job vertex once all of its non-chained upstream nodes belong to finished job vertices
 * (see {@link #isReadyToCreateJobVertex(OperatorChainInfo)}).
 *
 * <p>The class uses plain, unsynchronized collections for its bookkeeping; nothing visible here
 * makes it safe for concurrent use.
 */
@Internal
public class AdaptiveGraphManager
        implements AdaptiveGraphGenerator, StreamGraphContext.StreamGraphUpdateListener {

    /** The stream graph being translated; it is mutated while parallelisms are decided. */
    private final StreamGraph streamGraph;

    /** The job graph that grows as job vertices are created. */
    private final JobGraph jobGraph;

    private final StreamGraphHasher defaultStreamGraphHasher;

    /** Additional hashers; the i-th hasher writes into the i-th map of {@link #legacyHashes}. */
    private final List<StreamGraphHasher> legacyStreamGraphHasher;

    private final Executor serializationExecutor;

    /** Counter handed to the generator when vertex names get their index prefix. */
    private final AtomicInteger vertexIndexId;

    private final StreamGraphContext streamGraphContext;

    /** Stream node id to the hash produced by {@link #defaultStreamGraphHasher}. */
    private final Map<Integer, byte[]> hashes;

    private final List<Map<Integer, byte[]>> legacyHashes;

    /** Every stream node already placed into a job vertex, mapped to its chain's start node. */
    private final Map<Integer, Integer> frozenNodeToStartNodeMap;

    /** Outputs whose consumer vertex did not exist yet, keyed by source stream node id. */
    private final Map<Integer, Map<StreamEdge, NonChainedOutput>> intermediateOutputsCaches;

    private final Map<IntermediateDataSetID, Integer> intermediateDataSetIdToProducerMap;

    /** Stream edges still waiting for their consumer vertex, keyed by the produced data set. */
    private final Map<IntermediateDataSetID, List<StreamEdge>> intermediateDataSetIdToOutputEdgesMap;

    private final Map<String, IntermediateDataSet> consumerEdgeIdToIntermediateDataSetMap =
            new HashMap<>();

    private final Map<Integer, StreamNodeForwardGroup> streamNodeIdToForwardGroupMap;

    /** Chain entry points that are known but not yet ready to become job vertices. */
    private final Map<Integer, OperatorChainInfo> pendingChainEntryPoints;

    private final Map<JobVertexID, Integer> jobVertexToStartNodeMap;

    private final Map<JobVertexID, List<Integer>> jobVertexToChainedStreamNodeIdsMap;

    private final Map<Integer, JobVertex> startNodeToJobVertexMap;

    private final Map<Integer, JobVertexID> streamNodeIdsToJobVertexMap;

    private final Set<JobVertexID> finishedJobVertices;

    private final Set<Integer> finishedStreamNodeIds;

    private final AtomicBoolean hasHybridResultPartition;

    private final SlotSharingGroup defaultSlotSharingGroup;

    /** JSON rendering of the stream graph, regenerated after every structural update. */
    private String streamGraphJson;

    /**
     * Validates the stream graph, creates the initial job graph and eagerly builds the job
     * vertices reachable from the sources.
     *
     * @param classLoader class loader passed to pre-validation and to the stream graph context
     * @param streamGraph the stream graph to translate
     * @param executor executor used for serialization work; must not be {@code null}
     */
    public AdaptiveGraphManager(
            ClassLoader classLoader, StreamGraph streamGraph, Executor executor) {
        StreamingJobGraphGenerator.preValidate(streamGraph, classLoader);
        this.streamGraph = streamGraph;
        this.serializationExecutor = Preconditions.checkNotNull(executor);

        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHasher = Collections.singletonList(new StreamGraphUserHashHasher());
        this.hashes = new HashMap<>();
        this.legacyHashes = Collections.singletonList(new HashMap<>());

        // Insertion order is kept for created vertices; pending entry points are kept sorted by id.
        this.startNodeToJobVertexMap = new LinkedHashMap<>();
        this.pendingChainEntryPoints = new TreeMap<>();

        this.frozenNodeToStartNodeMap = new HashMap<>();
        this.intermediateOutputsCaches = new HashMap<>();
        this.intermediateDataSetIdToProducerMap = new HashMap<>();
        this.intermediateDataSetIdToOutputEdgesMap = new HashMap<>();
        this.streamNodeIdToForwardGroupMap = new HashMap<>();

        this.vertexIndexId = new AtomicInteger(0);
        this.hasHybridResultPartition = new AtomicBoolean(false);

        this.jobVertexToStartNodeMap = new HashMap<>();
        this.jobVertexToChainedStreamNodeIdsMap = new HashMap<>();
        this.streamNodeIdsToJobVertexMap = new HashMap<>();
        this.finishedJobVertices = new HashSet<>();
        this.finishedStreamNodeIds = new HashSet<>();

        // The context shares (not copies) this manager's bookkeeping maps and reports back
        // through the StreamGraphUpdateListener interface implemented by this class.
        this.streamGraphContext =
                new DefaultStreamGraphContext(
                        streamGraph,
                        this.streamNodeIdToForwardGroupMap,
                        this.frozenNodeToStartNodeMap,
                        this.intermediateOutputsCaches,
                        this.consumerEdgeIdToIntermediateDataSetMap,
                        this.finishedStreamNodeIds,
                        classLoader,
                        this);

        this.jobGraph =
                StreamingJobGraphGenerator.createAndInitializeJobGraph(
                        streamGraph, streamGraph.getJobID());
        this.defaultSlotSharingGroup = new SlotSharingGroup();

        initialization();
    }

    @Override
    public JobGraph getJobGraph() {
        return jobGraph;
    }

    @Override
    public StreamGraphContext getStreamGraphContext() {
        return streamGraphContext;
    }

    /**
     * Marks the given job vertex as finished and tries to create job vertices for the stream
     * nodes that consume its pending output edges.
     *
     * @param jobVertexID id of the finished job vertex
     * @return the job vertices created by this call, possibly empty
     */
    @Override
    public List<JobVertex> onJobVertexFinished(JobVertexID jobVertexID) {
        finishedJobVertices.add(jobVertexID);

        List<StreamNode> downstreamNodes = new ArrayList<>();
        for (StreamEdge outputEdge : getOutputEdgesByVertexId(jobVertexID)) {
            downstreamNodes.add(streamGraph.getStreamNode(outputEdge.getTargetId()));
        }
        return createJobVerticesAndUpdateGraph(downstreamNodes);
    }

    /** Records the given stream node ids as finished. */
    public void addFinishedStreamNodeIds(List<Integer> list) {
        finishedStreamNodeIds.addAll(list);
    }

    /**
     * Looks up the forward group registered for the start node of the given job vertex.
     *
     * @return the forward group, or {@code null} if none is registered
     */
    public StreamNodeForwardGroup getStreamNodeForwardGroupByVertexId(JobVertexID jobVertexID) {
        return streamNodeIdToForwardGroupMap.get(jobVertexToStartNodeMap.get(jobVertexID));
    }

    /** Returns the number of stream nodes that are not yet part of any job vertex. */
    public int getPendingOperatorsCount() {
        return streamGraph.getStreamNodes().size() - frozenNodeToStartNodeMap.size();
    }

    /**
     * Returns the ids of the stream nodes chained into the given job vertex.
     *
     * @return the recorded ids, or {@code null} if the vertex is unknown to this manager
     */
    public List<Integer> getStreamNodeIdsByJobVertexId(JobVertexID jobVertexID) {
        return jobVertexToChainedStreamNodeIdsMap.get(jobVertexID);
    }

    /**
     * Returns the id of the stream node producing the given data set.
     *
     * @return the producer id, or {@code null} if the data set is unknown to this manager
     */
    public Integer getProducerStreamNodeId(IntermediateDataSetID intermediateDataSetID) {
        return intermediateDataSetIdToProducerMap.get(intermediateDataSetID);
    }

    /**
     * Returns a read-only view of the stream edges recorded for the given data set.
     *
     * @throws NullPointerException if no edges are recorded for the data set
     */
    public List<StreamEdge> getOutputStreamEdges(IntermediateDataSetID intermediateDataSetID) {
        return Collections.unmodifiableList(
                intermediateDataSetIdToOutputEdgesMap.get(intermediateDataSetID));
    }

    /**
     * Finds the job vertex that contains the given stream node.
     *
     * @return the vertex id, or empty if the node has not been placed into a job vertex yet
     */
    public Optional<JobVertexID> findVertexByStreamNodeId(int i) {
        if (!isNodeFrozen(i)) {
            return Optional.empty();
        }
        JobVertex owningVertex = startNodeToJobVertexMap.get(getStartNodeId(i));
        return Optional.of(owningVertex.getID());
    }

    /**
     * Collects the pending output stream edges of every data set produced by the given vertex.
     *
     * <p>Only edges whose consumer vertex did not exist when the producer was configured are
     * recorded in {@link #intermediateDataSetIdToOutputEdgesMap}; produced data sets without an
     * entry are skipped rather than triggering a {@link NullPointerException}.
     */
    private List<StreamEdge> getOutputEdgesByVertexId(JobVertexID jobVertexID) {
        JobVertex jobVertex = jobGraph.findVertexByID(jobVertexID);
        List<StreamEdge> outputEdges = new ArrayList<>();
        for (IntermediateDataSet producedDataSet : jobVertex.getProducedDataSets()) {
            List<StreamEdge> pendingEdges =
                    intermediateDataSetIdToOutputEdgesMap.get(producedDataSet.getId());
            if (pendingEdges != null) {
                outputEdges.addAll(pendingEdges);
            }
        }
        return outputEdges;
    }

    /** Creates the first round of job vertices, starting from the stream graph's sources. */
    private void initialization() {
        List<StreamNode> sourceNodes = new ArrayList<>();
        for (Integer sourceId : streamGraph.getSourceIDs()) {
            sourceNodes.add(streamGraph.getStreamNode(sourceId));
        }
        if (jobGraph.isDynamic()) {
            setVertexParallelismsForDynamicGraphIfNecessary();
        }
        createJobVerticesAndUpdateGraph(sourceNodes);
    }

    /**
     * Runs one build round: chains operators starting from the given nodes, records the created
     * vertices, configures them and refreshes the stream graph JSON.
     *
     * @return the job vertices created in this round
     */
    private List<JobVertex> createJobVerticesAndUpdateGraph(List<StreamNode> list) {
        JobVertexBuildContext buildContext =
                new JobVertexBuildContext(
                        jobGraph,
                        streamGraph,
                        hasHybridResultPartition,
                        hashes,
                        legacyHashes,
                        defaultSlotSharingGroup);

        createOperatorChainInfos(list, buildContext);
        recordCreatedJobVerticesInfo(buildContext);
        generateConfigForJobVertices(buildContext);
        generateStreamGraphJson();

        return new ArrayList<>(buildContext.getJobVerticesInOrder().values());
    }

    /** Applies all configuration steps to the job vertices created in the current round. */
    private void generateConfigForJobVertices(JobVertexBuildContext jobVertexBuildContext) {
        Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs = new HashMap<>();
        StreamingJobGraphGenerator.setAllOperatorNonChainedOutputsConfigs(
                opIntermediateOutputs, jobVertexBuildContext);
        setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs, jobVertexBuildContext);
        connectToFinishedUpStreamVertex(jobVertexBuildContext);

        StreamingJobGraphGenerator.setPhysicalEdges(jobVertexBuildContext);
        StreamingJobGraphGenerator.markSupportingConcurrentExecutionAttempts(jobVertexBuildContext);
        StreamingJobGraphGenerator.validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);
        StreamingJobGraphGenerator.setSlotSharingAndCoLocation(jobVertexBuildContext);
        StreamingJobGraphGenerator.setManagedMemoryFraction(jobVertexBuildContext);
        StreamingJobGraphGenerator.addVertexIndexPrefixInVertexName(
                jobVertexBuildContext, vertexIndexId);
        StreamingJobGraphGenerator.setVertexDescription(jobVertexBuildContext);
        StreamingJobGraphGenerator.serializeOperatorCoordinatorsAndStreamConfig(
                serializationExecutor, jobVertexBuildContext);
    }

    /** Configures the non-chained outputs of every job vertex created in the current round. */
    private void setAllVertexNonChainedOutputsConfigs(
            Map<Integer, Map<StreamEdge, NonChainedOutput>> map,
            JobVertexBuildContext jobVertexBuildContext) {
        for (Integer startNodeId : jobVertexBuildContext.getJobVerticesInOrder().keySet()) {
            setVertexNonChainedOutputsConfig(startNodeId, map, jobVertexBuildContext);
        }
    }

    /**
     * Configures the non-chained outputs of the job vertex whose chain starts at {@code num}.
     *
     * <p>If the consumer of an out edge was created in the same round, the two vertices are
     * connected right away. Otherwise the result data set is created and configured on the
     * producer, and the edge and its output are cached so the connection can be made once the
     * consumer vertex exists.
     */
    private void setVertexNonChainedOutputsConfig(
            Integer num,
            Map<Integer, Map<StreamEdge, NonChainedOutput>> map,
            JobVertexBuildContext jobVertexBuildContext) {
        OperatorChainInfo chainInfo = jobVertexBuildContext.getChainInfo(num);
        StreamConfig vertexConfig = chainInfo.getOperatorInfo(num).getVertexConfig();

        // LinkedHashSet: de-duplicates outputs while keeping the order of the out edges.
        Set<NonChainedOutput> vertexOutputs = new LinkedHashSet<>();

        for (StreamEdge outEdge : chainInfo.getTransitiveOutEdges()) {
            NonChainedOutput output = map.get(outEdge.getSourceId()).get(outEdge);
            vertexOutputs.add(output);

            boolean consumerCreatedInThisRound =
                    jobVertexBuildContext
                            .getJobVerticesInOrder()
                            .containsKey(outEdge.getTargetId());
            if (consumerCreatedInThisRound) {
                StreamingJobGraphGenerator.connect(
                        num, outEdge, output, startNodeToJobVertexMap, jobVertexBuildContext);
            } else {
                JobVertex producerVertex = jobVertexBuildContext.getJobVerticesInOrder().get(num);
                IntermediateDataSet dataSet =
                        producerVertex.getOrCreateResultDataSet(
                                output.getDataSetId(), output.getPartitionType());

                StreamPartitioner<?> partitioner = outEdge.getPartitioner();
                DistributionPattern pattern =
                        partitioner.isPointwise()
                                ? DistributionPattern.POINTWISE
                                : DistributionPattern.ALL_TO_ALL;
                dataSet.configure(
                        pattern,
                        partitioner.isBroadcast(),
                        partitioner.getClass().equals(ForwardPartitioner.class));
                dataSet.increaseNumJobEdgesToCreate();

                intermediateDataSetIdToOutputEdgesMap
                        .computeIfAbsent(dataSet.getId(), ignored -> new ArrayList<>())
                        .add(outEdge);
                consumerEdgeIdToIntermediateDataSetMap.put(outEdge.getEdgeId(), dataSet);
                intermediateOutputsCaches
                        .computeIfAbsent(outEdge.getSourceId(), ignored -> new HashMap<>())
                        .put(outEdge, output);
            }

            intermediateDataSetIdToProducerMap.put(output.getDataSetId(), outEdge.getSourceId());
        }

        vertexConfig.setVertexNonChainedOutputs(new ArrayList<>(vertexOutputs));
    }

    /**
     * Connects every newly created chain to its already existing upstream vertices, using the
     * outputs cached when those upstream vertices were configured.
     */
    private void connectToFinishedUpStreamVertex(JobVertexBuildContext jobVertexBuildContext) {
        for (OperatorChainInfo chainInfo : jobVertexBuildContext.getChainInfosInOrder().values()) {
            for (StreamEdge inEdge : chainInfo.getTransitiveInEdges()) {
                Integer upstreamNodeId = inEdge.getSourceId();
                NonChainedOutput cachedOutput =
                        intermediateOutputsCaches.get(upstreamNodeId).get(inEdge);
                StreamingJobGraphGenerator.connect(
                        getStartNodeId(upstreamNodeId),
                        inEdge,
                        cachedOutput,
                        startNodeToJobVertexMap,
                        jobVertexBuildContext);
            }
        }
    }

    /**
     * Records, for every chain built in the current round, which job vertex it became and which
     * stream nodes it contains. All chained nodes become frozen by this step.
     */
    private void recordCreatedJobVerticesInfo(JobVertexBuildContext jobVertexBuildContext) {
        for (OperatorChainInfo chainInfo : jobVertexBuildContext.getChainInfosInOrder().values()) {
            Integer startNodeId = chainInfo.getStartNodeId();
            JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId);
            JobVertexID jobVertexId = jobVertex.getID();

            startNodeToJobVertexMap.put(startNodeId, jobVertex);
            jobVertexToStartNodeMap.put(jobVertexId, startNodeId);

            for (StreamNode chainedNode : chainInfo.getAllChainedNodes()) {
                Integer chainedNodeId = chainedNode.getId();
                frozenNodeToStartNodeMap.put(chainedNodeId, startNodeId);
                jobVertexToChainedStreamNodeIdsMap
                        .computeIfAbsent(jobVertexId, ignored -> new ArrayList<>())
                        .add(chainedNodeId);
                streamNodeIdsToJobVertexMap.put(chainedNodeId, jobVertexId);
            }
        }
    }

    /**
     * Builds the operator chains for all entry points that are ready, in ascending order of
     * their start node id, and registers the in edges coming from already frozen nodes.
     */
    private void createOperatorChainInfos(
            List<StreamNode> list, JobVertexBuildContext jobVertexBuildContext) {
        Map<Integer, OperatorChainInfo> chainEntryPoints =
                buildAndGetChainEntryPoints(list, jobVertexBuildContext);

        // Snapshot the sorted entry points first: the map itself is handed to createChain.
        List<OperatorChainInfo> sortedChainInfos =
                chainEntryPoints.entrySet().stream()
                        .sorted(Map.Entry.comparingByKey())
                        .map(Map.Entry::getValue)
                        .collect(Collectors.toList());

        for (OperatorChainInfo chainInfo : sortedChainInfos) {
            StreamingJobGraphGenerator.createChain(
                    chainInfo.getStartNodeId(),
                    1,
                    chainInfo,
                    chainEntryPoints,
                    false,
                    serializationExecutor,
                    jobVertexBuildContext,
                    this::generateHashesByStreamNodeId);

            StreamNode startNode = streamGraph.getStreamNode(chainInfo.getStartNodeId());
            for (StreamEdge inEdge : startNode.getInEdges()) {
                if (isNodeFrozen(inEdge.getSourceId())) {
                    chainInfo.addTransitiveInEdge(inEdge);
                }
            }
        }
    }

    /**
     * Registers the given nodes as pending chain entry points and returns the entry points that
     * can be turned into job vertices now.
     *
     * <p>Chainable sources are hashed immediately and registered through
     * {@link StreamingJobGraphGenerator#createSourceChainInfo}; every other node gets a fresh
     * {@link OperatorChainInfo} unless one is already pending for it.
     */
    private Map<Integer, OperatorChainInfo> buildAndGetChainEntryPoints(
            List<StreamNode> list, JobVertexBuildContext jobVertexBuildContext) {
        Collection<Integer> sourceIds = streamGraph.getSourceIDs();
        for (StreamNode candidate : list) {
            int nodeId = candidate.getId();
            boolean chainableSource =
                    sourceIds.contains(nodeId)
                            && StreamingJobGraphGenerator.isChainableSource(candidate, streamGraph);
            if (chainableSource) {
                generateHashesByStreamNodeId(nodeId);
                StreamingJobGraphGenerator.createSourceChainInfo(
                        candidate, pendingChainEntryPoints, jobVertexBuildContext);
            } else {
                pendingChainEntryPoints.computeIfAbsent(
                        nodeId, ignored -> new OperatorChainInfo(nodeId));
            }
        }
        return getChainEntryPointsReadyForJobVertex();
    }

    /** Removes and returns every pending chain entry point that is ready to become a vertex. */
    private Map<Integer, OperatorChainInfo> getChainEntryPointsReadyForJobVertex() {
        Map<Integer, OperatorChainInfo> readyEntryPoints = new HashMap<>();
        Iterator<Map.Entry<Integer, OperatorChainInfo>> pending =
                pendingChainEntryPoints.entrySet().iterator();
        while (pending.hasNext()) {
            Map.Entry<Integer, OperatorChainInfo> entry = pending.next();
            if (isReadyToCreateJobVertex(entry.getValue())) {
                readyEntryPoints.put(entry.getKey(), entry.getValue());
                pending.remove();
            }
        }
        return readyEntryPoints;
    }

    /**
     * Prepares stream node parallelisms for a dynamic job graph.
     *
     * <ol>
     *   <li>Converts forward-like partitioners on chainable edges to {@link ForwardPartitioner}.
     *   <li>Resets the parallelism of nodes without a configured parallelism to {@code -1} when
     *       auto parallelism is enabled.
     *   <li>Computes forward groups from the {@link ForwardPartitioner} edges.
     *   <li>Copies each group's decided parallelism and max parallelism back onto its nodes.
     * </ol>
     */
    private void setVertexParallelismsForDynamicGraphIfNecessary() {
        List<StreamNode> topologicalNodes =
                streamGraph.getStreamNodesSortedTopologicallyFromSources();

        for (StreamNode node : topologicalNodes) {
            node.getOutEdges().forEach(this::tryConvertPartitionerForChainableEdge);
        }

        for (StreamNode node : topologicalNodes) {
            if (!node.isParallelismConfigured() && streamGraph.isAutoParallelismEnabled()) {
                // NOTE(review): -1 presumably is the "parallelism not decided" marker - confirm.
                node.setParallelism(-1, false);
            }
        }

        // Maps each consumer node to the set of nodes that feed it through a forward edge.
        Map<StreamNode, Set<StreamNode>> forwardProducersByConsumer = new HashMap<>();
        for (StreamNode producer : topologicalNodes) {
            for (StreamEdge outEdge : producer.getOutEdges()) {
                if (outEdge.getPartitioner().getClass().equals(ForwardPartitioner.class)) {
                    StreamNode consumer = streamGraph.getStreamNode(outEdge.getTargetId());
                    forwardProducersByConsumer
                            .computeIfAbsent(consumer, ignored -> new HashSet<>())
                            .add(producer);
                }
            }
        }

        streamNodeIdToForwardGroupMap.putAll(
                ForwardGroupComputeUtil.computeStreamNodeForwardGroup(
                        topologicalNodes,
                        node ->
                                forwardProducersByConsumer.getOrDefault(
                                        node, Collections.emptySet())));

        for (StreamNode node : topologicalNodes) {
            StreamNodeForwardGroup forwardGroup = streamNodeIdToForwardGroupMap.get(node.getId());
            if (forwardGroup == null) {
                continue;
            }
            if (forwardGroup.isParallelismDecided()) {
                node.setParallelism(forwardGroup.getParallelism(), true);
            }
            if (forwardGroup.isMaxParallelismDecided()) {
                node.setMaxParallelism(forwardGroup.getMaxParallelism());
            }
        }
    }

    /**
     * Replaces a {@link ForwardForConsecutiveHashPartitioner} or
     * {@link ForwardForUnspecifiedPartitioner} with a plain {@link ForwardPartitioner} when the
     * edge is chainable or starts at a chainable source. Other edges are left untouched.
     */
    private void tryConvertPartitionerForChainableEdge(StreamEdge streamEdge) {
        StreamPartitioner<?> originalPartitioner = streamEdge.getPartitioner();
        boolean consecutiveHashForward =
                originalPartitioner instanceof ForwardForConsecutiveHashPartitioner;
        boolean unspecifiedForward =
                originalPartitioner instanceof ForwardForUnspecifiedPartitioner;
        if (!consecutiveHashForward && !unspecifiedForward) {
            return;
        }

        Integer sourceId = streamEdge.getSourceId();
        boolean startsAtChainableSource =
                streamGraph.getSourceIDs().contains(sourceId)
                        && StreamingJobGraphGenerator.isChainableSource(
                                streamGraph.getStreamNode(sourceId), streamGraph);
        if (!startsAtChainableSource
                && !StreamingJobGraphGenerator.isChainable(streamEdge, streamGraph)) {
            return;
        }

        streamEdge.setPartitioner(new ForwardPartitioner<>());

        // The follow-up adjustments depend on which partitioner the edge had before conversion.
        if (consecutiveHashForward && streamEdge.getExchangeMode() == StreamExchangeMode.BATCH) {
            streamEdge.setExchangeMode(StreamExchangeMode.UNDEFINED);
        }
        if (unspecifiedForward) {
            streamEdge.setIntraInputKeyCorrelated(false);
        }
    }

    /**
     * Generates the default and legacy hashes for the given stream node unless a default hash
     * already exists.
     *
     * @throws IllegalStateException if the default hasher reports failure
     */
    private void generateHashesByStreamNodeId(Integer num) {
        if (hashes.containsKey(num)) {
            return;
        }
        for (int i = 0; i < legacyStreamGraphHasher.size(); i++) {
            legacyStreamGraphHasher
                    .get(i)
                    .generateHashesByStreamNodeId(num.intValue(), streamGraph, legacyHashes.get(i));
        }
        boolean hashGenerated =
                defaultStreamGraphHasher.generateHashesByStreamNodeId(
                        num.intValue(), streamGraph, hashes);
        Preconditions.checkState(
                hashGenerated, "Failed to generate hash for streamNode with ID '%s'", num);
    }

    /**
     * Decides whether the chain starting at the given entry point can become a job vertex.
     *
     * <p>The start node must not be frozen yet, every upstream node must already have a hash,
     * and every upstream node that is not a chained source of this chain must belong to a
     * finished job vertex.
     */
    private boolean isReadyToCreateJobVertex(OperatorChainInfo operatorChainInfo) {
        Integer startNodeId = operatorChainInfo.getStartNodeId();
        if (isNodeFrozen(startNodeId)) {
            return false;
        }

        for (StreamEdge inEdge : streamGraph.getStreamNode(startNodeId).getInEdges()) {
            Integer upstreamNodeId = inEdge.getSourceId();
            if (!hashes.containsKey(upstreamNodeId)) {
                return false;
            }
            if (operatorChainInfo.getChainedSources().containsKey(upstreamNodeId)) {
                continue;
            }
            Optional<JobVertexID> upstreamVertex = findVertexByStreamNodeId(upstreamNodeId);
            if (!upstreamVertex.isPresent()
                    || !finishedJobVertices.contains(upstreamVertex.get())) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether the stream node has already been placed into a job vertex. */
    private boolean isNodeFrozen(Integer num) {
        return frozenNodeToStartNodeMap.containsKey(num);
    }

    /** Returns the start node of the chain holding the given node, or {@code null}. */
    private Integer getStartNodeId(Integer num) {
        return frozenNodeToStartNodeMap.get(num);
    }

    /** Regenerates the cached JSON representation of the stream graph. */
    private void generateStreamGraphJson() {
        streamGraphJson =
                JsonPlanGenerator.generateStreamGraphJson(streamGraph, streamNodeIdsToJobVertexMap);
    }

    /** Returns the most recently generated JSON representation of the stream graph. */
    public String getStreamGraphJson() {
        return streamGraphJson;
    }

    /** Refreshes the cached stream graph JSON after the stream graph has been modified. */
    @Override
    public void onStreamGraphUpdated() {
        generateStreamGraphJson();
    }
}
