package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertexInputInfo;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.LogicalResult;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.DefaultExecutionDeployer;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.class */
public class AdaptiveBatchScheduler extends DefaultScheduler {
    private final DefaultLogicalTopology logicalTopology;
    private final VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider;
    private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;
    private final Map<IntermediateDataSetID, BlockingResultInfo> blockingResultInfos;
    private final JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint;

    public AdaptiveBatchScheduler(Logger logger, JobGraph jobGraph, Executor executor, Configuration configuration, Consumer<ComponentMainThreadExecutor> consumer, ScheduledExecutor scheduledExecutor, ClassLoader classLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory factory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long j, ComponentMainThreadExecutor componentMainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, ShuffleMaster<?> shuffleMaster, Time time, VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider, int i, JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint) throws Exception {
        super(logger, jobGraph, executor, configuration, consumer, scheduledExecutor, classLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, schedulingStrategyFactory, factory, restartBackoffTimeStrategy, executionOperations, executionVertexVersioner, executionSlotAllocatorFactory, j, componentMainThreadExecutor, jobStatusListener, executionGraphFactory, shuffleMaster, time, computeVertexParallelismStoreForDynamicGraph(jobGraph.getVertices(), i), new DefaultExecutionDeployer.Factory());
        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
        this.vertexParallelismAndInputInfosDecider = (VertexParallelismAndInputInfosDecider) Preconditions.checkNotNull(vertexParallelismAndInputInfosDecider);
        List<JobVertex> verticesSortedTopologicallyFromSources = jobGraph.getVerticesSortedTopologicallyFromSources();
        ExecutionGraph executionGraph = getExecutionGraph();
        executionGraph.getClass();
        this.forwardGroupsByJobVertexId = ForwardGroupComputeUtil.computeForwardGroups(verticesSortedTopologicallyFromSources, executionGraph::getJobVertex);
        this.blockingResultInfos = new HashMap();
        this.hybridPartitionDataConsumeConstraint = hybridPartitionDataConsumeConstraint;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void startSchedulingInternal() {
        initializeVerticesIfPossible();
        super.startSchedulingInternal();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void onTaskFinished(Execution execution, IOMetrics iOMetrics) {
        Preconditions.checkNotNull(iOMetrics);
        updateResultPartitionBytesMetrics(iOMetrics.getResultPartitionBytes());
        initializeVerticesIfPossible();
        super.onTaskFinished(execution, iOMetrics);
    }

    private void updateResultPartitionBytesMetrics(Map<IntermediateResultPartitionID, ResultPartitionBytes> map) {
        Preconditions.checkNotNull(map);
        map.forEach((intermediateResultPartitionID, resultPartitionBytes) -> {
            IntermediateResult intermediateResult = getExecutionGraph().getAllIntermediateResults().get(intermediateResultPartitionID.getIntermediateDataSetID());
            Preconditions.checkNotNull(intermediateResult);
            this.blockingResultInfos.compute(intermediateResult.getId(), (intermediateDataSetID, blockingResultInfo) -> {
                if (blockingResultInfo == null) {
                    blockingResultInfo = createFromIntermediateResult(intermediateResult);
                }
                blockingResultInfo.recordPartitionInfo(intermediateResultPartitionID.getPartitionNumber(), resultPartitionBytes);
                return blockingResultInfo;
            });
        });
    }

    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerOperations
    public void allocateSlotsAndDeploy(List<ExecutionVertexID> list) {
        enrichInputBytesForExecutionVertices((List) list.stream().map(this::getExecutionVertex).collect(Collectors.toList()));
        super.allocateSlotsAndDeploy(list);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    public void resetForNewExecution(ExecutionVertexID executionVertexID) {
        ExecutionVertex executionVertex = getExecutionVertex(executionVertexID);
        if (executionVertex.getExecutionState() == ExecutionState.FINISHED) {
            executionVertex.getProducedPartitions().values().forEach(intermediateResultPartition -> {
                this.blockingResultInfos.computeIfPresent(intermediateResultPartition.getIntermediateResult().getId(), (intermediateDataSetID, blockingResultInfo) -> {
                    blockingResultInfo.resetPartitionInfo(intermediateResultPartition.getPartitionNumber());
                    return blockingResultInfo;
                });
            });
        }
        super.resetForNewExecution(executionVertexID);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    protected MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() {
        return resultPartitionType -> {
            return resultPartitionType.isBlockingOrBlockingPersistentResultPartition() || this.hybridPartitionDataConsumeConstraint.isOnlyConsumeFinishedPartition();
        };
    }

    void initializeVerticesIfPossible() {
        List<ExecutionJobVertex> arrayList = new ArrayList<>();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            for (ExecutionJobVertex executionJobVertex : getExecutionGraph().getVerticesTopologically()) {
                if (!executionJobVertex.isInitialized()) {
                    if (canInitialize(executionJobVertex)) {
                        getExecutionGraph().initializeJobVertex(executionJobVertex, currentTimeMillis);
                        arrayList.add(executionJobVertex);
                    } else {
                        Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo = tryGetConsumedResultsInfo(executionJobVertex);
                        if (tryGetConsumedResultsInfo.isPresent()) {
                            ParallelismAndInputInfos tryDecideParallelismAndInputInfos = tryDecideParallelismAndInputInfos(executionJobVertex, tryGetConsumedResultsInfo.get());
                            changeJobVertexParallelism(executionJobVertex, tryDecideParallelismAndInputInfos.getParallelism());
                            Preconditions.checkState(canInitialize(executionJobVertex));
                            getExecutionGraph().initializeJobVertex(executionJobVertex, currentTimeMillis, tryDecideParallelismAndInputInfos.getJobVertexInputInfos());
                            arrayList.add(executionJobVertex);
                        }
                    }
                }
            }
        } catch (JobException e) {
            this.log.error("Unexpected error occurred when initializing ExecutionJobVertex", e);
            failJob(e, System.currentTimeMillis());
        }
        if (arrayList.size() > 0) {
            updateTopology(arrayList);
        }
    }

    private ParallelismAndInputInfos tryDecideParallelismAndInputInfos(ExecutionJobVertex executionJobVertex, List<BlockingResultInfo> list) {
        int parallelism = executionJobVertex.getParallelism();
        ForwardGroup forwardGroup = this.forwardGroupsByJobVertexId.get(executionJobVertex.getJobVertexId());
        if (!executionJobVertex.isParallelismDecided() && forwardGroup != null && forwardGroup.isParallelismDecided()) {
            parallelism = forwardGroup.getParallelism();
            this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", new Object[]{executionJobVertex.getName(), executionJobVertex.getJobVertexId(), Integer.valueOf(parallelism)});
        }
        ParallelismAndInputInfos decideParallelismAndInputInfosForVertex = this.vertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(executionJobVertex.getJobVertexId(), list, parallelism);
        if (parallelism == -1) {
            this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {}.", new Object[]{executionJobVertex.getName(), executionJobVertex.getJobVertexId(), Integer.valueOf(decideParallelismAndInputInfosForVertex.getParallelism())});
        } else {
            Preconditions.checkState(decideParallelismAndInputInfosForVertex.getParallelism() == parallelism);
        }
        if (forwardGroup != null && !forwardGroup.isParallelismDecided()) {
            forwardGroup.setParallelism(decideParallelismAndInputInfosForVertex.getParallelism());
        }
        return decideParallelismAndInputInfosForVertex;
    }

    private void enrichInputBytesForExecutionVertices(List<ExecutionVertex> list) {
        for (ExecutionVertex executionVertex : list) {
            List<IntermediateResult> inputs = executionVertex.getJobVertex().getInputs();
            boolean anyMatch = inputs.stream().anyMatch(intermediateResult -> {
                return intermediateResult.getResultType() == ResultPartitionType.HYBRID_FULL || intermediateResult.getResultType() == ResultPartitionType.HYBRID_SELECTIVE;
            });
            if (!inputs.isEmpty() && !anyMatch) {
                long j = 0;
                for (IntermediateResult intermediateResult2 : inputs) {
                    ExecutionVertexInputInfo executionVertexInputInfo = executionVertex.getExecutionVertexInputInfo(intermediateResult2.getId());
                    j += ((BlockingResultInfo) Preconditions.checkNotNull(getBlockingResultInfo(intermediateResult2.getId()))).getNumBytesProduced(executionVertexInputInfo.getPartitionIndexRange(), executionVertexInputInfo.getSubpartitionIndexRange());
                }
                executionVertex.setInputBytes(j);
            }
        }
    }

    private void changeJobVertexParallelism(ExecutionJobVertex executionJobVertex, int i) {
        if (executionJobVertex.isParallelismDecided()) {
            return;
        }
        executionJobVertex.getJobVertex().setParallelism(i);
        try {
            getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(getJobGraph()));
        } catch (Throwable th) {
            this.log.warn("Cannot create JSON plan for job", th);
            getExecutionGraph().setJsonPlan("{}");
        }
        executionJobVertex.setParallelism(i);
    }

    private Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo(ExecutionJobVertex executionJobVertex) {
        ArrayList arrayList = new ArrayList();
        Iterator<? extends LogicalResult> it = this.logicalTopology.getVertex(executionJobVertex.getJobVertexId()).getConsumedResults().iterator();
        while (it.hasNext()) {
            DefaultLogicalResult defaultLogicalResult = (DefaultLogicalResult) it.next();
            if (!getExecutionJobVertex(defaultLogicalResult.getProducer2().getId()).isFinished()) {
                return Optional.empty();
            }
            arrayList.add((BlockingResultInfo) Preconditions.checkNotNull(this.blockingResultInfos.get(defaultLogicalResult.getId())));
        }
        return Optional.of(arrayList);
    }

    private boolean canInitialize(ExecutionJobVertex executionJobVertex) {
        if (executionJobVertex.isInitialized() || !executionJobVertex.isParallelismDecided()) {
            return false;
        }
        Iterator<JobEdge> it = executionJobVertex.getJobVertex().getInputs().iterator();
        while (it.hasNext()) {
            ExecutionJobVertex jobVertex = getExecutionGraph().getJobVertex(it.next().getSource().getProducer().getID());
            Preconditions.checkNotNull(jobVertex);
            if (!jobVertex.isInitialized()) {
                return false;
            }
        }
        return true;
    }

    private void updateTopology(List<ExecutionJobVertex> list) {
        Iterator<ExecutionJobVertex> it = list.iterator();
        while (it.hasNext()) {
            initializeOperatorCoordinatorsFor(it.next());
        }
        getExecutionGraph().notifyNewlyInitializedJobVertices(list);
    }

    private void initializeOperatorCoordinatorsFor(ExecutionJobVertex executionJobVertex) {
        this.operatorCoordinatorHandler.registerAndStartNewCoordinators(executionJobVertex.getOperatorCoordinators(), getMainThreadExecutor());
    }

    @VisibleForTesting
    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(Iterable<JobVertex> iterable, int i) {
        return computeVertexParallelismStore(iterable, jobVertex -> {
            return jobVertex.getParallelism() > 0 ? Integer.valueOf(getDefaultMaxParallelism(jobVertex)) : Integer.valueOf(i);
        }, Function.identity());
    }

    private static BlockingResultInfo createFromIntermediateResult(IntermediateResult intermediateResult) {
        Preconditions.checkArgument(intermediateResult != null);
        return intermediateResult.getConsumingDistributionPattern() == DistributionPattern.POINTWISE ? new PointwiseBlockingResultInfo(intermediateResult.getId(), intermediateResult.getNumberOfAssignedPartitions(), intermediateResult.getPartitions()[0].getNumberOfSubpartitions()) : new AllToAllBlockingResultInfo(intermediateResult.getId(), intermediateResult.getNumberOfAssignedPartitions(), intermediateResult.getPartitions()[0].getNumberOfSubpartitions(), intermediateResult.isBroadcast());
    }

    @VisibleForTesting
    BlockingResultInfo getBlockingResultInfo(IntermediateDataSetID intermediateDataSetID) {
        return this.blockingResultInfos.get(intermediateDataSetID);
    }
}
