package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.streaming.api.graph.AdaptiveGraphManager;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link AdaptiveExecutionHandler} implementation.
 *
 * <p>Job events are forwarded to a {@link StreamGraphOptimizer} and to an {@link
 * AdaptiveGraphManager}. Whenever the graph manager reports newly created job vertices, every
 * registered {@link JobGraphUpdateListener} is notified.
 */
public class DefaultAdaptiveExecutionHandler implements AdaptiveExecutionHandler {
    private static final Logger LOG =
            LoggerFactory.getLogger(DefaultAdaptiveExecutionHandler.class);

    /** Listeners that are informed when new job vertices are added to the job graph. */
    private final List<JobGraphUpdateListener> jobGraphUpdateListeners = new ArrayList<>();

    private final AdaptiveGraphManager adaptiveGraphManager;
    private final StreamGraphOptimizer streamGraphOptimizer;

    /**
     * Creates the handler and initializes the optimizer strategies against the graph manager's
     * stream graph context.
     *
     * @param classLoader class loader handed to the graph manager and the optimizer
     * @param streamGraph stream graph to manage; its job configuration configures the optimizer
     * @param executor executor handed to the graph manager
     * @throws DynamicCodeLoadingException declared by the original constructor; presumably raised
     *     when an optimization strategy cannot be loaded (confirm against StreamGraphOptimizer)
     */
    public DefaultAdaptiveExecutionHandler(
            ClassLoader classLoader, StreamGraph streamGraph, Executor executor)
            throws DynamicCodeLoadingException {
        this.adaptiveGraphManager = new AdaptiveGraphManager(classLoader, streamGraph, executor);
        this.streamGraphOptimizer =
                new StreamGraphOptimizer(streamGraph.getJobConfiguration(), classLoader);
        this.streamGraphOptimizer.initializeStrategies(
                this.adaptiveGraphManager.getStreamGraphContext());
    }

    /** Returns the job graph currently held by the graph manager. */
    @Override
    public JobGraph getJobGraph() {
        return adaptiveGraphManager.getJobGraph();
    }

