package org.apache.flink.runtime.executiongraph;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for {@link ExecutionVertex}, covering what happens to tracked result partitions and to
 * the remembered allocation/location when a vertex is reset for a new execution.
 */
class ExecutionVertexTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /**
     * Verifies that resetting the vertices of a finished producer hands the producer's result
     * partition to the partition tracker's "stop tracking and release" callback, and that
     * neither scheduling nor finishing the execution triggers that callback beforehand.
     */
    @Test
    void testResetForNewExecutionReleasesPartitions() throws Exception {
        final JobVertex producerJobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        final JobVertex consumerJobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                consumerJobVertex,
                producerJobVertex,
                DistributionPattern.POINTWISE,
                ResultPartitionType.BLOCKING);

        // Completed by the tracker callback with the partitions it was asked to release.
        final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture =
                new CompletableFuture<>();
        final TestingJobMasterPartitionTracker partitionTracker =
                new TestingJobMasterPartitionTracker();
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
                releasePartitionsFuture::complete);

        final JobGraph jobGraph =
                JobGraphTestUtils.streamingJobGraph(producerJobVertex, consumerJobVertex);
        final DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                jobGraph,
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setPartitionTracker(partitionTracker)
                        .build();

        scheduler.startScheduling();

        final ExecutionJobVertex producerExecutionJobVertex =
                scheduler.getExecutionJobVertex(producerJobVertex.getID());
        final Execution producerExecution =
                producerExecutionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();

        // Nothing may be released merely because the job was scheduled ...
        Assertions.assertThat(releasePartitionsFuture).isNotDone();

        producerExecution.markFinished();

        // ... nor because the producer finished.
        Assertions.assertThat(releasePartitionsFuture).isNotDone();

        for (ExecutionVertex executionVertex : producerExecutionJobVertex.getTaskVertices()) {
            executionVertex.resetForNewExecution();
        }

        // Resolve the partition via the execution captured before the reset.
        final ResultPartitionDeploymentDescriptor partitionDescriptor =
                producerExecution
                        .getResultPartitionDeploymentDescriptor(
                                producerExecutionJobVertex
                                        .getProducedDataSets()[0]
                                        .getPartitions()[0]
                                        .getPartitionId())
                        .orElseThrow(
                                () ->
                                        new AssertionError(
                                                "Producer execution has no deployment descriptor"
                                                        + " for its result partition."));
        final ResultPartitionID expectedPartitionId =
                partitionDescriptor.getShuffleDescriptor().getResultPartitionID();

        Assertions.assertThat(releasePartitionsFuture.get()).contains(expectedPartitionId);
    }

    /**
     * Verifies that {@link ExecutionVertex#findLastAllocation()} and {@link
     * ExecutionVertex#findLastLocation()} keep reporting the slot of the first execution attempt
     * after the vertex has been cancelled and reset twice, with the attempts history size
     * configured to 1 and a slot provider limited to a single physical slot.
     */
    @Test
    void testFindLatestAllocationIgnoresFailedAttempts() throws Exception {
        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        final TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);

        final Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, 1);

        final DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                jobGraph,
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setJobMasterConfiguration(configuration)
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();

        scheduler.startScheduling();

        final ExecutionVertex executionVertex =
                scheduler.getExecutionJobVertex(jobVertex.getID()).getTaskVertices()[0];
        final Execution firstExecution = executionVertex.getCurrentExecutionAttempt();

        // The slot handed out for the first attempt is the reference for all later assertions.
        final TestingPhysicalSlot physicalSlot =
                physicalSlotProvider.getFirstResponseOrFail().join();
        final AllocationID allocationId = physicalSlot.getAllocationId();
        final TaskManagerLocation taskManagerLocation = physicalSlot.getTaskManagerLocation();

        cancelExecution(firstExecution);
        executionVertex.resetForNewExecution();

        Assertions.assertThat(executionVertex.findLastAllocation()).hasValue(allocationId);
        Assertions.assertThat(executionVertex.findLastLocation()).hasValue(taskManagerLocation);

        // NOTE(review): the attempt created by the reset is presumably never assigned a slot
        // in this test, so the first attempt's slot should remain the latest one - confirm.
        cancelExecution(executionVertex.getCurrentExecutionAttempt());
        executionVertex.resetForNewExecution();

        Assertions.assertThat(executionVertex.findLastAllocation()).hasValue(allocationId);
        Assertions.assertThat(executionVertex.findLastLocation()).hasValue(taskManagerLocation);
    }

    /**
     * Cancels the given execution and immediately completes the cancellation.
     *
     * @param execution the execution attempt to cancel
     */
    private static void cancelExecution(Execution execution) {
        execution.cancel();
        execution.completeCancelling();
    }
}
