package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for cancelling an {@link ExecutionVertex} (or its current {@link Execution}) from the
 * different {@link ExecutionState}s it can be in.
 *
 * <p>Test methods declare {@code throws Exception} so that unexpected exceptions reach the test
 * framework with their full stack trace and cause.
 */
class ExecutionVertexCancelTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /**
     * Task manager gateway whose {@code cancelTask} calls succeed a fixed number of times and
     * fail with an {@link IOException} on every call after that.
     */
    private static class CancelSequenceSimpleAckingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
        /** Number of leading {@code cancelTask} calls that are acknowledged. */
        private final int successfulOperations;

        /** Number of {@code cancelTask} calls received so far. */
        private int cancelCalls;

        public CancelSequenceSimpleAckingTaskManagerGateway(int successfulOperations) {
            this.successfulOperations = successfulOperations;
        }

        @Override
        public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time time) {
            // Zero-based position of this call in the sequence of cancel calls.
            final int callIndex = cancelCalls++;
            if (callIndex >= successfulOperations) {
                return FutureUtils.completedExceptionally(new IOException("Rpc call fails"));
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
    }

    /** Cancelling a vertex that is still CREATED moves it directly to CANCELED. */
    @Test
    void testCancelFromCreated() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);

        vertex.cancel();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /** Cancelling a vertex that is SCHEDULED moves it directly to CANCELED. */
    @Test
    void testCancelFromScheduled() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED);

        vertex.cancel();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /** A RUNNING vertex is CANCELED, and its slot released, once cancelling is completed. */
    @Test
    void testCancelFromRunning() throws Exception {
        TestingLogicalSlot slot = createSlotWithCancelGateway(1);
        ExecutionVertex vertex = createRunningVertex(slot);

        vertex.cancel();
        vertex.getCurrentExecutionAttempt().completeCancelling();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(slot.isAlive()).isFalse();
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /** Cancelling twice while CANCELING keeps the vertex in CANCELING until completion. */
    @Test
    void testRepeatedCancelFromRunning() throws Exception {
        TestingLogicalSlot slot = createSlotWithCancelGateway(1);
        ExecutionVertex vertex = createRunningVertex(slot);

        vertex.cancel();
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELING);

        // A second cancel request must not change the state.
        vertex.cancel();
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELING);

        vertex.getCurrentExecutionAttempt().completeCancelling();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(slot.isAlive()).isFalse();
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /** Without a completion callback, a cancelled RUNNING vertex stays in CANCELING. */
    @Test
    void testCancelFromRunningDidNotFindTask() throws Exception {
        ExecutionVertex vertex = createRunningVertex(createSlotWithCancelGateway(1));

        vertex.cancel();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELING);
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING);
    }

    /** A failing cancel RPC ends with the vertex CANCELED and its slot released. */
    @Test
    void testCancelCallFails() throws Exception {
        // Zero successful operations: the very first cancel RPC fails.
        TestingLogicalSlot slot = createSlotWithCancelGateway(0);
        ExecutionVertex vertex = createRunningVertex(slot);

        vertex.cancel();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(slot.isAlive()).isFalse();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING);
    }

    /**
     * Marks an execution as failed while it is CANCELING and checks that it ends up FAILED or
     * CANCELED, that its slot is released, and that it is unregistered from the execution graph.
     */
    @Test
    void testSendCancelAndReceiveFail() throws Exception {
        DefaultScheduler scheduler = SchedulerTestingUtils.createScheduler(
                JobGraphTestUtils.streamingJobGraph(ExecutionGraphTestUtils.createNoOpVertex(10)),
                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                EXECUTOR_RESOURCE.getExecutor());
        ExecutionGraph graph = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);

        ExecutionJobVertex jobVertex = (ExecutionJobVertex) graph.getVerticesTopologically().iterator().next();
        ExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
        Assertions.assertThat(graph.getRegisteredExecutions()).hasSize(taskVertices.length);

        Execution execution = taskVertices[3].getCurrentExecutionAttempt();
        execution.cancel();
        Assertions.assertThat(execution.getState()).isEqualTo(ExecutionState.CANCELING);

        execution.markFailed(new Exception("test"));

        // Either terminal state is accepted; isIn reports the actual state on failure.
        Assertions.assertThat(execution.getState()).isIn(ExecutionState.FAILED, ExecutionState.CANCELED);
        Assertions.assertThat(execution.getAssignedResource().isAlive()).isFalse();
        Assertions.assertThat(graph.getRegisteredExecutions()).hasSize(taskVertices.length - 1);
    }

    /** Builds a slot whose gateway acknowledges the given number of cancel calls, then fails. */
    private static TestingLogicalSlot createSlotWithCancelGateway(int successfulCancelCalls) {
        return new TestingLogicalSlotBuilder()
                .setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(successfulCancelCalls))
                .createTestingLogicalSlot();
    }

    /** Creates a vertex, assigns it the given slot, and puts it into the RUNNING state. */
    private static ExecutionVertex createRunningVertex(TestingLogicalSlot slot) throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);
        return vertex;
    }

    /** Asserts that the vertex reports a positive timestamp for each of the given states. */
    private static void assertStateTimestampsSet(ExecutionVertex vertex, ExecutionState... states) {
        for (ExecutionState state : states) {
            Assertions.assertThat(vertex.getStateTimestamp(state))
                    .as("timestamp for state %s", state)
                    .isGreaterThan(0L);
        }
    }
}
