/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.ProgrammedSlotProvider;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.executiongraph.TestingSlotProviderStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class ExecutionVertexCancelTest
extends TestLogger {
    @Test
    public void testCancelFromCreated() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid);
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)vertex.getExecutionState());
            vertex.cancel();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)vertex.getExecutionState());
            Assert.assertNull((Object)vertex.getFailureCause());
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELING) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELED) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelFromScheduled() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid);
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
            Assert.assertEquals((Object)ExecutionState.SCHEDULED, (Object)vertex.getExecutionState());
            vertex.cancel();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)vertex.getExecutionState());
            Assert.assertNull((Object)vertex.getFailureCause());
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELING) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELED) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelFromRunning() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid, new DirectScheduledExecutorService());
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot();
            ExecutionGraphTestUtils.setVertexResource(vertex, slot);
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
            Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex.getExecutionState());
            vertex.cancel();
            vertex.getCurrentExecutionAttempt().completeCancelling();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)vertex.getExecutionState());
            Assert.assertFalse((boolean)slot.isAlive());
            Assert.assertNull((Object)vertex.getFailureCause());
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELING) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELED) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRepeatedCancelFromRunning() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid, new DirectScheduledExecutorService());
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot();
            ExecutionGraphTestUtils.setVertexResource(vertex, slot);
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
            Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex.getExecutionState());
            vertex.cancel();
            Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)vertex.getExecutionState());
            vertex.cancel();
            Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)vertex.getExecutionState());
            vertex.getCurrentExecutionAttempt().completeCancelling();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)vertex.getExecutionState());
            Assert.assertFalse((boolean)slot.isAlive());
            Assert.assertNull((Object)vertex.getFailureCause());
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELING) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELED) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelFromRunningDidNotFindTask() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid, new DirectScheduledExecutorService());
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot();
            ExecutionGraphTestUtils.setVertexResource(vertex, slot);
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
            Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex.getExecutionState());
            vertex.cancel();
            Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)vertex.getExecutionState());
            Assert.assertNull((Object)vertex.getFailureCause());
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELING) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelCallFails() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid, new DirectScheduledExecutorService());
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(0)).createTestingLogicalSlot();
            ExecutionGraphTestUtils.setVertexResource(vertex, slot);
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
            Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex.getExecutionState());
            vertex.cancel();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)vertex.getExecutionState());
            Assert.assertFalse((boolean)slot.isAlive());
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CANCELING) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSendCancelAndReceiveFail() throws Exception {
        ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph();
        graph.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        ExecutionVertex[] vertices = ((ExecutionJobVertex)graph.getVerticesTopologically().iterator().next()).getTaskVertices();
        Assert.assertEquals((long)vertices.length, (long)graph.getRegisteredExecutions().size());
        Execution exec = vertices[3].getCurrentExecutionAttempt();
        exec.cancel();
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)exec.getState());
        exec.markFailed((Throwable)new Exception("test"));
        Assert.assertTrue((exec.getState() == ExecutionState.FAILED || exec.getState() == ExecutionState.CANCELED ? 1 : 0) != 0);
        Assert.assertFalse((boolean)exec.getAssignedResource().isAlive());
        Assert.assertEquals((long)(vertices.length - 1), (long)exec.getVertex().getExecutionGraph().getRegisteredExecutions().size());
    }

    @Test
    public void testScheduleOrDeployAfterCancel() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid);
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.CANCELED);
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)vertex.getExecutionState());
            vertex.scheduleForExecution((SlotProviderStrategy)TestingSlotProviderStrategy.from((SlotProvider)new ProgrammedSlotProvider(1)), LocationPreferenceConstraint.ALL, Collections.emptySet());
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)vertex.getExecutionState());
            try {
                TestingLogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
                vertex.deployToSlot((LogicalSlot)slot);
                Assert.fail((String)"Method should throw an exception");
            }
            catch (IllegalStateException e) {
                Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)vertex.getExecutionState());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testActionsWhileCancelling() {
        try {
            TestingLogicalSlot slot;
            ExecutionVertex vertex;
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid);
            try {
                vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
                ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.CANCELING);
                vertex.scheduleForExecution((SlotProviderStrategy)TestingSlotProviderStrategy.from((SlotProvider)new ProgrammedSlotProvider(1)), LocationPreferenceConstraint.ALL, Collections.emptySet());
            }
            catch (Exception e) {
                Assert.fail((String)"should not throw an exception");
            }
            try {
                vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
                ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.CANCELING);
                slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
                vertex.deployToSlot((LogicalSlot)slot);
                Assert.fail((String)"Method should throw an exception");
            }
            catch (IllegalStateException vertex2) {
                // empty catch block
            }
            vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
            ExecutionGraphTestUtils.setVertexResource(vertex, slot);
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.CANCELING);
            Exception failureCause = new Exception("test exception");
            vertex.fail((Throwable)failureCause);
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)vertex.getExecutionState());
            Assert.assertFalse((boolean)slot.isAlive());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static class CancelSequenceSimpleAckingTaskManagerGateway
    extends SimpleAckingTaskManagerGateway {
        private final int successfulOperations;
        private int index = -1;

        public CancelSequenceSimpleAckingTaskManagerGateway(int successfulOperations) {
            this.successfulOperations = successfulOperations;
        }

        @Override
        public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
            ++this.index;
            if (this.index >= this.successfulOperations) {
                return FutureUtils.completedExceptionally((Throwable)new IOException("Rpc call fails"));
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
    }
}

