package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.class */
class BlockingResultPartitionReleaseTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private ScheduledExecutorService scheduledExecutorService;
    private ComponentMainThreadExecutor mainThreadExecutor;
    private ManuallyTriggeredScheduledExecutorService ioExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest$TestingPartitionTracker.class */
    public static class TestingPartitionTracker extends NoOpJobMasterPartitionTracker {
        private final List<ResultPartitionID> releasedPartitions;

        private TestingPartitionTracker() {
            this.releasedPartitions = new ArrayList();
        }

        @Override // org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker
        public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> collection, boolean z) {
            this.releasedPartitions.addAll((Collection) Preconditions.checkNotNull(collection));
        }
    }

    BlockingResultPartitionReleaseTest() {
    }

    @BeforeEach
    void setup() {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(this.scheduledExecutorService);
        this.ioExecutor = new ManuallyTriggeredScheduledExecutorService();
    }

    @AfterEach
    void teardown() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    @Test
    void testMultipleConsumersForAdaptiveBatchScheduler() throws Exception {
        testResultPartitionConsumedByMultiConsumers(true);
    }

    @Test
    void testMultipleConsumersForDefaultScheduler() throws Exception {
        testResultPartitionConsumedByMultiConsumers(false);
    }

    private void testResultPartitionConsumedByMultiConsumers(boolean z) throws Exception {
        JobID jobID = new JobID();
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex("producer", 2);
        JobVertex createNoOpVertex2 = ExecutionGraphTestUtils.createNoOpVertex("consumer1", 2);
        JobVertex createNoOpVertex3 = ExecutionGraphTestUtils.createNoOpVertex("consumer2", 2);
        TestingPartitionTracker testingPartitionTracker = new TestingPartitionTracker();
        ExecutionGraph executionGraph = SchedulerTestingUtils.createSchedulerAndDeploy(z, jobID, createNoOpVertex, new JobVertex[]{createNoOpVertex2, createNoOpVertex3}, DistributionPattern.ALL_TO_ALL, new TestingBlobWriter(Integer.MAX_VALUE), this.mainThreadExecutor, this.ioExecutor, testingPartitionTracker, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).getExecutionGraph();
        Assertions.assertThat(testingPartitionTracker.releasedPartitions).isEmpty();
        CompletableFuture.runAsync(() -> {
            ExecutionGraphTestUtils.finishJobVertex(executionGraph, createNoOpVertex2.getID());
        }, this.mainThreadExecutor).join();
        this.ioExecutor.triggerAll();
        Assertions.assertThat(testingPartitionTracker.releasedPartitions).isEmpty();
        CompletableFuture.runAsync(() -> {
            ExecutionGraphTestUtils.finishJobVertex(executionGraph, createNoOpVertex3.getID());
        }, this.mainThreadExecutor).join();
        this.ioExecutor.triggerAll();
        Assertions.assertThat(testingPartitionTracker.releasedPartitions.size()).isEqualTo(2);
        for (int i = 0; i < 2; i++) {
            Assertions.assertThat(testingPartitionTracker.releasedPartitions.stream().map((v0) -> {
                return v0.getPartitionId();
            })).containsExactlyInAnyOrder(Arrays.stream(((ExecutionJobVertex) Preconditions.checkNotNull(executionGraph.getJobVertex(createNoOpVertex.getID()))).getProducedDataSets()[0].getPartitions()).map((v0) -> {
                return v0.getPartitionId();
            }).toArray(i2 -> {
                return new IntermediateResultPartitionID[i2];
            }));
        }
    }
}
