package org.apache.flink.runtime.executiongraph;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.AfterClass;
import org.junit.Test;

/**
 * Tests which result partitions the scheduler asks the {@link JobMasterPartitionTracker} to stop
 * tracking and release as the vertices of a small batch job graph are reported {@code FINISHED}.
 *
 * <p>Each test installs a {@link TestingJobMasterPartitionTracker} whose release callback records
 * the first partition of every release request in a queue, drives task state updates through the
 * shared main-thread executor, and asserts on the queue contents after each step.
 */
public class ExecutionGraphPartitionReleaseTest extends TestLogger {
    /** Single thread backing {@link #mainThreadExecutor}; shut down in {@link #shutDownExecutor()}. */
    private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    /** Executor through which every scheduler interaction in these tests is submitted. */
    private static final TestingComponentMainThreadExecutor mainThreadExecutor = new TestingComponentMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));

    /**
     * Stops the shared executor once all tests of this class have run, so its worker thread does
     * not outlive the test class.
     */
    @AfterClass
    public static void shutDownExecutor() {
        scheduledExecutorService.shutdownNow();
    }

    /**
     * Builds the chain {@code source -> operator -> sink} connected by BLOCKING results and asserts
     * that a producer's partition is released only once its consumer has been reported FINISHED.
     *
     * @throws Exception if building the scheduler or running a step on the main-thread executor fails
     */
    @Test
    public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
        final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        final JobVertex operatorVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        final JobVertex sinkVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        operatorVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        sinkVertex.connectNewDataSetAsInput(operatorVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        final TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        final Queue<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
        // Record only the first partition of each release request.
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
                partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));

        final SchedulerBase scheduler = createScheduler(partitionTracker, sourceVertex, operatorVertex, sinkVertex);
        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();

        // Finishing the source alone must not trigger any release.
        mainThreadExecutor.execute(() -> {
            markFinished(scheduler, getCurrentExecution(sourceVertex, executionGraph));
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Finishing the operator must release exactly the source's partition.
        mainThreadExecutor.execute(() -> {
            final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
            markFinished(scheduler, getCurrentExecution(operatorVertex, executionGraph));
            MatcherAssert.assertThat(releasedPartitions, Matchers.hasSize(1));
            MatcherAssert.assertThat(releasedPartitions.remove(), IsEqual.equalTo(firstProducedPartitionOf(sourceExecution)));
        });

        // Finishing the sink must release exactly the operator's partition.
        mainThreadExecutor.execute(() -> {
            final Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph);
            markFinished(scheduler, getCurrentExecution(sinkVertex, executionGraph));
            MatcherAssert.assertThat(releasedPartitions, Matchers.hasSize(1));
            MatcherAssert.assertThat(releasedPartitions.remove(), IsEqual.equalTo(firstProducedPartitionOf(operatorExecution)));
        });
    }

    /**
     * Builds {@code source -(BLOCKING)-> operator1 -(PIPELINED)-> {operator2, operator3}} and asserts
     * that no partition is released at any step when operator2 is reset after finishing.
     *
     * @throws Exception if building the scheduler or running a step on the main-thread executor fails
     */
    @Test
    public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
        final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex("source", 1);
        final JobVertex operator1Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator1", 1);
        final JobVertex operator2Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator2", 1);
        final JobVertex operator3Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator3", 1);
        operator1Vertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        operator2Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        operator3Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        final TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        final Queue<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
        // Record only the first partition of each release request.
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
                partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));

        final SchedulerBase scheduler = createScheduler(partitionTracker, sourceVertex, operator1Vertex, operator2Vertex, operator3Vertex);
        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();

        // Finish the source: nothing may be released yet.
        mainThreadExecutor.execute(() -> {
            markFinished(scheduler, getCurrentExecution(sourceVertex, executionGraph));
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Announce operator1's partitions as available, then finish operator1: still no release.
        // NOTE(review): presumably because operator1's pipelined consumers are not finished yet.
        mainThreadExecutor.execute(() -> {
            final Execution operator1Execution = getCurrentExecution(operator1Vertex, executionGraph);
            for (IntermediateResultPartitionID partitionId : operator1Execution.getVertex().getProducedPartitions().keySet()) {
                scheduler.notifyPartitionDataAvailable(new ResultPartitionID(partitionId, operator1Execution.getAttemptId()));
            }
            markFinished(scheduler, operator1Execution);
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Finish operator2: still no release.
        mainThreadExecutor.execute(() -> {
            markFinished(scheduler, getCurrentExecution(operator2Vertex, executionGraph));
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Reset operator2 for a new execution: the reset itself must not release anything.
        mainThreadExecutor.execute(() -> {
            getCurrentExecution(operator2Vertex, executionGraph).getVertex().resetForNewExecution();
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Finish operator3: the test expects no release even now.
        // NOTE(review): presumably because the reset operator2 counts as unfinished again.
        mainThreadExecutor.execute(() -> {
            markFinished(scheduler, getCurrentExecution(operator3Vertex, executionGraph));
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });
    }

    /**
     * Reports the given execution attempt to the scheduler as {@link ExecutionState#FINISHED}.
     *
     * @param scheduler scheduler that receives the state update
     * @param execution execution attempt to mark as finished
     */
    private static void markFinished(SchedulerBase scheduler, Execution execution) {
        scheduler.updateTaskExecutionState(new TaskExecutionState(execution.getAttemptId(), ExecutionState.FINISHED));
    }

    /**
     * Builds the {@link ResultPartitionID} of the first partition produced by the vertex of the
     * given execution, paired with that execution's attempt id.
     *
     * @param execution execution whose vertex produces at least one partition
     * @return id of the first produced partition for this attempt
     */
    private static ResultPartitionID firstProducedPartitionOf(Execution execution) {
        final IntermediateResultPartitionID partitionId = execution.getVertex().getProducedPartitions().keySet().iterator().next();
        return new ResultPartitionID(partitionId, execution.getAttemptId());
    }

    /**
     * Returns the current execution attempt of the first (index 0) subtask of the given job vertex.
     *
     * @param jobVertex job vertex to look up by id
     * @param executionGraph execution graph containing the vertex
     * @return the current execution attempt of subtask 0
     */
    private static Execution getCurrentExecution(JobVertex jobVertex, ExecutionGraph executionGraph) {
        return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
    }

    /**
     * Builds a scheduler for a batch job graph made of the given vertices, wires in the supplied
     * partition tracker, and starts scheduling on the shared main-thread executor.
     *
     * @param jobMasterPartitionTracker tracker that receives the partition release requests
     * @param jobVertexArr vertices making up the batch job graph
     * @return the scheduler, with {@code startScheduling} already submitted via the main-thread executor
     * @throws Exception if the scheduler cannot be built or scheduling cannot be started
     */
    private SchedulerBase createScheduler(JobMasterPartitionTracker jobMasterPartitionTracker, JobVertex... jobVertexArr) throws Exception {
        // NOTE(review): `mo474build` looks like a decompiler-generated alias of the builder's
        // build method; confirm the real method name against the scheduler builder API.
        final DefaultScheduler scheduler = SchedulerTestingUtils
                .newSchedulerBuilder(JobGraphTestUtils.batchJobGraph(jobVertexArr), mainThreadExecutor.getMainThreadExecutor())
                .setExecutionSlotAllocatorFactory(SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory())
                .setPartitionTracker(jobMasterPartitionTracker)
                .mo474build();
        mainThreadExecutor.execute(scheduler::startScheduling);
        return scheduler;
    }
}
