/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests that drive an {@link ExecutionGraph} through a {@link DefaultScheduler} and verify the
 * resulting {@link JobStatus} when failures, cancellations and restarts interleave.
 *
 * <p>All schedulers and slot pools in this class run on a shared main-thread executor, and delayed
 * restarts are only executed when a test explicitly triggers {@link #taskRestartExecutor}.
 */
public class ExecutionGraphRestartTest extends TestLogger {
    /** Parallelism of the default job graph and default number of slots offered to the pool. */
    private static final int NUM_TASKS = 31;

    private static final ComponentMainThreadExecutor mainThreadExecutor =
            ComponentMainThreadExecutorServiceAdapter.forMainThread();

    private static final JobID TEST_JOB_ID = new JobID();

    /** Executor handed to the scheduler as delay executor; tasks run only when triggered manually. */
    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;

    /** Creates a fresh manually triggered executor before every test. */
    @Before
    public void setUp() {
        taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /** Calls {@link Execution#completeCancelling()} on the current attempt of every vertex. */
    private void completeCanceling(ExecutionGraph eg) {
        executeOperationForAllExecutions(eg, Execution::completeCancelling);
    }

    /**
     * Applies {@code operation} to the current execution attempt of every vertex in {@code eg}.
     *
     * @param eg the execution graph whose vertices are visited
     * @param operation the action to run on each current execution attempt
     */
    private void executeOperationForAllExecutions(ExecutionGraph eg, Consumer<Execution> operation) {
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            operation.accept(vertex.getCurrentExecutionAttempt());
        }
    }

    /** Creates the slot pool used by the tests, bound to {@link #TEST_JOB_ID}. */
    private SlotPoolImpl createSlotPoolImpl() {
        return new TestingSlotPoolImpl(TEST_JOB_ID);
    }

    /**
     * Cancels a job while it is in {@link JobStatus#RESTARTING} and verifies that it ends up
     * {@link JobStatus#CANCELED} and stays there even after the pending delayed tasks are run.
     */
    @Test
    public void testCancelWhileRestarting() throws Exception {
        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
            LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            DefaultScheduler scheduler =
                    SchedulerTestingUtils.newSchedulerBuilderWithDefaultSlotAllocator(
                                    createJobGraph(),
                                    createSchedulerWithSlots(slotPool, taskManagerLocation))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                            .setDelayExecutor(taskRestartExecutor)
                            .build();
            ExecutionGraph executionGraph = scheduler.getExecutionGraph();
            startScheduling(scheduler);

            // Losing the only task manager is expected to put the job into RESTARTING.
            slotPool.releaseTaskManager(
                    taskManagerLocation.getResourceID(), new Exception("Test Exception"));
            Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());

            scheduler.cancel();
            Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());

            // Running the delayed tasks must not move the job out of CANCELED.
            taskRestartExecutor.triggerScheduledTasks();
            Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());
            for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
                Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
            }
        }
    }

    /**
     * Cancels a job while it is in {@link JobStatus#FAILING} and verifies that it goes through
     * {@link JobStatus#CANCELLING} to {@link JobStatus#CANCELED}.
     */
    @Test
    public void testCancelWhileFailing() throws Exception {
        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
            DefaultScheduler scheduler =
                    SchedulerTestingUtils.newSchedulerBuilderWithDefaultSlotAllocator(
                                    createJobGraph(), createSchedulerWithSlots(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                            .build();
            ExecutionGraph graph = scheduler.getExecutionGraph();
            startScheduling(scheduler);
            Assert.assertEquals(JobStatus.RUNNING, graph.getState());
            switchAllTasksToRunning(graph);

            scheduler.handleGlobalFailure(new Exception("test"));
            Assert.assertEquals(JobStatus.FAILING, graph.getState());

            scheduler.cancel();
            Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

            // Let every task finish cancelling.
            completeCanceling(graph);
            Assert.assertEquals(JobStatus.CANCELED, graph.getState());
        }
    }

    /**
     * Reports a global failure while the job is in {@link JobStatus#CANCELLING} and verifies that
     * it goes through {@link JobStatus#FAILING} to {@link JobStatus#FAILED}.
     */
    @Test
    public void testFailWhileCanceling() throws Exception {
        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
            DefaultScheduler scheduler =
                    SchedulerTestingUtils.newSchedulerBuilderWithDefaultSlotAllocator(
                                    createJobGraph(), createSchedulerWithSlots(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                            .build();
            ExecutionGraph graph = scheduler.getExecutionGraph();
            startScheduling(scheduler);
            Assert.assertEquals(JobStatus.RUNNING, graph.getState());
            switchAllTasksToRunning(graph);

            scheduler.cancel();
            Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

            scheduler.handleGlobalFailure(new Exception("test"));
            Assert.assertEquals(JobStatus.FAILING, graph.getState());

            // Let every task finish cancelling.
            completeCanceling(graph);
            Assert.assertEquals(JobStatus.FAILED, graph.getState());
        }
    }

    /** Calls {@link Execution#switchToRunning()} on the current attempt of every vertex. */
    private void switchAllTasksToRunning(ExecutionGraph graph) {
        executeOperationForAllExecutions(graph, Execution::switchToRunning);
    }

    /**
     * Fails an execution attempt that was captured before a restart and verifies that the job
     * still reaches {@link JobStatus#FINISHED} once the current attempts are marked finished.
     */
    @Test
    public void testFailingExecutionAfterRestart() throws Exception {
        JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
        JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[] {sender, receiver});

        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
            DefaultScheduler scheduler =
                    SchedulerTestingUtils.newSchedulerBuilderWithDefaultSlotAllocator(
                                    jobGraph,
                                    createSchedulerWithSlots(
                                            slotPool, new LocalTaskManagerLocation(), 2))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                            .setDelayExecutor(taskRestartExecutor)
                            .build();
            ExecutionGraph eg = scheduler.getExecutionGraph();
            startScheduling(scheduler);

            // Capture the attempts that exist before the restart.
            Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
            Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
            Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();

            finishedExecution.markFinished();
            failedExecution.fail(new Exception("Test Exception"));
            failedExecution.completeCancelling();

            // Run the delayed tasks queued on the delay executor.
            taskRestartExecutor.triggerScheduledTasks();
            Assert.assertEquals(JobStatus.RUNNING, eg.getState());

            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
                Assert.assertNotNull(
                        "No assigned resource (test instability).",
                        vertex.getCurrentAssignedResource());
                vertex.getCurrentExecutionAttempt().switchToRunning();
            }

            // Failing the attempt captured before the restart must not affect the job.
            finishedExecution.fail(new Exception("This should have no effect"));

            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
                vertex.getCurrentExecutionAttempt().markFinished();
            }

            Assert.assertEquals(ExecutionState.FINISHED, finishedExecution.getState());
            Assert.assertEquals(JobStatus.FINISHED, eg.getState());
        }
    }

    /**
     * Fails the executions of a job after it was cancelled and verifies that the job terminates
     * as {@link JobStatus#CANCELED} and remains so after a later {@code completeCancelling()}.
     */
    @Test
    public void testFailExecutionAfterCancel() throws Exception {
        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
            DefaultScheduler scheduler =
                    SchedulerTestingUtils.newSchedulerBuilderWithDefaultSlotAllocator(
                                    createJobGraphToCancel(),
                                    createSchedulerWithSlots(
                                            slotPool, new LocalTaskManagerLocation(), 2))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                            .setDelayExecutor(taskRestartExecutor)
                            .build();
            ExecutionGraph eg = scheduler.getExecutionGraph();
            startScheduling(scheduler);

            scheduler.cancel();
            for (ExecutionVertex v : eg.getAllExecutionVertices()) {
                v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
            }
            Assert.assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());

            Execution execution =
                    eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
            execution.completeCancelling();
            Assert.assertEquals(JobStatus.CANCELED, eg.getState());
        }
    }

    /**
     * Installs the shared main-thread executor on {@code scheduler}, starts scheduling, and
     * asserts the job moves from {@link JobStatus#CREATED} to {@link JobStatus#RUNNING}.
     *
     * @param scheduler the scheduler to start
     */
    private static void startScheduling(SchedulerBase scheduler) throws Exception {
        scheduler.setMainThreadExecutor(mainThreadExecutor);
        Assert.assertThat(scheduler.getExecutionGraph().getState(), Matchers.is(JobStatus.CREATED));
        scheduler.startScheduling();
        Assert.assertThat(scheduler.getExecutionGraph().getState(), Matchers.is(JobStatus.RUNNING));
    }

    /** Same as the two-argument overload, using a new {@link LocalTaskManagerLocation}. */
    private static Scheduler createSchedulerWithSlots(SlotPool slotPool) throws Exception {
        return createSchedulerWithSlots(slotPool, new LocalTaskManagerLocation());
    }

    /** Same as the three-argument overload, offering {@link #NUM_TASKS} slots. */
    private static Scheduler createSchedulerWithSlots(
            SlotPool slotPool, TaskManagerLocation taskManagerLocation) throws Exception {
        return createSchedulerWithSlots(slotPool, taskManagerLocation, NUM_TASKS);
    }

    /**
     * Starts {@code slotPool}, creates and starts a {@link SchedulerImpl} on top of it, registers
     * the task manager and offers {@code numSlots} slots from it to the pool.
     *
     * @param slotPool the slot pool to set up and offer slots to
     * @param taskManagerLocation location of the task manager providing the slots
     * @param numSlots number of slot offers to create
     * @return the started scheduler backed by {@code slotPool}
     */
    private static Scheduler createSchedulerWithSlots(
            SlotPool slotPool, TaskManagerLocation taskManagerLocation, int numSlots)
            throws Exception {
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        setupSlotPool(slotPool);
        SchedulerImpl scheduler =
                new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
        scheduler.start(mainThreadExecutor);
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        // Size the list from the number of offers actually created.
        ArrayList<SlotOffer> slotOffers = new ArrayList<>(numSlots);
        for (int i = 0; i < numSlots; ++i) {
            slotOffers.add(new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY));
        }
        slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
        return scheduler;
    }

    /**
     * Starts {@code slotPool} with a generated {@link JobMasterId} on the shared main-thread
     * executor and connects it to a {@link TestingResourceManagerGateway}.
     */
    private static void setupSlotPool(SlotPool slotPool) throws Exception {
        final String jobManagerAddress = "foobar";
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutor);
        slotPool.connectToResourceManager(resourceManagerGateway);
    }

    /** Creates a job graph with a single vertex of parallelism {@link #NUM_TASKS}. */
    private static JobGraph createJobGraph() {
        JobVertex sender =
                ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
        return new JobGraph("Pointwise job", new JobVertex[] {sender});
    }

    /**
     * Creates a single-vertex job graph of parallelism 1 whose execution config carries a
     * fixed-delay restart strategy built from {@code Integer.MAX_VALUE} for both arguments.
     */
    private static JobGraph createJobGraphToCancel() throws IOException {
        JobVertex vertex =
                ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Integer.MAX_VALUE));
        JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[] {vertex});
        jobGraph.setExecutionConfig(executionConfig);
        return jobGraph;
    }
}

