/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.InteractionsCountingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that exercise {@code suspend(...)} on a scheduler whose {@link ExecutionGraph} is in
 * various job states, and verify the resulting {@link JobStatus} as well as the number of cancel
 * RPCs observed by an {@link InteractionsCountingTaskManagerGateway}.
 */
public class ExecutionGraphSuspendTest
extends TestLogger {

    /**
     * Suspending a job that is still in CREATED moves it to SUSPENDED, puts all vertices into
     * CANCELED and sends no cancel RPC.
     */
    @Test
    public void testSuspendedOutOfCreated() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        Assert.assertEquals(JobStatus.CREATED, eg.getState());

        scheduler.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        validateAllVerticesInState(eg, ExecutionState.CANCELED);
        validateCancelRpcCalls(gateway, 0);

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Suspending a RUNNING job whose vertices are all DEPLOYING moves it to SUSPENDED and sends
     * one cancel RPC per subtask.
     */
    @Test
    public void testSuspendedOutOfDeploying() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(parallelism);
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());
        validateAllVerticesInState(eg, ExecutionState.DEPLOYING);

        scheduler.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        validateCancelRpcCalls(gateway, parallelism);

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Suspending a RUNNING job whose vertices are all RUNNING moves it to SUSPENDED and sends one
     * cancel RPC per subtask.
     */
    @Test
    public void testSuspendedOutOfRunning() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(parallelism);
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        Assert.assertEquals(JobStatus.RUNNING, eg.getState());
        validateAllVerticesInState(eg, ExecutionState.RUNNING);

        scheduler.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        validateCancelRpcCalls(gateway, parallelism);

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Suspending a job that is FAILING (after a global failure) moves it to SUSPENDED.
     */
    @Test
    public void testSuspendedOutOfFailing() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(parallelism);
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        scheduler.handleGlobalFailure(new Exception("fail global"));

        // The cancel RPCs are already counted while the job is FAILING, i.e. before the suspend call.
        Assert.assertEquals(JobStatus.FAILING, eg.getState());
        validateCancelRpcCalls(gateway, parallelism);

        scheduler.suspend(new Exception("suspend"));
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Suspending a job that already reached FAILED leaves it in FAILED and does not add any
     * further cancel RPCs.
     */
    @Test
    public void testSuspendedOutOfFailed() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        scheduler.handleGlobalFailure(new Exception("fail global"));

        Assert.assertEquals(JobStatus.FAILING, eg.getState());
        validateCancelRpcCalls(gateway, parallelism);

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals(JobStatus.FAILED, eg.getState());

        scheduler.suspend(new Exception("suspend"));

        // Still FAILED, and the cancel count is unchanged from before the suspend call.
        Assert.assertEquals(JobStatus.FAILED, eg.getState());
        validateCancelRpcCalls(gateway, parallelism);
    }

    /**
     * Suspending a job that is CANCELLING moves it to SUSPENDED.
     */
    @Test
    public void testSuspendedOutOfCanceling() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway(parallelism);
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        scheduler.cancel();

        Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
        validateCancelRpcCalls(gateway, parallelism);

        scheduler.suspend(new Exception("suspend"));
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());

        ensureCannotLeaveSuspendedState(scheduler, gateway);
    }

    /**
     * Suspending a job that already reached CANCELED leaves it in CANCELED and does not add any
     * further cancel RPCs.
     */
    @Test
    public void testSuspendedOutOfCanceled() throws Exception {
        final int parallelism = 10;
        final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
        final SchedulerBase scheduler = createScheduler(gateway, parallelism);
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        scheduler.cancel();

        Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
        validateCancelRpcCalls(gateway, parallelism);

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        // The termination future is queried here (rather than getState()) for the CANCELED status.
        Assert.assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());

        scheduler.suspend(new Exception("suspend"));

        // Still CANCELED, and the cancel count is unchanged from before the suspend call.
        Assert.assertEquals(JobStatus.CANCELED, eg.getState());
        validateCancelRpcCalls(gateway, parallelism);
    }

    /**
     * Suspending a job that is RESTARTING moves it to SUSPENDED with the suspend exception as
     * failure cause, and triggering the scheduled restart tasks afterwards does not change that.
     */
    @Test
    public void testSuspendWhileRestarting() throws Exception {
        final ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
        // NOTE(review): the strategy arguments are presumably (canRestart, backoffTime), i.e. a
        // restart that is allowed but practically never fires on its own - confirm against
        // TestRestartBackoffTimeStrategy.
        final DefaultScheduler scheduler = SchedulerTestingUtils
                .newSchedulerBuilder(new JobGraph(new JobVertex[0]))
                .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                .setDelayExecutor(taskRestartExecutor)
                .build();
        scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());

        scheduler.startScheduling();

        final ExecutionGraph eg = scheduler.getExecutionGraph();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        scheduler.handleGlobalFailure(new Exception("test"));
        Assert.assertEquals(JobStatus.RESTARTING, eg.getState());

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals(JobStatus.RESTARTING, eg.getState());

        final Exception exception = new Exception("Suspended");
        scheduler.suspend(exception);

        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        Assert.assertEquals(exception, eg.getFailureCause());

        // Running whatever was scheduled on the delay executor must not leave SUSPENDED.
        taskRestartExecutor.triggerScheduledTasks();
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
    }

    /**
     * Asserts that a global failure, a cancel and a repeated suspend each leave the job in
     * SUSPENDED without causing any gateway interaction.
     *
     * @param scheduler scheduler whose execution graph is expected to be SUSPENDED already
     * @param gateway gateway whose interaction counters are reset and then checked
     */
    private static void ensureCannotLeaveSuspendedState(SchedulerBase scheduler, InteractionsCountingTaskManagerGateway gateway) {
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        gateway.waitUntilAllTasksAreSubmitted();

        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        // Start counting from zero so only interactions caused by the calls below are seen.
        gateway.resetCounts();

        scheduler.handleGlobalFailure(new Exception("fail"));
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        validateNoInteractions(gateway);

        scheduler.cancel();
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        validateNoInteractions(gateway);

        scheduler.suspend(new Exception("suspend again"));
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        validateNoInteractions(gateway);

        // Every vertex must still be on attempt number 0 (presumably: nothing was restarted).
        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
            Assert.assertEquals(0, ev.getCurrentExecutionAttempt().getAttemptNumber());
        }
    }

    /** Asserts that the gateway's interaction count is zero. */
    private static void validateNoInteractions(InteractionsCountingTaskManagerGateway gateway) {
        Assert.assertThat(gateway.getInteractionsCount(), Matchers.is(0));
    }

    /**
     * Asserts that the current execution attempt of every vertex in the graph is in the given
     * state.
     *
     * @param eg execution graph whose vertices are inspected
     * @param expected state every current execution attempt must be in
     */
    private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected) {
        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
            Assert.assertEquals(expected, ev.getCurrentExecutionAttempt().getState());
        }
    }

    /**
     * Asserts that the gateway counted exactly {@code num} cancel-task calls.
     *
     * @param gateway gateway whose cancel counter is checked
     * @param num expected number of cancel calls
     */
    private static void validateCancelRpcCalls(InteractionsCountingTaskManagerGateway gateway, int num) {
        Assert.assertThat(gateway.getCancelTaskCount(), Matchers.is(num));
    }

    /**
     * Builds a scheduler for a job with a single {@link NoOpInvokable} vertex of the given
     * parallelism, backed by a {@link SimpleSlotProvider} created with that parallelism and the
     * given gateway, and running on the calling thread as its main thread executor.
     *
     * @param gateway task manager gateway handed to the slot provider
     * @param parallelism parallelism of the vertex, also passed to the slot provider
     * @return the created scheduler
     * @throws Exception if creating the scheduler fails
     */
    private static SchedulerBase createScheduler(TaskManagerGateway gateway, int parallelism) throws Exception {
        final JobVertex vertex = new JobVertex("vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);

        final SimpleSlotProvider slotProvider = new SimpleSlotProvider(parallelism, gateway);
        final DefaultScheduler scheduler = SchedulerTestingUtils.createScheduler(
                new JobGraph(new JobVertex[]{vertex}), slotProvider);
        scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        return scheduler;
    }
}