    /**
     * Runs the stream graph optimization first and the job graph update second for the given
     * event.
     *
     * @param jobEvent the event to handle
     * @throws RuntimeException wrapping any exception raised while handling the event, including
     *     the {@link IllegalArgumentException} raised for unsupported event types
     */
    @Override
    public void handleJobEvent(JobEvent jobEvent) {
        try {
            tryOptimizeStreamGraph(jobEvent);
            tryUpdateJobGraph(jobEvent);
        } catch (Exception e) {
            LOG.error("Failed to handle job event {}.", jobEvent, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Reports the operators of a finished job vertex to the optimizer.
     *
     * @throws IllegalArgumentException if the event is not an {@link
     *     ExecutionJobVertexFinishedEvent}
     */
    private void tryOptimizeStreamGraph(JobEvent jobEvent) throws Exception {
        if (!(jobEvent instanceof ExecutionJobVertexFinishedEvent)) {
            throw new IllegalArgumentException("Unsupported job event " + jobEvent);
        }
        ExecutionJobVertexFinishedEvent finishedEvent = (ExecutionJobVertexFinishedEvent) jobEvent;
        JobVertexID finishedVertexId = finishedEvent.getVertexId();

        // Group the result infos by the stream node that produced them. Several intermediate
        // data sets may map to the same producer, so colliding entries are merged into one list.
        Map<Integer, List<BlockingResultInfo>> resultInfosByProducer =
                finishedEvent.getResultInfo().entrySet().stream()
                        .collect(
                                Collectors.toMap(
                                        entry ->
                                                adaptiveGraphManager.getProducerStreamNodeId(
                                                        entry.getKey()),
                                        entry ->
                                                new ArrayList<>(
                                                        Collections.singletonList(
                                                                entry.getValue())),
                                        (merged, additional) -> {
                                            merged.addAll(additional);
                                            return merged;
                                        }));

        List<Integer> finishedStreamNodeIds =
                adaptiveGraphManager.getStreamNodeIdsByJobVertexId(finishedVertexId);
        OperatorsFinished operatorsFinished =
                new OperatorsFinished(finishedStreamNodeIds, resultInfosByProducer);

        // Mark the nodes as finished before the optimizer runs (same order as before).
        adaptiveGraphManager.addFinishedStreamNodeIds(finishedStreamNodeIds);
        streamGraphOptimizer.onOperatorsFinished(
                operatorsFinished, adaptiveGraphManager.getStreamGraphContext());
    }

    /**
     * Lets the graph manager react to a finished job vertex and notifies the listeners if that
     * produced new job vertices. Events of any other type are ignored.
     */
    private void tryUpdateJobGraph(JobEvent jobEvent) throws Exception {
        if (!(jobEvent instanceof ExecutionJobVertexFinishedEvent)) {
            return;
        }
        JobVertexID finishedVertexId = ((ExecutionJobVertexFinishedEvent) jobEvent).getVertexId();
        List<JobVertex> newVertices = adaptiveGraphManager.onJobVertexFinished(finishedVertexId);
        if (!newVertices.isEmpty()) {
            notifyJobGraphUpdated(newVertices, adaptiveGraphManager.getPendingOperatorsCount());
        }
    }

    /** Passes the new vertices and the pending operator count to every registered listener. */
    private void notifyJobGraphUpdated(List<JobVertex> newVertices, int pendingOperatorsCount)
            throws Exception {
        for (JobGraphUpdateListener listener : jobGraphUpdateListeners) {
            listener.onNewJobVerticesAdded(newVertices, pendingOperatorsCount);
        }
    }

    /** Adds a listener that is notified whenever new job vertices are added. */
    @Override
    public void registerJobGraphUpdateListener(JobGraphUpdateListener jobGraphUpdateListener) {
        jobGraphUpdateListeners.add(jobGraphUpdateListener);
    }

    /**
     * Returns the parallelism to start with for the given job vertex.
     *
     * <p>The vertex's own parallelism is returned unless it is {@code -1} and the vertex belongs
     * to a forward group whose parallelism is already decided; in that case the forward group's
     * parallelism is returned instead.
     *
     * @param jobVertexID id of the job vertex to look up in the current job graph
     * @return the vertex parallelism, or the forward group's decided parallelism
     * @throws NullPointerException if the current job graph contains no vertex with that id
     */
    @Override
    public int getInitialParallelism(JobVertexID jobVertexID) {
        JobVertex jobVertex =
                Preconditions.checkNotNull(
                        adaptiveGraphManager.getJobGraph().findVertexByID(jobVertexID),
                        "No job vertex found for id %s.",
                        jobVertexID);
        int parallelism = jobVertex.getParallelism();
        StreamNodeForwardGroup forwardGroup =
                adaptiveGraphManager.getStreamNodeForwardGroupByVertexId(jobVertexID);

        // NOTE(review): -1 is presumably the "parallelism not configured" sentinel - confirm.
        if (parallelism == -1 && forwardGroup != null && forwardGroup.isParallelismDecided()) {
            parallelism = forwardGroup.getParallelism();
            LOG.info(
                    "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.",
                    jobVertex.getName(),
                    jobVertexID,
                    parallelism);
        }
        return parallelism;
    }

    /**
     * Records the decided parallelism of a job vertex in its forward group, if it has one.
     *
     * <p>An undecided forward group adopts the given parallelism. A forward group that is already
     * decided must agree with it.
     *
     * @param jobVertexID id of the job vertex whose parallelism was decided
     * @param parallelism the decided parallelism
     * @throws IllegalArgumentException if the forward group was already decided with a different
     *     parallelism
     */
    @Override
    public void notifyJobVertexParallelismDecided(JobVertexID jobVertexID, int parallelism) {
        StreamNodeForwardGroup forwardGroup =
                adaptiveGraphManager.getStreamNodeForwardGroupByVertexId(jobVertexID);
        if (forwardGroup == null) {
            return;
        }
        if (!forwardGroup.isParallelismDecided()) {
            forwardGroup.setParallelism(parallelism);
        } else {
            Preconditions.checkArgument(
                    forwardGroup.getParallelism() == parallelism,
                    "Incompatible parallelism for forward group.");
        }
    }

    /**
     * Creates a scheduling context backed by this handler's graph manager.
     *
     * @param i passed through unchanged to {@link AdaptiveExecutionPlanSchedulingContext};
     *     NOTE(review): presumably the default max parallelism - confirm against that class
     */
    @Override
    public ExecutionPlanSchedulingContext createExecutionPlanSchedulingContext(int i) {
        return new AdaptiveExecutionPlanSchedulingContext(adaptiveGraphManager, i);
    }
}
