package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitions;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.class */
public class CreatingExecutionGraph implements State {
    private final Context context;
    private final Logger logger;
    private final OperatorCoordinatorHandlerFactory operatorCoordinatorHandlerFactory;

    @Nullable
    private final ExecutionGraph previousExecutionGraph;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph$AssignmentResult.class */
    public static final class AssignmentResult {
        private static final AssignmentResult NOT_POSSIBLE = new AssignmentResult(null);

        @Nullable
        private final ExecutionGraph executionGraph;

        private AssignmentResult(@Nullable ExecutionGraph executionGraph) {
            this.executionGraph = executionGraph;
        }

        boolean isSuccess() {
            return this.executionGraph != null;
        }

        ExecutionGraph getExecutionGraph() {
            Preconditions.checkState(isSuccess(), "Can only return the ExecutionGraph if it is a success.");
            return this.executionGraph;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static AssignmentResult success(ExecutionGraph executionGraph) {
            return new AssignmentResult((ExecutionGraph) Preconditions.checkNotNull(executionGraph, "AssignmentResult.success expects a non-null ExecutionGraph."));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static AssignmentResult notPossible() {
            return NOT_POSSIBLE;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph$Context.class */
    public interface Context extends GlobalFailureHandler, StateTransitions.ToExecuting, StateTransitions.ToFinished, StateTransitions.ToWaitingForResources {
        ArchivedExecutionGraph getArchivedExecutionGraph(JobStatus jobStatus, @Nullable Throwable th);

        ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration duration);

        AssignmentResult tryToAssignSlots(ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism);

        Executor getIOExecutor();

        ComponentMainThreadExecutor getMainThreadExecutor();

        JobManagerJobMetricGroup getMetricGroup();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph$ExecutionGraphWithVertexParallelism.class */
    public static class ExecutionGraphWithVertexParallelism {
        private final ExecutionGraph executionGraph;
        private final JobSchedulingPlan jobSchedulingPlan;

        private ExecutionGraphWithVertexParallelism(ExecutionGraph executionGraph, JobSchedulingPlan jobSchedulingPlan) {
            this.executionGraph = executionGraph;
            this.jobSchedulingPlan = jobSchedulingPlan;
        }

        public static ExecutionGraphWithVertexParallelism create(ExecutionGraph executionGraph, JobSchedulingPlan jobSchedulingPlan) {
            return new ExecutionGraphWithVertexParallelism(executionGraph, jobSchedulingPlan);
        }

        public ExecutionGraph getExecutionGraph() {
            return this.executionGraph;
        }

        public VertexParallelism getVertexParallelism() {
            return this.jobSchedulingPlan.getVertexParallelism();
        }

        public JobSchedulingPlan getJobSchedulingPlan() {
            return this.jobSchedulingPlan;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph$Factory.class */
    static class Factory implements StateFactory<CreatingExecutionGraph> {
        private final Context context;
        private final CompletableFuture<ExecutionGraphWithVertexParallelism> executionGraphWithParallelismFuture;

        @Nullable
        private final ExecutionGraph previousExecutionGraph;
        private final Logger log;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Factory(Context context, CompletableFuture<ExecutionGraphWithVertexParallelism> completableFuture, Logger logger, @Nullable ExecutionGraph executionGraph) {
            this.context = context;
            this.executionGraphWithParallelismFuture = completableFuture;
            this.log = logger;
            this.previousExecutionGraph = executionGraph;
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.StateFactory
        public Class<CreatingExecutionGraph> getStateClass() {
            return CreatingExecutionGraph.class;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.runtime.scheduler.adaptive.StateFactory
        public CreatingExecutionGraph getState() {
            return new CreatingExecutionGraph(this.context, this.executionGraphWithParallelismFuture, this.log, DefaultOperatorCoordinatorHandler::new, this.previousExecutionGraph);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph$OperatorCoordinatorHandlerFactory.class */
    public interface OperatorCoordinatorHandlerFactory {
        OperatorCoordinatorHandler create(ExecutionGraph executionGraph, GlobalFailureHandler globalFailureHandler);
    }

    public CreatingExecutionGraph(Context context, CompletableFuture<ExecutionGraphWithVertexParallelism> completableFuture, Logger logger, OperatorCoordinatorHandlerFactory operatorCoordinatorHandlerFactory, ExecutionGraph executionGraph) {
        this.context = context;
        this.logger = logger;
        this.operatorCoordinatorHandlerFactory = operatorCoordinatorHandlerFactory;
        FutureUtils.assertNoException(completableFuture.handle((executionGraphWithVertexParallelism, th) -> {
            context.runIfState(this, () -> {
                handleExecutionGraphCreation(executionGraphWithVertexParallelism, th);
            }, Duration.ZERO);
            return null;
        }));
        this.previousExecutionGraph = executionGraph;
    }

    private void handleExecutionGraphCreation(@Nullable ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism, @Nullable Throwable th) {
        if (th != null) {
            this.logger.info("Failed to go from {} to {} because the ExecutionGraph creation failed.", new Object[]{CreatingExecutionGraph.class.getSimpleName(), Executing.class.getSimpleName(), th});
            this.context.goToFinished(this.context.getArchivedExecutionGraph(JobStatus.FAILED, th));
            return;
        }
        Iterator<ExecutionVertex> it = executionGraphWithVertexParallelism.executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            it.next().getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        }
        AssignmentResult tryToAssignSlots = this.context.tryToAssignSlots(executionGraphWithVertexParallelism);
        if (!tryToAssignSlots.isSuccess()) {
            this.logger.debug("Failed to reserve and assign the required slots. Waiting for new resources.");
            this.context.goToWaitingForResources(this.previousExecutionGraph);
            return;
        }
        this.logger.debug("Successfully reserved and assigned the required slots for the ExecutionGraph.");
        ExecutionGraph executionGraph = tryToAssignSlots.getExecutionGraph();
        ExecutionGraphHandler executionGraphHandler = new ExecutionGraphHandler(executionGraph, getLogger(), this.context.getIOExecutor(), this.context.mo623getMainThreadExecutor());
        OperatorCoordinatorHandler create = this.operatorCoordinatorHandlerFactory.create(executionGraph, this.context);
        create.initializeOperatorCoordinators(this.context.mo623getMainThreadExecutor(), this.context.getMetricGroup());
        create.startAllOperatorCoordinators();
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(executionGraph.getJobID(), executionGraph.getJobName(), JobType.STREAMING, () -> {
            return IterableUtils.toStream(executionGraph.getVerticesTopologically()).map((v0) -> {
                return v0.getJobVertex();
            }).iterator();
        }, executionGraphWithVertexParallelism.getVertexParallelism()));
        this.context.goToExecuting(tryToAssignSlots.getExecutionGraph(), executionGraphHandler, create, Collections.emptyList());
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public void cancel() {
        this.context.goToFinished(this.context.getArchivedExecutionGraph(JobStatus.CANCELED, null));
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public void suspend(Throwable th) {
        this.context.goToFinished(this.context.getArchivedExecutionGraph(JobStatus.SUSPENDED, th));
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public JobStatus getJobStatus() {
        return JobStatus.CREATED;
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public ArchivedExecutionGraph getJob() {
        return this.context.getArchivedExecutionGraph(getJobStatus(), null);
    }

    @Override // org.apache.flink.runtime.scheduler.GlobalFailureHandler
    public void handleGlobalFailure(Throwable th) {
        this.context.goToFinished(this.context.getArchivedExecutionGraph(JobStatus.FAILED, th));
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public Logger getLogger() {
        return this.logger;
    }
}
