package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.class */
class ExecutionGraphRestartTest {
    private static final int NUM_TASKS = 31;

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private static final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;

    ExecutionGraphRestartTest() {
    }

    @BeforeEach
    void setUp() {
        this.taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    private void completeCanceling(ExecutionGraph executionGraph) {
        executeOperationForAllExecutions(executionGraph, (v0) -> {
            v0.completeCancelling();
        });
    }

    private void executeOperationForAllExecutions(ExecutionGraph executionGraph, Consumer<Execution> consumer) {
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            consumer.accept(((ExecutionVertex) it.next()).getCurrentExecutionAttempt());
        }
    }

    @Test
    void testCancelAllPendingRequestWhileCanceling() throws Exception {
        DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge = SlotPoolUtils.createDeclarativeSlotPoolBridge();
        Throwable th = null;
        try {
            DefaultScheduler build = new DefaultSchedulerBuilder(JobGraphTestUtils.streamingJobGraph(ExecutionGraphTestUtils.createJobVertex("Task", 81, NoOpInvokable.class)), mainThreadExecutor, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory(createExecutionSlotAllocatorFactory(createDeclarativeSlotPoolBridge)).build();
            ExecutionGraph executionGraph = build.getExecutionGraph();
            startScheduling(build);
            offerSlots(createDeclarativeSlotPoolBridge, NUM_TASKS);
            Assertions.assertThat(createDeclarativeSlotPoolBridge.getNumPendingRequests()).isEqualTo(50);
            build.cancel();
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
            Assertions.assertThat(createDeclarativeSlotPoolBridge.getNumPendingRequests()).isZero();
            if (createDeclarativeSlotPoolBridge != null) {
                if (0 == 0) {
                    createDeclarativeSlotPoolBridge.close();
                    return;
                }
                try {
                    createDeclarativeSlotPoolBridge.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createDeclarativeSlotPoolBridge != null) {
                if (0 != 0) {
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createDeclarativeSlotPoolBridge.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testCancelAllPendingRequestWhileFailing() throws Exception {
        DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge = SlotPoolUtils.createDeclarativeSlotPoolBridge();
        Throwable th = null;
        try {
            DefaultScheduler build = new DefaultSchedulerBuilder(JobGraphTestUtils.streamingJobGraph(ExecutionGraphTestUtils.createJobVertex("Task", 81, NoOpInvokable.class)), mainThreadExecutor, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory(createExecutionSlotAllocatorFactory(createDeclarativeSlotPoolBridge)).build();
            ExecutionGraph executionGraph = build.getExecutionGraph();
            startScheduling(build);
            offerSlots(createDeclarativeSlotPoolBridge, NUM_TASKS);
            Assertions.assertThat(createDeclarativeSlotPoolBridge.getNumPendingRequests()).isEqualTo(50);
            build.handleGlobalFailure(new Exception("test"));
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.FAILING);
            Assertions.assertThat(createDeclarativeSlotPoolBridge.getNumPendingRequests()).isZero();
            if (createDeclarativeSlotPoolBridge != null) {
                if (0 == 0) {
                    createDeclarativeSlotPoolBridge.close();
                    return;
                }
                try {
                    createDeclarativeSlotPoolBridge.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createDeclarativeSlotPoolBridge != null) {
                if (0 != 0) {
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createDeclarativeSlotPoolBridge.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testCancelWhileRestarting() throws Exception {
        DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge = SlotPoolUtils.createDeclarativeSlotPoolBridge();
        Throwable th = null;
        try {
            DefaultScheduler build = new DefaultSchedulerBuilder(createJobGraph(), mainThreadExecutor, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory(createExecutionSlotAllocatorFactory(createDeclarativeSlotPoolBridge)).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE)).setDelayExecutor(this.taskRestartExecutor).build();
            ExecutionGraph executionGraph = build.getExecutionGraph();
            startScheduling(build);
            createDeclarativeSlotPoolBridge.releaseTaskManager(offerSlots(createDeclarativeSlotPoolBridge, NUM_TASKS), new Exception("Test Exception"));
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
            build.cancel();
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
            this.taskRestartExecutor.triggerScheduledTasks();
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
            Iterator it = executionGraph.getAllExecutionVertices().iterator();
            while (it.hasNext()) {
                Assertions.assertThat(((ExecutionVertex) it.next()).getExecutionState()).isEqualTo(ExecutionState.FAILED);
            }
            if (createDeclarativeSlotPoolBridge != null) {
                if (0 == 0) {
                    createDeclarativeSlotPoolBridge.close();
                    return;
                }
                try {
                    createDeclarativeSlotPoolBridge.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createDeclarativeSlotPoolBridge != null) {
                if (0 != 0) {
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createDeclarativeSlotPoolBridge.close();
                }
            }
            throw th3;
        }
    }

    private static ResourceID offerSlots(SlotPool slotPool, int i) {
        return SlotPoolUtils.offerSlots(slotPool, mainThreadExecutor, Collections.nCopies(i, ResourceProfile.ANY));
    }

    @Test
    void testCancelWhileFailing() throws Exception {
        DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge = SlotPoolUtils.createDeclarativeSlotPoolBridge();
        Throwable th = null;
        try {
            try {
                DefaultScheduler build = new DefaultSchedulerBuilder(createJobGraph(), mainThreadExecutor, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory(createExecutionSlotAllocatorFactory(createDeclarativeSlotPoolBridge)).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE)).build();
                ExecutionGraph executionGraph = build.getExecutionGraph();
                startScheduling(build);
                offerSlots(createDeclarativeSlotPoolBridge, NUM_TASKS);
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.RUNNING);
                switchAllTasksToRunning(executionGraph);
                build.handleGlobalFailure(new Exception("test"));
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.FAILING);
                build.cancel();
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
                completeCanceling(executionGraph);
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
                if (createDeclarativeSlotPoolBridge != null) {
                    if (0 == 0) {
                        createDeclarativeSlotPoolBridge.close();
                        return;
                    }
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createDeclarativeSlotPoolBridge != null) {
                if (th != null) {
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createDeclarativeSlotPoolBridge.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testFailWhileCanceling() throws Exception {
        DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge = SlotPoolUtils.createDeclarativeSlotPoolBridge();
        Throwable th = null;
        try {
            try {
                DefaultScheduler build = new DefaultSchedulerBuilder(createJobGraph(), mainThreadExecutor, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory(createExecutionSlotAllocatorFactory(createDeclarativeSlotPoolBridge)).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE)).build();
                ExecutionGraph executionGraph = build.getExecutionGraph();
                startScheduling(build);
                offerSlots(createDeclarativeSlotPoolBridge, NUM_TASKS);
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.RUNNING);
                switchAllTasksToRunning(executionGraph);
                build.cancel();
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
                build.handleGlobalFailure(new Exception("test"));
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.FAILING);
                completeCanceling(executionGraph);
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.FAILED);
                if (createDeclarativeSlotPoolBridge != null) {
                    if (0 == 0) {
                        createDeclarativeSlotPoolBridge.close();
                        return;
                    }
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createDeclarativeSlotPoolBridge != null) {
                if (th != null) {
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createDeclarativeSlotPoolBridge.close();
                }
            }
            throw th4;
        }
    }

    private void switchAllTasksToRunning(ExecutionGraph executionGraph) {
        executeOperationForAllExecutions(executionGraph, (v0) -> {
            v0.switchToRunning();
        });
    }

    @Test
    void testFailingExecutionAfterRestart() throws Exception {
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class), ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class));
        DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge = SlotPoolUtils.createDeclarativeSlotPoolBridge();
        Throwable th = null;
        try {
            try {
                DefaultScheduler build = new DefaultSchedulerBuilder(streamingJobGraph, mainThreadExecutor, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory(createExecutionSlotAllocatorFactory(createDeclarativeSlotPoolBridge)).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE)).setDelayExecutor(this.taskRestartExecutor).build();
                ExecutionGraph executionGraph = build.getExecutionGraph();
                startScheduling(build);
                offerSlots(createDeclarativeSlotPoolBridge, 2);
                Iterator it = executionGraph.getAllExecutionVertices().iterator();
                Execution currentExecutionAttempt = ((ExecutionVertex) it.next()).getCurrentExecutionAttempt();
                Execution currentExecutionAttempt2 = ((ExecutionVertex) it.next()).getCurrentExecutionAttempt();
                currentExecutionAttempt.markFinished();
                currentExecutionAttempt2.fail(new Exception("Test Exception"));
                currentExecutionAttempt2.completeCancelling();
                this.taskRestartExecutor.triggerScheduledTasks();
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.RUNNING);
                for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
                    Assertions.assertThat(executionVertex.getCurrentAssignedResource()).isNotNull();
                    executionVertex.getCurrentExecutionAttempt().switchToInitializing();
                    executionVertex.getCurrentExecutionAttempt().switchToRunning();
                }
                currentExecutionAttempt.fail(new Exception("This should have no effect"));
                Iterator it2 = executionGraph.getAllExecutionVertices().iterator();
                while (it2.hasNext()) {
                    ((ExecutionVertex) it2.next()).getCurrentExecutionAttempt().markFinished();
                }
                Assertions.assertThat(currentExecutionAttempt.getState()).isEqualTo(ExecutionState.FINISHED);
                Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.FINISHED);
                if (createDeclarativeSlotPoolBridge != null) {
                    if (0 == 0) {
                        createDeclarativeSlotPoolBridge.close();
                        return;
                    }
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createDeclarativeSlotPoolBridge != null) {
                if (th != null) {
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createDeclarativeSlotPoolBridge.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testFailExecutionAfterCancel() throws Exception {
        DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge = SlotPoolUtils.createDeclarativeSlotPoolBridge();
        Throwable th = null;
        try {
            DefaultScheduler build = new DefaultSchedulerBuilder(createJobGraphToCancel(), mainThreadExecutor, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory(createExecutionSlotAllocatorFactory(createDeclarativeSlotPoolBridge)).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE)).setDelayExecutor(this.taskRestartExecutor).build();
            ExecutionGraph executionGraph = build.getExecutionGraph();
            startScheduling(build);
            offerSlots(createDeclarativeSlotPoolBridge, 1);
            build.cancel();
            Iterator it = executionGraph.getAllExecutionVertices().iterator();
            while (it.hasNext()) {
                ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
            }
            FlinkAssertions.assertThatFuture(executionGraph.getTerminationFuture()).eventuallySucceeds().isEqualTo(JobStatus.CANCELED);
            ((ExecutionVertex) executionGraph.getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt().completeCancelling();
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
            if (createDeclarativeSlotPoolBridge != null) {
                if (0 == 0) {
                    createDeclarativeSlotPoolBridge.close();
                    return;
                }
                try {
                    createDeclarativeSlotPoolBridge.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createDeclarativeSlotPoolBridge != null) {
                if (0 != 0) {
                    try {
                        createDeclarativeSlotPoolBridge.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createDeclarativeSlotPoolBridge.close();
                }
            }
            throw th3;
        }
    }

    private static void startScheduling(SchedulerBase schedulerBase) {
        Assertions.assertThat(schedulerBase.getExecutionGraph().getState()).isEqualTo(JobStatus.CREATED);
        schedulerBase.startScheduling();
        Assertions.assertThat(schedulerBase.getExecutionGraph().getState()).isEqualTo(JobStatus.RUNNING);
    }

    private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory(SlotPool slotPool) throws Exception {
        setupSlotPool(slotPool);
        return SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(new PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool));
    }

    private static void setupSlotPool(SlotPool slotPool) throws Exception {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        slotPool.start(JobMasterId.generate(), "foobar", mainThreadExecutor);
        slotPool.connectToResourceManager(testingResourceManagerGateway);
    }

    private static JobGraph createJobGraph() {
        return JobGraphTestUtils.streamingJobGraph(ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class));
    }

    private static JobGraph createJobGraphToCancel() throws IOException {
        JobVertex createJobVertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2147483647L));
        return JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(createJobVertex).setExecutionConfig(executionConfig).build();
    }
}
