/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests how an {@link ExecutionGraph} driven by a {@link DefaultScheduler} moves between job
 * states when cancellation, global failures and restarts are interleaved.
 */
class ExecutionGraphRestartTest {

    /** Parallelism of the default job vertex and the number of slots offered in most tests. */
    private static final int NUM_TASKS = 31;

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    private static final ComponentMainThreadExecutor mainThreadExecutor =
            ComponentMainThreadExecutorServiceAdapter.forMainThread();

    /** Delay executor passed to the schedulers; its scheduled tasks only run when triggered. */
    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;

    @BeforeEach
    void setUp() {
        taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /** Calls {@link Execution#completeCancelling()} on the current attempt of every vertex. */
    private void completeCanceling(ExecutionGraph eg) {
        executeOperationForAllExecutions(eg, Execution::completeCancelling);
    }

    /** Applies {@code operation} to the current execution attempt of every vertex in {@code eg}. */
    private void executeOperationForAllExecutions(
            ExecutionGraph eg, Consumer<Execution> operation) {
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            operation.accept(vertex.getCurrentExecutionAttempt());
        }
    }

    /**
     * Schedules more tasks than slots are offered and verifies that cancelling the job leaves the
     * slot pool without pending requests.
     */
    @Test
    void testCancelAllPendingRequestWhileCanceling() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            final int numTasksExceedSlotPool = 50;
            // Create a graph with parallelism NUM_TASKS + numTasksExceedSlotPool so that exactly
            // numTasksExceedSlotPool requests stay pending once NUM_TASKS slots were offered.
            JobVertex sender =
                    ExecutionGraphTestUtils.createJobVertex(
                            "Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class);
            JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .build();
            ExecutionGraph executionGraph = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, NUM_TASKS);

            Assertions.assertThat(slotPool.getNumPendingRequests())
                    .isEqualTo(numTasksExceedSlotPool);

