package org.apache.flink.runtime.executiongraph;

import java.util.Iterator;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.class */
/**
 * Tests how an {@link ExecutionGraph} reacts to {@code suspend(...)} when it is called in
 * different job states.
 *
 * <p>Each test drives the graph into a particular {@link JobStatus}, suspends it, and then
 * asserts the resulting job status and the number of cancel RPCs counted by the
 * {@link InteractionsCountingTaskManagerGateway}.
 */
public class ExecutionGraphSuspendTest extends TestLogger {

    /**
     * Parallelism of the single job vertex used by the gateway-based tests. The same value is
     * the number of cancel RPCs the tests expect once every task has been scheduled.
     */
    private static final int PARALLELISM = 10;

    /**
     * Suspending a graph that is still {@code CREATED} must end in {@code SUSPENDED} with all
     * vertices {@code CANCELED} and without any cancel RPC.
     */
    @Test
    public void testSuspendedOutOfCreated() throws Exception {
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        final ExecutionGraph graph = createExecutionGraph(gateway, PARALLELISM);

        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());
        validateAllVerticesInState(graph, ExecutionState.CANCELED);
        // Nothing was scheduled, so no cancel call is expected.
        validateCancelRpcCalls(gateway, 0);

        ensureCannotLeaveSuspendedState(graph, gateway);
    }

    /**
     * Suspending while all vertices are {@code DEPLOYING} must end in {@code SUSPENDED} and
     * issue one cancel RPC per task.
     */
    @Test
    public void testSuspendedOutOfDeploying() throws Exception {
        // NOTE(review): the constructor argument is presumably the number of submissions the
        // gateway waits for in waitUntilAllTasksAreSubmitted() - confirm against the gateway.
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(PARALLELISM);
        final ExecutionGraph graph = createExecutionGraph(gateway, PARALLELISM);

        graph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
        validateAllVerticesInState(graph, ExecutionState.DEPLOYING);

        graph.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);

        ensureCannotLeaveSuspendedState(graph, gateway);
    }

    /**
     * Suspending while all vertices are {@code RUNNING} must end in {@code SUSPENDED} and
     * issue one cancel RPC per task.
     */
    @Test
    public void testSuspendedOutOfRunning() throws Exception {
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(PARALLELISM);
        final ExecutionGraph graph = createExecutionGraph(gateway, PARALLELISM);

        graph.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
        validateAllVerticesInState(graph, ExecutionState.RUNNING);

        graph.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);

        ensureCannotLeaveSuspendedState(graph, gateway);
    }

    /**
     * Suspending a graph that is {@code FAILING} (after a global failure, before cancellation
     * has completed) must end in {@code SUSPENDED}.
     */
    @Test
    public void testSuspendedOutOfFailing() throws Exception {
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(PARALLELISM);
        final ExecutionGraph graph = createExecutionGraph(gateway, PARALLELISM);

        graph.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        graph.failGlobal(new Exception("fail global"));
        Assert.assertEquals(JobStatus.FAILING, graph.getState());
        // The cancel calls are already counted before suspend is invoked.
        validateCancelRpcCalls(gateway, PARALLELISM);

        graph.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());

        ensureCannotLeaveSuspendedState(graph, gateway);
    }

    /**
     * Suspending a graph that has already reached {@code FAILED} must leave it in
     * {@code FAILED} and must not add further cancel RPCs.
     */
    @Test
    public void testSuspendedOutOfFailed() throws Exception {
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        final ExecutionGraph graph = createExecutionGraph(gateway, PARALLELISM);

        graph.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        graph.failGlobal(new Exception("fail global"));
        Assert.assertEquals(JobStatus.FAILING, graph.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);

        ExecutionGraphTestUtils.completeCancellingForAllVertices(graph);
        Assert.assertEquals(JobStatus.FAILED, graph.getState());

        graph.suspend(new Exception("suspend"));

        // Still FAILED, and the cancel count is the same as before the suspend call.
        Assert.assertEquals(JobStatus.FAILED, graph.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);
    }

    /**
     * Suspending a graph that is {@code CANCELLING} (cancel requested, cancellation not yet
     * completed) must end in {@code SUSPENDED}.
     */
    @Test
    public void testSuspendedOutOfCanceling() throws Exception {
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(PARALLELISM);
        final ExecutionGraph graph = createExecutionGraph(gateway, PARALLELISM);

        graph.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, graph.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);

        graph.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());

        ensureCannotLeaveSuspendedState(graph, gateway);
    }

    /**
     * Suspending a graph whose termination future has completed with {@code CANCELED} must
     * leave it in {@code CANCELED} and must not add further cancel RPCs.
     */
    @Test
    public void testSuspendedOutOfCanceled() throws Exception {
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        final ExecutionGraph graph = createExecutionGraph(gateway, PARALLELISM);

        graph.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, graph.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);

        ExecutionGraphTestUtils.completeCancellingForAllVertices(graph);
        // Read the terminal status from the termination future rather than from getState().
        Assert.assertEquals(JobStatus.CANCELED, graph.getTerminationFuture().get());

        graph.suspend(new Exception("suspend"));

        // Still CANCELED, and the cancel count is the same as before the suspend call.
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);
    }

    /**
     * Suspending a graph that is {@code RESTARTING} must end in {@code SUSPENDED}, and the
     * exception passed to {@code suspend} must be reported as the failure cause.
     */
    @Test
    public void testSuspendWhileRestarting() throws Exception {
        final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
        graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        graph.scheduleForExecution();

        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);

        graph.failGlobal(new Exception("test"));
        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        ExecutionGraphTestUtils.completeCancellingForAllVertices(graph);
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());

        final Exception suspendCause = new Exception("Suspended");
        graph.suspend(suspendCause);

        Assert.assertEquals(JobStatus.SUSPENDED, graph.getState());
        Assert.assertEquals(suspendCause, graph.getFailureCause());
    }

    /**
     * Asserts that a suspended graph stays {@code SUSPENDED} and causes no gateway
     * interactions when {@code failGlobal}, {@code cancel} or {@code suspend} is called on it.
     *
     * <p>Finally checks that the current execution attempt of every vertex still has attempt
     * number 0 (presumably confirming that no restart was triggered).
     *
     * @param executionGraph the graph, expected to already be in {@code SUSPENDED}
     * @param interactionsCountingTaskManagerGateway the gateway whose counters are reset and
     *     then checked
     */
    private static void ensureCannotLeaveSuspendedState(ExecutionGraph executionGraph, InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway) {
        interactionsCountingTaskManagerGateway.waitUntilAllTasksAreSubmitted();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        // Start counting from zero so only the calls made below are measured.
        interactionsCountingTaskManagerGateway.resetCounts();

        executionGraph.failGlobal(new Exception("fail"));
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        validateNoInteractions(interactionsCountingTaskManagerGateway);

        executionGraph.cancel();
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        validateNoInteractions(interactionsCountingTaskManagerGateway);

        executionGraph.suspend(new Exception("suspend again"));
        Assert.assertEquals(JobStatus.SUSPENDED, executionGraph.getState());
        validateNoInteractions(interactionsCountingTaskManagerGateway);

        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            Assert.assertEquals(0L, vertex.getCurrentExecutionAttempt().getAttemptNumber());
        }
    }

    /**
     * Asserts that the gateway's interaction counter is zero.
     *
     * @param interactionsCountingTaskManagerGateway the gateway to inspect
     */
    private static void validateNoInteractions(InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway) {
        Assert.assertThat(interactionsCountingTaskManagerGateway.getInteractionsCount(), Matchers.is(0));
    }

    /**
     * Asserts that the current execution attempt of every vertex in the graph is in the given
     * state.
     *
     * @param executionGraph the graph whose vertices are checked
     * @param executionState the state each current execution attempt must be in
     */
    private static void validateAllVerticesInState(ExecutionGraph executionGraph, ExecutionState executionState) {
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            Assert.assertEquals(executionState, vertex.getCurrentExecutionAttempt().getState());
        }
    }

    /**
     * Asserts that the gateway has counted exactly the expected number of cancel-task calls.
     *
     * @param interactionsCountingTaskManagerGateway the gateway to inspect
     * @param expectedCancelRpcCalls the expected value of the gateway's cancel-task counter
     */
    private static void validateCancelRpcCalls(InteractionsCountingTaskManagerGateway interactionsCountingTaskManagerGateway, int expectedCancelRpcCalls) {
        Assert.assertThat(interactionsCountingTaskManagerGateway.getCancelTaskCount(), Matchers.is(expectedCancelRpcCalls));
    }

    /**
     * Builds and starts an execution graph consisting of one {@link NoOpInvokable} vertex.
     *
     * <p>The graph uses a {@link SimpleSlotProvider} created from the given parallelism and
     * gateway, and a {@code FixedDelayRestartStrategy(0, 0L)}. It is started with the
     * main-thread executor adapter before being returned.
     *
     * @param taskManagerGateway gateway handed to the slot provider
     * @param parallelism parallelism of the vertex, also passed to the slot provider
     * @return the started execution graph
     * @throws Exception if creating the graph fails
     */
    private static ExecutionGraph createExecutionGraph(TaskManagerGateway taskManagerGateway, int parallelism) throws Exception {
        final JobVertex vertex = new JobVertex("vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);

        // Typed locals pin the (SlotProvider, RestartStrategy, JobVertex...) overload.
        final SlotProvider slotProvider = new SimpleSlotProvider(parallelism, taskManagerGateway);
        final RestartStrategy restartStrategy = new FixedDelayRestartStrategy(0, 0L);

        final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph(slotProvider, restartStrategy, vertex);
        graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        return graph;
    }
}
