package org.apache.flink.runtime.executiongraph;

import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests that closing the scheduler suspends the {@link ExecutionGraph} from each non-terminal job
 * state, and that a job which is already FAILED or CANCELED keeps its state.
 */
class ExecutionGraphSuspendTest {

    /**
     * Executor extension registered with JUnit; its executor is passed to every {@link
     * DefaultSchedulerBuilder} created by the tests in this class.
     */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /** Explicit package-private no-arg constructor; it performs no initialization. */
    ExecutionGraphSuspendTest() {
    }

    /**
     * Closing the scheduler while the job is still CREATED must move the job to SUSPENDED, leave
     * every vertex CANCELED, and send no cancel RPC.
     */
    @Test
    void testSuspendedOutOfCreated() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway =
                new InteractionsCountingTaskManagerGateway();
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph graph = scheduler.getExecutionGraph();

        // Scheduling was never started, so the job has not left its initial state.
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.CREATED);

        scheduler.closeAsync();

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);
        validateAllVerticesInState(graph, ExecutionState.CANCELED);
        validateCancelRpcCalls(gateway, 0);

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Closing the scheduler while all vertices are DEPLOYING must move the job to SUSPENDED and
     * send one cancel RPC per task.
     */
    @Test
    void testSuspendedOutOfDeploying() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway =
                new InteractionsCountingTaskManagerGateway(parallelism);
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph graph = scheduler.getExecutionGraph();

        scheduler.startScheduling();

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
        validateAllVerticesInState(graph, ExecutionState.DEPLOYING);

        scheduler.closeAsync();

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);
        validateCancelRpcCalls(gateway, parallelism);

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Closing the scheduler while all vertices are RUNNING must move the job to SUSPENDED and send
     * one cancel RPC per task.
     */
    @Test
    void testSuspendedOutOfRunning() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway =
                new InteractionsCountingTaskManagerGateway(parallelism);
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph graph = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
        validateAllVerticesInState(graph, ExecutionState.RUNNING);

        scheduler.closeAsync();

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);
        validateCancelRpcCalls(gateway, parallelism);

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Closing the scheduler while the job is FAILING (after a global failure, before the
     * cancellations completed) must move the job to SUSPENDED.
     */
    @Test
    void testSuspendedOutOfFailing() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway =
                new InteractionsCountingTaskManagerGateway(parallelism);
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph graph = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        scheduler.handleGlobalFailure(new Exception("fail global"));

        // The global failure already triggered one cancel RPC per task.
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
        validateCancelRpcCalls(gateway, parallelism);

        scheduler.closeAsync();

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Closing the scheduler after the job reached FAILED must leave the job in FAILED and must not
     * send any additional cancel RPCs.
     */
    @Test
    void testSuspendedOutOfFailed() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway =
                new InteractionsCountingTaskManagerGateway();
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph graph = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        scheduler.handleGlobalFailure(new Exception("fail global"));

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
        validateCancelRpcCalls(gateway, parallelism);

        // Finishing the cancellations lets the job reach FAILED.
        ExecutionGraphTestUtils.completeCancellingForAllVertices(graph);
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.FAILED);

        scheduler.closeAsync();

        // Still FAILED, and the cancel count is unchanged.
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.FAILED);
        validateCancelRpcCalls(gateway, parallelism);
    }

    /**
     * Closing the scheduler while the job is CANCELLING (cancel requested, cancellations not yet
     * completed) must move the job to SUSPENDED.
     */
    @Test
    void testSuspendedOutOfCanceling() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway =
                new InteractionsCountingTaskManagerGateway(parallelism);
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph graph = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        scheduler.cancel();

        // Cancelling already sent one cancel RPC per task.
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
        validateCancelRpcCalls(gateway, parallelism);

        scheduler.closeAsync();

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Closing the scheduler after the job reached CANCELED must leave the job in CANCELED and must
     * not send any additional cancel RPCs.
     */
    @Test
    void testSuspendedOutOfCanceled() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway =
                new InteractionsCountingTaskManagerGateway();
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph graph = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        scheduler.cancel();

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
        validateCancelRpcCalls(gateway, parallelism);

        // Finishing the cancellations completes the termination future with CANCELED.
        ExecutionGraphTestUtils.completeCancellingForAllVertices(graph);
        FlinkAssertions.assertThatFuture(graph.getTerminationFuture())
                .eventuallySucceeds()
                .isEqualTo(JobStatus.CANCELED);

        scheduler.closeAsync();

        // Still CANCELED, and the cancel count is unchanged.
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.CANCELED);
        validateCancelRpcCalls(gateway, parallelism);
    }

    /**
     * Closing the scheduler while the job is RESTARTING must move the job to SUSPENDED, and
     * triggering the tasks scheduled on the delay executor afterwards must not move it out of
     * SUSPENDED.
     */
    @Test
    void testSuspendWhileRestarting() throws Exception {
        // Declared with the concrete type: triggerScheduledTasks() is not part of the
        // ScheduledExecutor interface, so the interface-typed variable could not call it.
        final ManuallyTriggeredScheduledExecutor taskRestartExecutor =
                new ManuallyTriggeredScheduledExecutor();
        final DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                JobGraphTestUtils.emptyJobGraph(),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setRestartBackoffTimeStrategy(
                                new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                        .setDelayExecutor(taskRestartExecutor)
                        .build();

        scheduler.startScheduling();

        final ExecutionGraph graph = scheduler.getExecutionGraph();
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        scheduler.handleGlobalFailure(new Exception("test"));
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.RESTARTING);

        // Completing the cancellations alone does not leave RESTARTING; the restart itself is
        // only run once the delay executor is triggered manually.
        ExecutionGraphTestUtils.completeCancellingForAllVertices(graph);
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.RESTARTING);

        scheduler.closeAsync();
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);

        // Running the pending delayed tasks now must not revive the suspended job.
        taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);
    }

    /**
     * Asserts that a suspended job stays SUSPENDED: a global failure, a cancel request and a
     * second close must neither change the job state nor cause any gateway interaction, and no
     * vertex may have started a new execution attempt.
     *
     * @param schedulerBase scheduler whose execution graph is expected to be SUSPENDED
     * @param interactionsCountingTaskManagerGateway gateway used by the scheduler's slots; its
     *     counters are reset by this method before the checks
     */
    private static void ensureCannotLeaveSuspendedState(SchedulerBase schedulerBase, InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway) {
        final ExecutionGraph executionGraph = schedulerBase.getExecutionGraph();

        // Wait for outstanding submissions first so they are not counted as new interactions.
        interactionsCountingTaskManagerGateway.waitUntilAllTasksAreSubmitted();
        Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
        interactionsCountingTaskManagerGateway.resetCounts();

        schedulerBase.handleGlobalFailure(new Exception("fail"));
        Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
        validateNoInteractions(interactionsCountingTaskManagerGateway);

        schedulerBase.cancel();
        Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
        validateNoInteractions(interactionsCountingTaskManagerGateway);

        schedulerBase.closeAsync();
        Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
        validateNoInteractions(interactionsCountingTaskManagerGateway);

        // Attempt number 0 on every vertex means nothing was restarted.
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            Assertions.assertThat(vertex.getCurrentExecutionAttempt().getAttemptNumber()).isZero();
        }
    }

    /**
     * Asserts that the gateway's interaction counter is zero.
     *
     * @param interactionsCountingTaskManagerGateway gateway whose counter is checked
     */
    private static void validateNoInteractions(InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway) {
        final int interactions = interactionsCountingTaskManagerGateway.getInteractionsCount();
        Assertions.assertThat(interactions).isZero();
    }

    /**
     * Asserts that the current execution attempt of every execution vertex in the graph is in the
     * given state.
     *
     * @param executionGraph graph whose vertices are checked
     * @param executionState state each vertex's current attempt is expected to be in
     */
    private static void validateAllVerticesInState(ExecutionGraph executionGraph, ExecutionState executionState) {
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            Assertions.assertThat(vertex.getCurrentExecutionAttempt().getState()).isEqualTo(executionState);
        }
    }

    /**
     * Asserts that the gateway counted exactly the expected number of cancel-task calls.
     *
     * @param interactionsCountingTaskManagerGateway gateway whose cancel counter is checked
     * @param i expected number of cancel-task calls
     */
    private static void validateCancelRpcCalls(InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway, int i) {
        final int cancelCalls = interactionsCountingTaskManagerGateway.getCancelTaskCount();
        Assertions.assertThat(cancelCalls).isEqualTo(i);
    }

    /**
     * Builds a scheduler for a streaming job graph consisting of a single {@link NoOpInvokable}
     * vertex, backed by a slot provider limited to as many physical slots as the vertex has
     * subtasks.
     *
     * @param taskManagerGateway gateway handed to the testing physical slot provider
     * @param i parallelism of the vertex, also used as the number of physical slots
     * @return the scheduler produced by the builder; scheduling is not started here
     * @throws Exception if building the scheduler fails
     */
    private static SchedulerBase createScheduler(TaskManagerGateway taskManagerGateway, int i) throws Exception {
        final JobVertex vertex = new JobVertex("vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(i);

        final DefaultSchedulerBuilder builder =
                new DefaultSchedulerBuilder(
                        JobGraphTestUtils.streamingJobGraph(vertex),
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        EXECUTOR_RESOURCE.getExecutor());

        return builder.setExecutionSlotAllocatorFactory(
                        SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(
                                        i, taskManagerGateway)))
                .build();
    }
}