            scheduler.cancel();

            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
            Assertions.assertThat(slotPool.getNumPendingRequests()).isZero();
        }
    }

    /**
     * Schedules more tasks than slots are offered and verifies that a global failure leaves the
     * slot pool without pending requests.
     */
    @Test
    void testCancelAllPendingRequestWhileFailing() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            final int numTasksExceedSlotPool = 50;
            // Same setup as the cancelling variant: numTasksExceedSlotPool requests stay pending.
            JobVertex sender =
                    ExecutionGraphTestUtils.createJobVertex(
                            "Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class);
            JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .build();
            ExecutionGraph executionGraph = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, NUM_TASKS);

            Assertions.assertThat(slotPool.getNumPendingRequests())
                    .isEqualTo(numTasksExceedSlotPool);

            scheduler.handleGlobalFailure(new Exception("test"));

            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.FAILING);
            Assertions.assertThat(slotPool.getNumPendingRequests()).isZero();
        }
    }

    /**
     * Cancels a job while it is in {@link JobStatus#RESTARTING} and verifies that it ends up
     * {@link JobStatus#CANCELED} and stays there after the scheduled tasks of the delay executor
     * have been triggered.
     */
    @Test
    void testCancelWhileRestarting() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    createJobGraph(),
                                    mainThreadExecutor,
                                    EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                            .setDelayExecutor(taskRestartExecutor)
                            .build();
            ExecutionGraph executionGraph = scheduler.getExecutionGraph();

            startScheduling(scheduler);

            // Releasing the task manager that provided the slots puts the job into RESTARTING.
            ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);
            slotPool.releaseTaskManager(taskManagerResourceId, new Exception("Test Exception"));
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);

            scheduler.cancel();
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);

            // Running whatever was scheduled on the delay executor must not change the outcome.
            taskRestartExecutor.triggerScheduledTasks();
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);

            for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
                Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
            }
        }
    }

    /**
     * Offers {@code numSlots} slots with {@link ResourceProfile#ANY} to {@code slotPool}.
     *
     * @return the {@link ResourceID} returned by {@code SlotPoolUtils.offerSlots}
     */
    private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
        return SlotPoolUtils.offerSlots(
                slotPool, mainThreadExecutor, Collections.nCopies(numSlots, ResourceProfile.ANY));
    }

    /**
     * Cancels a job that is {@link JobStatus#FAILING} and verifies the transition to {@link
     * JobStatus#CANCELLING} and, once all executions completed cancelling, to {@link
     * JobStatus#CANCELED}.
     */
    @Test
    void testCancelWhileFailing() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    createJobGraph(),
                                    mainThreadExecutor,
                                    EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                            .build();
            ExecutionGraph graph = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, NUM_TASKS);

            Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);

            switchAllTasksToRunning(graph);

            scheduler.handleGlobalFailure(new Exception("test"));
            Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);

            scheduler.cancel();
            Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);

            completeCanceling(graph);
            Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.CANCELED);
        }
    }

    /**
     * Reports a global failure for a job that is {@link JobStatus#CANCELLING} and verifies the
     * transition to {@link JobStatus#FAILING} and, once all executions completed cancelling, to
     * {@link JobStatus#FAILED}.
     */
    @Test
    void testFailWhileCanceling() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    createJobGraph(),
                                    mainThreadExecutor,
                                    EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                            .build();
            ExecutionGraph graph = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, NUM_TASKS);

            Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);

            switchAllTasksToRunning(graph);

            scheduler.cancel();
            Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);

            scheduler.handleGlobalFailure(new Exception("test"));
            Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);

            completeCanceling(graph);
            Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.FAILED);
        }
    }

    /** Calls {@link Execution#switchToRunning()} on the current attempt of every vertex. */
    private void switchAllTasksToRunning(ExecutionGraph graph) {
        executeOperationForAllExecutions(graph, Execution::switchToRunning);
    }

    /**
     * Verifies that failing an execution attempt obtained before the scheduled restart tasks ran
     * has no effect afterwards: the attempt stays {@link ExecutionState#FINISHED} and the job
     * still reaches {@link JobStatus#FINISHED}.
     */
    @Test
    void testFailingExecutionAfterRestart() throws Exception {
        JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
        JobVertex receiver =
                ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                            .setDelayExecutor(taskRestartExecutor)
                            .build();
            ExecutionGraph eg = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, 2);

            // Remember the attempts that exist before the restart: one finishes, one fails.
            Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
            Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
            Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();

            finishedExecution.markFinished();

            failedExecution.fail(new Exception("Test Exception"));
            failedExecution.completeCancelling();

            taskRestartExecutor.triggerScheduledTasks();

            Assertions.assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);

            // At this point all vertices are expected to have a slot assigned again.
            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
                Assertions.assertThat(vertex.getCurrentAssignedResource()).isNotNull();
                vertex.getCurrentExecutionAttempt().switchToInitializing();
                vertex.getCurrentExecutionAttempt().switchToRunning();
            }

            // Failing the attempt captured earlier must not influence the job any more.
            finishedExecution.fail(new Exception("This should have no effect"));

            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
                vertex.getCurrentExecutionAttempt().markFinished();
            }

            Assertions.assertThat(finishedExecution.getState()).isEqualTo(ExecutionState.FINISHED);
            Assertions.assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
        }
    }

    /**
     * Fails all executions after the job was cancelled and verifies that the termination future
     * completes with {@link JobStatus#CANCELED} and that a later {@code completeCancelling} call
     * leaves the job {@link JobStatus#CANCELED}.
     */
    @Test
    void testFailExecutionAfterCancel() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    createJobGraphToCancel(),
                                    mainThreadExecutor,
                                    EXECUTOR_RESOURCE.getExecutor())
                            .setExecutionSlotAllocatorFactory(
                                    createExecutionSlotAllocatorFactory(slotPool))
                            .setRestartBackoffTimeStrategy(
                                    new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                            .setDelayExecutor(taskRestartExecutor)
                            .build();
            ExecutionGraph eg = scheduler.getExecutionGraph();

            startScheduling(scheduler);
            offerSlots(slotPool, 1);

            // Fail right after cancel (for example because cancelling itself failed).
            scheduler.cancel();

            for (ExecutionVertex v : eg.getAllExecutionVertices()) {
                v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
            }

            FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
                    .eventuallySucceeds()
                    .isEqualTo(JobStatus.CANCELED);

            Execution execution =
                    eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();

            execution.completeCancelling();
            Assertions.assertThat(eg.getState()).isEqualTo(JobStatus.CANCELED);
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Starts scheduling and asserts that the job goes from {@link JobStatus#CREATED} to {@link
     * JobStatus#RUNNING}.
     */
    private static void startScheduling(SchedulerBase scheduler) {
        Assertions.assertThat(scheduler.getExecutionGraph().getState())
                .isEqualTo(JobStatus.CREATED);
        scheduler.startScheduling();
        Assertions.assertThat(scheduler.getExecutionGraph().getState())
                .isEqualTo(JobStatus.RUNNING);
    }

    /**
     * Starts and connects {@code slotPool} (see {@link #setupSlotPool(SlotPool)}) and wraps it
     * into a slot-sharing {@link ExecutionSlotAllocatorFactory}.
     */
    private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory(
            SlotPool slotPool) throws Exception {
        setupSlotPool(slotPool);
        PhysicalSlotProvider physicalSlotProvider =
                new PhysicalSlotProviderImpl(
                        LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
        return SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                physicalSlotProvider);
    }

    /**
     * Starts {@code slotPool} with a generated {@link JobMasterId} and connects it to a {@link
     * TestingResourceManagerGateway}.
     */
    private static void setupSlotPool(SlotPool slotPool) throws Exception {
        final String jobManagerAddress = "foobar";
        ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        slotPool.start(JobMasterId.generate(), jobManagerAddress);
        slotPool.connectToResourceManager(resourceManagerGateway);
    }

    /** Creates a streaming job graph with a single vertex of parallelism {@link #NUM_TASKS}. */
    private static JobGraph createJobGraph() {
        JobVertex sender =
                ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
        return JobGraphTestUtils.streamingJobGraph(sender);
    }

    /**
     * Creates a streaming job graph with a single vertex of parallelism 1 and a fixed-delay
     * restart strategy configured with {@link Integer#MAX_VALUE} for both arguments.
     */
    private static JobGraph createJobGraphToCancel() throws IOException {
        JobVertex vertex =
                ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
        JobGraph jobGraph =
                JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(vertex).build();
        RestartStrategyUtils.configureFixedDelayRestartStrategy(
                jobGraph, Integer.MAX_VALUE, Integer.MAX_VALUE);
        return jobGraph;
    }
}

