package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionTest.class */
/**
 * Tests for {@link Execution}. Most cases drive the execution through a {@link DefaultScheduler}
 * whose slots come from a {@link TestingPhysicalSlotProvider}.
 */
public class ExecutionTest extends TestLogger {

    /** Class-level rule that supplies the component main-thread test executor. */
    @ClassRule
    public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE =
            new TestingComponentMainThreadExecutor.Resource();

    /** Executor obtained from {@link #EXECUTOR_RESOURCE}; used to run scheduler actions. */
    private final TestingComponentMainThreadExecutor testMainThreadUtil =
            EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();

    /**
     * {@link ShuffleMaster} stub: partition registration hands back a new future that this class
     * never completes, and external release does nothing.
     */
    private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {

        /** Returns a fresh, not-yet-completed future for every registration request. */
        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
                PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return new CompletableFuture<>();
        }

        /** No-op. */
        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            // Intentionally empty: this stub holds nothing to release.
        }
    }

    /**
     * Cancels a scheduled execution vertex, completes the cancellation, and checks that the slot
     * provider's first response future is done by the time the future returned by
     * {@code cancel()} completes.
     */
    @Test
    public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
        DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();
        ExecutionVertex executionVertex =
                scheduler.getExecutionJobVertex(jobVertexId).getTaskVertices()[0];

        scheduler.startScheduling();

        Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
        CompletableFuture<TestingPhysicalSlot> returnedSlotFuture =
                physicalSlotProvider.getFirstResponseOrFail();
        CompletableFuture<?> terminationFuture = executionVertex.cancel();
        currentExecutionAttempt.completeCancelling();

        // get() surfaces an assertion failure raised inside the callback as an
        // ExecutionException, which fails the test.
        terminationFuture
                .thenAccept(ignored -> Assert.assertTrue(returnedSlotFuture.isDone()))
                .get();
    }

    /**
     * Sets an initial {@link JobManagerTaskRestore} on the current execution attempt and checks
     * that it is non-null before {@code startScheduling()} and null afterwards.
     */
    @Test
    public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        TestingPhysicalSlotProvider
                                                .createWithLimitedAmountOfPhysicalSlots(1)))
                        .build();
        Execution execution =
                scheduler
                        .getExecutionJobVertex(jobVertexId)
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt();

        execution.setInitialState(new JobManagerTaskRestore(1L, new TaskStateSnapshot()));
        Assert.assertThat(execution.getTaskRestore(), Matchers.is(Matchers.notNullValue()));

        scheduler.startScheduling();

        Assert.assertThat(execution.getTaskRestore(), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Schedules and then cancels an execution on the test main-thread executor and checks that
     * the slot provider recorded a cancellation for every request it recorded.
     */
    @Test
    public void testCanceledExecutionReturnsSlot() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.create(
                        resourceProfile ->
                                CompletableFuture.completedFuture(
                                        TestingPhysicalSlot.builder()
                                                .withTaskManagerGateway(taskManagerGateway)
                                                .build()));
        DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                this.testMainThreadUtil.getMainThreadExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();
        Execution execution =
                scheduler
                        .getExecutionJobVertex(jobVertexId)
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt();

        // When the gateway is asked to cancel this attempt, finish the cancellation right away.
        taskManagerGateway.setCancelConsumer(
                executionAttemptId -> {
                    if (execution.getAttemptId().equals(executionAttemptId)) {
                        execution.completeCancelling();
                    }
                });

        this.testMainThreadUtil.execute(scheduler::startScheduling);
        this.testMainThreadUtil.execute(execution::cancel);

        Assert.assertThat(
                physicalSlotProvider.getRequests().keySet(),
                Matchers.is(physicalSlotProvider.getCancellations().keySet()));
    }

    /**
     * Releases the payload of the slot assigned to the execution and checks, within the same
     * main-thread action, that the execution's release future is already done.
     */
    @Test
    public void testSlotReleaseAtomicallyReleasesExecution() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
        DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                this.testMainThreadUtil.getMainThreadExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();
        Execution execution =
                scheduler
                        .getExecutionJobVertex(jobVertex.getID())
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt();

        this.testMainThreadUtil.execute(scheduler::startScheduling);

        physicalSlotProvider.awaitAllSlotRequests();
        TestingPhysicalSlot physicalSlot = physicalSlotProvider.getFirstResponseOrFail().get();

        this.testMainThreadUtil.execute(
                () -> {
                    Assert.assertThat(
                            execution.getAssignedAllocationID(),
                            Matchers.is(physicalSlot.getAllocationId()));

                    physicalSlot.releasePayload(new FlinkException("Test exception"));

                    Assert.assertThat(
                            execution.getReleaseFuture().isDone(), Matchers.is(true));
                });
    }

    /**
     * Expects {@code Execution.registerProducedPartitions} to throw an
     * {@link IllegalStateException} when the graph uses {@link TestingShuffleMaster}, whose
     * registration future is never completed.
     */
    @Test
    public void testIncompletePartitionRegistrationFutureIsRejected() throws Exception {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        JobVertex source = new JobVertex("source");
        JobVertex target = new JobVertex("target");
        source.setInvokableClass(AbstractInvokable.class);
        target.setInvokableClass(AbstractInvokable.class);
        target.connectNewDataSetAsInput(
                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        // Set everything up outside the try block: only an IllegalStateException raised by
        // registerProducedPartitions itself may satisfy this test, not one thrown during setup.
        ExecutionVertex sourceVertex =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(JobGraphTestUtils.streamingJobGraph(source, target))
                        .setShuffleMaster(shuffleMaster)
                        .build()
                        .getAllVertices()
                        .get(source.getID())
                        .getTaskVertices()[0];
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        ExecutionAttemptID attemptId = new ExecutionAttemptID();

        try {
            Execution.registerProducedPartitions(
                    sourceVertex, taskManagerLocation, attemptId, false);
            Assert.fail(
                    "Expected an IllegalStateException for the incomplete partition registration future.");
        } catch (IllegalStateException expected) {
            // Expected: the incomplete registration future was rejected.
        }
    }

    /** Creates a job vertex named "Test vertex" with a new ID that runs {@link NoOpInvokable}. */
    @Nonnull
    private JobVertex createNoOpJobVertex() {
        JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }
}
