package org.apache.flink.runtime.scheduler;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.failure.DefaultFailureEnricherContext;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.ExecutionDeployer;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultScheduler.class */
public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {
    protected final Logger log;
    private final ClassLoader userCodeLoader;
    protected final ExecutionSlotAllocator executionSlotAllocator;
    private final ExecutionFailureHandler executionFailureHandler;
    private final ScheduledExecutor delayExecutor;
    protected final SchedulingStrategy schedulingStrategy;
    private final ExecutionOperations executionOperations;
    private final Set<ExecutionVertexID> verticesWaitingForRestart;
    protected final ShuffleMaster<?> shuffleMaster;
    private final Map<AllocationID, Long> reservedAllocationRefCounters;
    private final Map<ExecutionVertexID, AllocationID> reservedAllocationByExecutionVertex;
    protected final ExecutionDeployer executionDeployer;
    protected final FailoverStrategy failoverStrategy;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultScheduler$DefaultExecutionSlotAllocationContext.class */
    /**
     * Slot allocation context that answers every query by delegating to the enclosing
     * {@link DefaultScheduler} instance (its execution vertices, job graph, location retrievers
     * and reserved-allocation bookkeeping).
     */
    private class DefaultExecutionSlotAllocationContext implements ExecutionSlotAllocationContext {
        private DefaultExecutionSlotAllocationContext() {
        }

        /** Returns the resource profile of the execution vertex with the given id. */
        @Override
        public ResourceProfile getResourceProfile(ExecutionVertexID executionVertexID) {
            final ResourceProfile profile = DefaultScheduler.this.getExecutionVertex(executionVertexID).getResourceProfile();
            return profile;
        }

        /** Returns whatever the execution vertex reports as its last allocation, if any. */
        @Override
        public Optional<AllocationID> findPriorAllocationId(ExecutionVertexID executionVertexID) {
            final Optional<AllocationID> lastAllocation = DefaultScheduler.this.getExecutionVertex(executionVertexID).findLastAllocation();
            return lastAllocation;
        }

        /** Returns the scheduling topology of the enclosing scheduler. */
        @Override
        public SchedulingTopology getSchedulingTopology() {
            // Must stay qualified: an unqualified call would resolve to this very method.
            final SchedulingTopology topology = DefaultScheduler.this.getSchedulingTopology();
            return topology;
        }

        /** Returns the slot sharing groups declared by the scheduler's job graph. */
        @Override
        public Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
            final Set<SlotSharingGroup> sharingGroups = DefaultScheduler.this.getJobGraph().getSlotSharingGroups();
            return sharingGroups;
        }

        /** Returns the co-location groups declared by the scheduler's job graph. */
        @Override
        public Set<CoLocationGroup> getCoLocationGroups() {
            final Set<CoLocationGroup> coLocationGroups = DefaultScheduler.this.getJobGraph().getCoLocationGroups();
            return coLocationGroups;
        }

        /** Forwards to the scheduler's inputs locations retriever. */
        @Override
        public Collection<ConsumedPartitionGroup> getConsumedPartitionGroups(ExecutionVertexID executionVertexID) {
            final Collection<ConsumedPartitionGroup> groups = DefaultScheduler.this.inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexID);
            return groups;
        }

        /** Forwards to the scheduler's inputs locations retriever. */
        @Override
        public Collection<ExecutionVertexID> getProducersOfConsumedPartitionGroup(ConsumedPartitionGroup consumedPartitionGroup) {
            final Collection<ExecutionVertexID> producers = DefaultScheduler.this.inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(consumedPartitionGroup);
            return producers;
        }

        /** Forwards to the scheduler's inputs locations retriever. */
        @Override
        public Optional<CompletableFuture<TaskManagerLocation>> getTaskManagerLocation(ExecutionVertexID executionVertexID) {
            final Optional<CompletableFuture<TaskManagerLocation>> location = DefaultScheduler.this.inputsLocationsRetriever.getTaskManagerLocation(executionVertexID);
            return location;
        }

        /** Forwards to the scheduler's state location retriever. */
        @Override
        public Optional<TaskManagerLocation> getStateLocation(ExecutionVertexID executionVertexID) {
            final Optional<TaskManagerLocation> stateLocation = DefaultScheduler.this.stateLocationRetriever.getStateLocation(executionVertexID);
            return stateLocation;
        }

        /**
         * Returns the key set of the scheduler's reservation ref-counter map. This is the map's
         * own key view, not a copy.
         */
        @Override
        public Set<AllocationID> getReservedAllocations() {
            final Set<AllocationID> reserved = DefaultScheduler.this.reservedAllocationRefCounters.keySet();
            return reserved;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the scheduler and wires up its collaborators.
     *
     * <p>Initialization order matters and is preserved: the failover strategy and failure handler
     * are built from the scheduling topology first, then the scheduling strategy (which receives
     * {@code this}), then the slot allocator, and finally, after {@code consumer} has been
     * invoked with the main thread executor, the execution deployer.
     *
     * @param logger logger used by this scheduler; also passed to the superclass and the deployer
     * @param scheduledExecutor executor used to delay task restarts; must not be null
     * @param classLoader class loader used to deserialize task failure causes; must not be null
     * @param consumer callback invoked once with {@code componentMainThreadExecutor}
     * @param factory creates the {@link FailoverStrategy}
     * @param executionOperations operations used to cancel executions; must not be null
     * @param executionSlotAllocatorFactory creates the slot allocator; must not be null
     * @param shuffleMaster shuffle master kept by this scheduler; must not be null
     * @param time forwarded unchanged to the execution deployer factory
     * @param factory2 creates the {@link ExecutionDeployer}
     * @throws Exception declared by the original signature; the source of the exception is not
     *     visible from this block
     */
    public DefaultScheduler(
            Logger logger,
            JobGraph jobGraph,
            Executor executor,
            Configuration configuration,
            Consumer<ComponentMainThreadExecutor> consumer,
            ScheduledExecutor scheduledExecutor,
            ClassLoader classLoader,
            CheckpointsCleaner checkpointsCleaner,
            CheckpointRecoveryFactory checkpointRecoveryFactory,
            JobManagerJobMetricGroup jobManagerJobMetricGroup,
            SchedulingStrategyFactory schedulingStrategyFactory,
            FailoverStrategy.Factory factory,
            RestartBackoffTimeStrategy restartBackoffTimeStrategy,
            ExecutionOperations executionOperations,
            ExecutionVertexVersioner executionVertexVersioner,
            ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
            long j,
            ComponentMainThreadExecutor componentMainThreadExecutor,
            JobStatusListener jobStatusListener,
            Collection<FailureEnricher> collection,
            ExecutionGraphFactory executionGraphFactory,
            ShuffleMaster<?> shuffleMaster,
            Time time,
            VertexParallelismStore vertexParallelismStore,
            ExecutionDeployer.Factory factory2) throws Exception {
        super(logger, jobGraph, executor, configuration, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, executionVertexVersioner, j, componentMainThreadExecutor, jobStatusListener, executionGraphFactory, vertexParallelismStore);
        this.log = logger;
        // checkNotNull is generic, so no casts are needed on its result.
        this.delayExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.userCodeLoader = Preconditions.checkNotNull(classLoader);
        this.executionOperations = Preconditions.checkNotNull(executionOperations);
        this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
        this.reservedAllocationRefCounters = new HashMap<>();
        this.reservedAllocationByExecutionVertex = new HashMap<>();
        this.failoverStrategy = factory.create(getSchedulingTopology(), getResultPartitionAvailabilityChecker());
        logger.info("Using failover strategy {} for {} ({}).", this.failoverStrategy, jobGraph.getName(), jobGraph.getJobID());
        this.executionFailureHandler = new ExecutionFailureHandler(
                configuration,
                getSchedulingTopology(),
                this.failoverStrategy,
                restartBackoffTimeStrategy,
                componentMainThreadExecutor,
                collection,
                DefaultFailureEnricherContext.forTaskFailure(this.jobInfo, jobManagerJobMetricGroup, executor, classLoader),
                DefaultFailureEnricherContext.forGlobalFailure(this.jobInfo, jobManagerJobMetricGroup, executor, classLoader),
                jobManagerJobMetricGroup);
        // The strategy receives this scheduler while construction is still in progress.
        this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
        this.executionSlotAllocator = Preconditions.checkNotNull(executionSlotAllocatorFactory).createInstance(new DefaultExecutionSlotAllocationContext());
        this.verticesWaitingForRestart = new HashSet<>();
        consumer.accept(componentMainThreadExecutor);
        this.executionDeployer = factory2.createInstance(logger, this.executionSlotAllocator, executionOperations, executionVertexVersioner, time, this::startReserveAllocation, componentMainThreadExecutor);
    }

    /** Returns the restart count reported by the execution failure handler. */
    @Override
    protected long getNumberOfRestarts() {
        final long restarts = this.executionFailureHandler.getNumberOfRestarts();
        return restarts;
    }

    /** Cancels the pending slot requests of every vertex in the scheduling topology. */
    @Override
    protected void cancelAllPendingSlotRequestsInternal() {
        getSchedulingTopology()
                .getVertices()
                .forEach(vertex -> cancelAllPendingSlotRequestsForVertex(vertex.getId()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Logs the scheduling strategy's class name, transitions to running, and then asks the
     * strategy to start scheduling.
     */
    @Override
    public void startSchedulingInternal() {
        final String strategyName = this.schedulingStrategy.getClass().getName();
        this.log.info("Starting scheduling with scheduling strategy [{}]", strategyName);
        transitionToRunning();
        this.schedulingStrategy.startScheduling();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a finished execution: drops the allocation reservation of its vertex and informs
     * the scheduling strategy of the FINISHED state.
     *
     * @param execution the execution, which must be in state FINISHED
     * @param iOMetrics not used by this implementation
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, if the
     *     execution is not FINISHED
     */
    @Override
    public void onTaskFinished(Execution execution, IOMetrics iOMetrics) {
        final boolean finished = execution.getState() == ExecutionState.FINISHED;
        Preconditions.checkState(finished);
        final ExecutionVertexID vertexId = execution.getVertex().getID();
        stopReserveAllocation(vertexId);
        this.schedulingStrategy.onExecutionStateChange(vertexId, ExecutionState.FINISHED);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a failed execution: deserializes its failure cause with the user code class loader,
     * possibly translates it into a cluster dataset exception, and starts task failure handling.
     *
     * @param execution the execution, which must be FAILED and carry failure info
     */
    @Override
    public void onTaskFailed(Execution execution) {
        Preconditions.checkState(execution.getState() == ExecutionState.FAILED);
        Preconditions.checkState(execution.getFailureInfo().isPresent());

        final Throwable deserializedCause = execution.getFailureInfo().get().getException().deserializeError(this.userCodeLoader);
        final ExecutionVertexID failedVertexId = execution.getVertex().getID();
        final Throwable effectiveCause = maybeTranslateToClusterDatasetException(deserializedCause, failedVertexId);
        handleTaskFailure(execution, effectiveCause);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records the task failure and then either restarts tasks or fails the job, depending on the
     * resulting {@link FailureHandlingResult}.
     *
     * @param execution the failed execution
     * @param th the failure cause, may be null
     */
    public void handleTaskFailure(Execution execution, @Nullable Throwable th) {
        final FailureHandlingResult handlingResult = recordTaskFailure(execution, th);
        maybeRestartTasks(handlingResult);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records a task failure: stores the cause with the current timestamp, notifies the operator
     * coordinators of the failed execution, and asks the failure handler what to do next.
     *
     * @param execution the failed execution
     * @param th the failure cause, may be null
     * @return the failure handler's result for this failure
     */
    public FailureHandlingResult recordTaskFailure(Execution execution, @Nullable Throwable th) {
        // A single timestamp is shared by the recorded cause and the handling result.
        final long failureTimestamp = System.currentTimeMillis();

        setGlobalFailureCause(th, failureTimestamp);
        notifyCoordinatorsAboutTaskFailure(execution, th);

        return this.executionFailureHandler.getFailureHandlingResult(execution, th, failureTimestamp);
    }

    /**
     * Wraps a {@link PartitionException} in a {@link ClusterDatasetCorruptedException} when the
     * affected partition belongs to an intermediate data set that the failed vertex's job vertex
     * lists among its data sets to consume. Any other cause (including null) is returned as is.
     *
     * @param th the failure cause, may be null
     * @param executionVertexID id of the failed execution vertex
     * @return the original cause or its wrapped form
     */
    private Throwable maybeTranslateToClusterDatasetException(@Nullable Throwable th, ExecutionVertexID executionVertexID) {
        if (!(th instanceof PartitionException)) {
            return th;
        }

        final List<IntermediateDataSetID> consumedDataSetIds = getExecutionJobVertex(executionVertexID.getJobVertexId()).getJobVertex().getIntermediateDataSetIdsToConsume();
        final IntermediateResultPartitionID failedPartitionId = ((PartitionException) th).getPartitionId().getPartitionId();

        if (consumedDataSetIds.contains(failedPartitionId.getIntermediateDataSetID())) {
            return new ClusterDatasetCorruptedException(th, Collections.singletonList(failedPartitionId.getIntermediateDataSetID()));
        }
        return th;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells every operator coordinator of the execution's job vertex that the given execution
     * attempt failed.
     *
     * @param execution the failed execution; supplies the subtask index and attempt number
     * @param th the failure cause passed on to the coordinators, may be null
     */
    public void notifyCoordinatorsAboutTaskFailure(Execution execution, @Nullable Throwable th) {
        final ExecutionJobVertex owningJobVertex = execution.getVertex().getJobVertex();
        final int subtaskIndex = execution.getParallelSubtaskIndex();
        final int attempt = execution.getAttemptNumber();

        owningJobVertex
                .getOperatorCoordinators()
                .forEach(coordinator -> coordinator.executionAttemptFailed(subtaskIndex, attempt, th));
    }

    /**
     * Handles a global failure: stores the cause with the current timestamp, logs it, and then
     * restarts tasks or fails the job according to the failure handler's global result.
     *
     * @param th the cause of the global failure
     */
    @Override
    public void handleGlobalFailure(Throwable th) {
        final long failureTimestamp = System.currentTimeMillis();
        setGlobalFailureCause(th, failureTimestamp);
        this.log.info("Trying to recover from a global failure.", th);

        final FailureHandlingResult handlingResult = this.executionFailureHandler.getGlobalFailureHandlingResult(th, failureTimestamp);
        maybeRestartTasks(handlingResult);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acts on a failure handling result: fails the job when the result does not allow a restart,
     * otherwise schedules a delayed restart of the affected tasks.
     *
     * @param failureHandlingResult the decision produced by the failure handler
     */
    public void maybeRestartTasks(FailureHandlingResult failureHandlingResult) {
        if (!failureHandlingResult.canRestart()) {
            failJob(failureHandlingResult.getError(), failureHandlingResult.getTimestamp(), failureHandlingResult.getFailureLabels());
            return;
        }
        restartTasksWithDelay(failureHandlingResult);
    }

    /**
     * Cancels the vertices named by the failure handling result and schedules their restart after
     * the result's restart delay.
     *
     * <p>New versions are recorded for the vertices up front; the delayed restart receives exactly
     * those versions, so {@link #restartTasks} can skip vertices modified in the meantime. The
     * restart runs on the main thread executor, once both the delay has elapsed and the
     * cancellation future has completed.
     *
     * @param failureHandlingResult a result that allows restarting; for a non-global failure it
     *     must carry the failed execution
     */
    private void restartTasksWithDelay(FailureHandlingResult failureHandlingResult) {
        final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
        // Typed set (not a raw HashSet) so the hand-off to restartTasks is checked at compile time.
        final Set<ExecutionVertexVersion> executionVertexVersions =
                new HashSet<>(this.executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
        final boolean globalRecovery = failureHandlingResult.isGlobalFailure();

        if (globalRecovery) {
            this.log.info("{} tasks will be restarted to recover from a global failure.", verticesToRestart.size());
        } else {
            Preconditions.checkArgument(failureHandlingResult.getFailedExecution().isPresent());
            this.log.info(
                    "{} tasks will be restarted to recover the failed task {}.",
                    verticesToRestart.size(),
                    failureHandlingResult.getFailedExecution().get().getAttemptId());
        }

        addVerticesToRestartPending(verticesToRestart);
        final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
        archiveFromFailureHandlingResult(createFailureHandlingResultSnapshot(failureHandlingResult));

        this.delayExecutor.schedule(
                () -> FutureUtils.assertNoException(
                        cancelFuture.thenRunAsync(
                                () -> restartTasks(executionVertexVersions, globalRecovery),
                                (Executor) getMainThreadExecutor())),
                failureHandlingResult.getRestartDelayMS(),
                TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a snapshot of the given failure handling result, resolving each vertex id to the
     * current executions of that vertex.
     *
     * @param failureHandlingResult the result to snapshot
     * @return the snapshot built by {@code FailureHandlingResultSnapshot.create}
     */
    public FailureHandlingResultSnapshot createFailureHandlingResultSnapshot(FailureHandlingResult failureHandlingResult) {
        return FailureHandlingResultSnapshot.create(
                failureHandlingResult,
                vertexId -> getExecutionVertex(vertexId).getCurrentExecutions());
    }

    /**
     * Marks the given vertices as waiting for restart and requests the job status transition
     * RUNNING to RESTARTING.
     *
     * @param set ids of the vertices about to be restarted
     */
    private void addVerticesToRestartPending(Set<ExecutionVertexID> set) {
        final Set<ExecutionVertexID> pending = this.verticesWaitingForRestart;
        pending.addAll(set);
        transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
    }

    /**
     * Removes the given vertices from the waiting-for-restart set. Once no vertex is waiting any
     * more, requests the job status transition RESTARTING to RUNNING.
     *
     * @param set ids of the vertices that are being restarted now
     */
    private void removeVerticesFromRestartPending(Set<ExecutionVertexID> set) {
        this.verticesWaitingForRestart.removeAll(set);
        if (!this.verticesWaitingForRestart.isEmpty()) {
            // Other restarts are still outstanding; stay in the current job status.
            return;
        }
        transitionExecutionGraphState(JobStatus.RESTARTING, JobStatus.RUNNING);
    }

    /**
     * Restarts the vertices whose recorded versions are still unmodified. Does nothing when no
     * such vertex is left.
     *
     * <p>The vertices are taken out of the restart-pending set and reset for new executions.
     * State is then restored and the scheduling strategy asked to restart them; any throwable
     * raised by either of those two steps is routed into global failure handling.
     *
     * @param set vertex versions recorded when the restart was requested
     * @param z forwarded to {@code restoreState}; callers pass whether the failure was global
     */
    private void restartTasks(Set<ExecutionVertexVersion> set, boolean z) {
        final Set<ExecutionVertexID> verticesToRestart = this.executionVertexVersioner.getUnmodifiedExecutionVertices(set);
        if (!verticesToRestart.isEmpty()) {
            removeVerticesFromRestartPending(verticesToRestart);
            resetForNewExecutions(verticesToRestart);
            try {
                restoreState(verticesToRestart, z);
                this.schedulingStrategy.restartTasks(verticesToRestart);
            } catch (Throwable failure) {
                handleGlobalFailure(failure);
            }
        }
    }

    /**
     * Cancels the pending slot requests of the given vertices and then cancels their current
     * executions.
     *
     * @param set ids of the vertices to cancel
     * @return a future combining the cancellation futures of all given vertices
     */
    private CompletableFuture<?> cancelTasksAsync(Set<ExecutionVertexID> set) {
        // Slot requests are cancelled before the executions themselves.
        cancelAllPendingSlotRequestsForVertices(set);

        // Typed list instead of a raw (List) cast, so the element type is compiler-checked.
        final List<CompletableFuture<?>> cancelFutures =
                set.stream().map(this::cancelExecutionVertex).collect(Collectors.toList());
        return FutureUtils.combineAll(cancelFutures);
    }

    /**
     * Cancels every current execution of the given vertex.
     *
     * @param executionVertexID id of the vertex whose executions are cancelled
     * @return a future combining the cancellation futures of those executions
     */
    private CompletableFuture<?> cancelExecutionVertex(ExecutionVertexID executionVertexID) {
        // Typed list instead of a raw (Collection) cast, so the element type is compiler-checked.
        final List<CompletableFuture<?>> cancelFutures =
                getExecutionVertex(executionVertexID).getCurrentExecutions().stream()
                        .map(this::cancelExecution)
                        .collect(Collectors.toList());
        return FutureUtils.combineAll(cancelFutures);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Cancels a single execution, first giving the operator coordinators a chance to be notified
     * (see {@code notifyCoordinatorOfCancellation}).
     *
     * @param execution the execution to cancel
     * @return the future returned by the execution operations' cancel call
     */
    public CompletableFuture<?> cancelExecution(Execution execution) {
        notifyCoordinatorOfCancellation(execution);
        final CompletableFuture<?> cancellation = this.executionOperations.cancel(execution);
        return cancellation;
    }

    /**
     * Cancels the pending slot requests of each of the given vertices.
     *
     * @param set ids of the vertices whose slot requests are cancelled
     */
    private void cancelAllPendingSlotRequestsForVertices(Set<ExecutionVertexID> set) {
        set.forEach(vertexId -> cancelAllPendingSlotRequestsForVertex(vertexId));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asks the slot allocator to cancel the request of every current execution attempt of the
     * given vertex.
     *
     * @param executionVertexID id of the vertex whose slot requests are cancelled
     */
    public void cancelAllPendingSlotRequestsForVertex(ExecutionVertexID executionVertexID) {
        getExecutionVertex(executionVertexID)
                .getCurrentExecutions()
                .forEach(attempt -> this.executionSlotAllocator.cancel(attempt.getAttemptId()));
    }

    /**
     * Looks up the current execution attempt of the given vertex.
     *
     * @param executionVertexID id of the vertex
     * @return the vertex's current execution attempt
     */
    private Execution getCurrentExecutionOfVertex(ExecutionVertexID executionVertexID) {
        final Execution currentAttempt = getExecutionVertex(executionVertexID).getCurrentExecutionAttempt();
        return currentAttempt;
    }

    /**
     * Records a new version for each given vertex and hands the vertices' current execution
     * attempts, together with those versions, to the execution deployer.
     *
     * @param list ids of the vertices to deploy
     */
    @Override
    public void allocateSlotsAndDeploy(List<ExecutionVertexID> list) {
        // Versions are recorded before the executions are resolved, as in the original ordering.
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
                this.executionVertexVersioner.recordVertexModifications(list);

        // Typed list instead of a raw (List) cast, so the element type is compiler-checked.
        final List<Execution> executionsToDeploy =
                list.stream().map(this::getCurrentExecutionOfVertex).collect(Collectors.toList());

        this.executionDeployer.allocateSlotsAndDeploy(executionsToDeploy, requiredVersionByVertex);
    }

    /**
     * Reserves the given allocation for the given vertex. Any reservation the vertex held before
     * is released first; the allocation's reference counter is then created at 1 or incremented.
     *
     * @param executionVertexID id of the vertex that reserves the allocation
     * @param allocationID the allocation to reserve
     */
    private void startReserveAllocation(ExecutionVertexID executionVertexID, AllocationID allocationID) {
        stopReserveAllocation(executionVertexID);
        this.reservedAllocationByExecutionVertex.put(executionVertexID, allocationID);
        // merge stores 1 for a missing counter and adds 1 to an existing one.
        this.reservedAllocationRefCounters.merge(allocationID, 1L, Long::sum);
    }

    /**
     * Releases the allocation reserved by the given vertex, if there is one. The allocation's
     * reference counter is decremented, and its entry is removed once the count would drop
     * below 1.
     *
     * @param executionVertexID id of the vertex whose reservation is released
     */
    private void stopReserveAllocation(ExecutionVertexID executionVertexID) {
        final AllocationID released = this.reservedAllocationByExecutionVertex.remove(executionVertexID);
        if (released == null) {
            return;
        }
        // Returning null from compute removes the counter entry. The lambda relies on a counter
        // being present for every reserved allocation; a missing one would raise an NPE here.
        this.reservedAllocationRefCounters.compute(
                released,
                (ignored, count) -> count.longValue() > 1 ? Long.valueOf(count.longValue() - 1) : null);
    }

    /**
     * Notifies the operator coordinators that the given execution is going away, reported as a
     * task failure with a null cause. Nothing is sent when the execution is already FAILED,
     * CANCELING or CANCELED.
     *
     * @param execution the execution about to be cancelled
     */
    private void notifyCoordinatorOfCancellation(Execution execution) {
        final ExecutionState current = execution.getState();
        final boolean failedOrCancelling =
                current == ExecutionState.FAILED
                        || current == ExecutionState.CANCELING
                        || current == ExecutionState.CANCELED;
        if (!failedOrCancelling) {
            notifyCoordinatorsAboutTaskFailure(execution, null);
        }
    }
}
