package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.class */
/**
 * Tests for {@link TaskExecutorPartitionTrackerImpl}.
 *
 * <p>Most tests wrap a real shuffle environment in {@link TestingShuffleEnvironment} so that the
 * partition ids handed to {@code releasePartitionsLocally} can be observed through a future.
 */
class TaskExecutorPartitionTrackerImplTest {

    /**
     * Minimal {@link ShuffleDescriptor} that only carries a fixed {@link ResultPartitionID} and
     * reports that it stores no local resources.
     */
    private static class TestingShuffleDescriptor implements ShuffleDescriptor {
        private final ResultPartitionID resultPartitionID;

        private TestingShuffleDescriptor(ResultPartitionID resultPartitionID) {
            this.resultPartitionID = resultPartitionID;
        }

        @Override
        public ResultPartitionID getResultPartitionID() {
            return resultPartitionID;
        }

        @Override
        public Optional<ResourceID> storesLocalResourcesOn() {
            return Optional.empty();
        }
    }

    /**
     * {@link ShuffleEnvironment} that forwards every call to an environment built by {@link
     * NettyShuffleEnvironmentBuilder}. In addition, if {@link #releasePartitionsLocallyFuture} has
     * been set, it is completed with the collection passed to {@link
     * #releasePartitionsLocally(Collection)}.
     */
    private static class TestingShuffleEnvironment
            implements ShuffleEnvironment<ResultPartition, SingleInputGate> {

        private final ShuffleEnvironment<ResultPartition, SingleInputGate>
                backingShuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();

        /** Optional hook set by tests; stays {@code null} when a test does not observe releases. */
        CompletableFuture<Collection<ResultPartitionID>> releasePartitionsLocallyFuture;

        @Override
        public int start() throws IOException {
            return backingShuffleEnvironment.start();
        }

        @Override
        public ShuffleIOOwnerContext createShuffleIOOwnerContext(
                String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup) {
            return backingShuffleEnvironment.createShuffleIOOwnerContext(
                    ownerName, executionAttemptID, parentGroup);
        }

        @Override
        public List<ResultPartition> createResultPartitionWriters(
                ShuffleIOOwnerContext ownerContext,
                List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors) {
            return backingShuffleEnvironment.createResultPartitionWriters(
                    ownerContext, resultPartitionDeploymentDescriptors);
        }

        @Override
        public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) {
            backingShuffleEnvironment.releasePartitionsLocally(partitionIds);
            // Publish the ids only after the delegate returned, mirroring the call order.
            if (releasePartitionsLocallyFuture != null) {
                releasePartitionsLocallyFuture.complete(partitionIds);
            }
        }

        @Override
        public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
            return backingShuffleEnvironment.getPartitionsOccupyingLocalResources();
        }

        @Override
        public List<SingleInputGate> createInputGates(
                ShuffleIOOwnerContext ownerContext,
                PartitionProducerStateProvider partitionProducerStateProvider,
                List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
            return backingShuffleEnvironment.createInputGates(
                    ownerContext, partitionProducerStateProvider, inputGateDeploymentDescriptors);
        }

        @Override
        public boolean updatePartitionInfo(
                ExecutionAttemptID consumerID, PartitionInfo partitionInfo)
                throws IOException, InterruptedException {
            return backingShuffleEnvironment.updatePartitionInfo(consumerID, partitionInfo);
        }

        @Override
        public void close() throws Exception {
            backingShuffleEnvironment.close();
        }
    }

    /**
     * Builds partition info for {@code partitionId} that belongs to a fresh data set with a total
     * partition count of one.
     */
    private static TaskExecutorPartitionInfo newPartitionInfo(ResultPartitionID partitionId) {
        return new TaskExecutorPartitionInfo(
                new TestingShuffleDescriptor(partitionId), new IntermediateDataSetID(), 1);
    }

    /**
     * Tracks two partitions of the same data set, promotes only the first one, and checks that the
     * cluster partition report has a single entry describing that promoted partition.
     */
    @Test
    void createClusterPartitionReport() {
        TaskExecutorPartitionTrackerImpl partitionTracker =
                new TaskExecutorPartitionTrackerImpl(new NettyShuffleEnvironmentBuilder().build());

        // Nothing is tracked yet, so the report must be empty.
        Assertions.assertThat(partitionTracker.createClusterPartitionReport().getEntries())
                .isEmpty();

        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        JobID jobId = new JobID();
        ResultPartitionID clusterPartitionId = new ResultPartitionID();
        ResultPartitionID jobPartitionId = new ResultPartitionID();

        partitionTracker.startTrackingPartition(
                jobId,
                new TaskExecutorPartitionInfo(
                        new TestingShuffleDescriptor(clusterPartitionId), dataSetId, 1));
        partitionTracker.startTrackingPartition(
                jobId,
                new TaskExecutorPartitionInfo(
                        new TestingShuffleDescriptor(jobPartitionId), dataSetId, 2));

        partitionTracker.promoteJobPartitions(Collections.singleton(clusterPartitionId));

        ClusterPartitionReport.ClusterPartitionReportEntry reportEntry =
                Iterables.getOnlyElement(
                        partitionTracker.createClusterPartitionReport().getEntries());

        Assertions.assertThat(reportEntry.getDataSetId()).isEqualTo(dataSetId);
        Assertions.assertThat(reportEntry.getNumTotalPartitions()).isEqualTo(1);
        Assertions.assertThat(reportEntry.getHostedPartitions()).contains(clusterPartitionId);
    }

    /**
     * Tracks two partitions and releases one by id; the shuffle environment must be asked to
     * release exactly that partition.
     */
    @Test
    void testStopTrackingAndReleaseJobPartitions() throws Exception {
        TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment();
        CompletableFuture<Collection<ResultPartitionID>> releaseFuture = new CompletableFuture<>();
        shuffleEnvironment.releasePartitionsLocallyFuture = releaseFuture;

        ResultPartitionID releasedPartitionId = new ResultPartitionID();
        ResultPartitionID retainedPartitionId = new ResultPartitionID();

        TaskExecutorPartitionTrackerImpl partitionTracker =
                new TaskExecutorPartitionTrackerImpl(shuffleEnvironment);
        partitionTracker.startTrackingPartition(
                new JobID(), newPartitionInfo(releasedPartitionId));
        partitionTracker.startTrackingPartition(
                new JobID(), newPartitionInfo(retainedPartitionId));

        partitionTracker.stopTrackingAndReleaseJobPartitions(
                Collections.singleton(releasedPartitionId));

        FlinkAssertions.assertThatFuture(releaseFuture)
                .eventuallySucceeds()
                .satisfies(
                        released ->
                                Assertions.assertThat(released)
                                        .containsExactly(releasedPartitionId));
    }

    /**
     * Tracks one partition for each of two jobs and releases everything of the first job; the
     * released ids must contain the first job's partition.
     */
    @Test
    void testStopTrackingAndReleaseJobPartitionsFor() throws Exception {
        TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment();
        CompletableFuture<Collection<ResultPartitionID>> releaseFuture = new CompletableFuture<>();
        shuffleEnvironment.releasePartitionsLocallyFuture = releaseFuture;

        JobID releasedJobId = new JobID();
        JobID otherJobId = new JobID();
        ResultPartitionID releasedPartitionId = new ResultPartitionID();
        ResultPartitionID otherPartitionId = new ResultPartitionID();

        TaskExecutorPartitionTrackerImpl partitionTracker =
                new TaskExecutorPartitionTrackerImpl(shuffleEnvironment);
        partitionTracker.startTrackingPartition(
                releasedJobId, newPartitionInfo(releasedPartitionId));
        partitionTracker.startTrackingPartition(otherJobId, newPartitionInfo(otherPartitionId));

        partitionTracker.stopTrackingAndReleaseJobPartitionsFor(releasedJobId);

        FlinkAssertions.assertThatFuture(releaseFuture)
                .eventuallySucceeds()
                .asList()
                .contains(releasedPartitionId);
    }

    /** A tracked partition must be returned, unchanged, when querying by its job id. */
    @Test
    void testGetTrackedPartitionsFor() {
        TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment();
        JobID jobId = new JobID();
        ResultPartitionID partitionId = new ResultPartitionID();

        TaskExecutorPartitionTrackerImpl partitionTracker =
                new TaskExecutorPartitionTrackerImpl(shuffleEnvironment);
        TaskExecutorPartitionInfo partitionInfo = newPartitionInfo(partitionId);
        partitionTracker.startTrackingPartition(jobId, partitionInfo);

        Collection<TaskExecutorPartitionInfo> trackedPartitions =
                partitionTracker.getTrackedPartitionsFor(jobId);

        // Exactly one element, equal to the info that was registered.
        Assertions.assertThat(trackedPartitions).containsExactly(partitionInfo);
    }

    /**
     * After a partition has been promoted, releasing the job's partitions must not hand the
     * promoted partition to the shuffle environment.
     */
    @Test
    void promoteJobPartitions() throws Exception {
        TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment();
        CompletableFuture<Collection<ResultPartitionID>> releaseFuture = new CompletableFuture<>();
        shuffleEnvironment.releasePartitionsLocallyFuture = releaseFuture;

        JobID jobId = new JobID();
        ResultPartitionID promotedPartitionId = new ResultPartitionID();
        ResultPartitionID jobPartitionId = new ResultPartitionID();

        TaskExecutorPartitionTrackerImpl partitionTracker =
                new TaskExecutorPartitionTrackerImpl(shuffleEnvironment);
        partitionTracker.startTrackingPartition(jobId, newPartitionInfo(promotedPartitionId));
        partitionTracker.startTrackingPartition(jobId, newPartitionInfo(jobPartitionId));

        partitionTracker.promoteJobPartitions(Collections.singleton(promotedPartitionId));
        partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId);

        FlinkAssertions.assertThatFuture(releaseFuture)
                .eventuallySucceeds()
                .asList()
                .doesNotContain(promotedPartitionId);
    }

    /** Releasing all cluster partitions must include a previously promoted partition. */
    @Test
    void stopTrackingAndReleaseAllClusterPartitions() throws Exception {
        TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment();
        CompletableFuture<Collection<ResultPartitionID>> releaseFuture = new CompletableFuture<>();
        shuffleEnvironment.releasePartitionsLocallyFuture = releaseFuture;

        ResultPartitionID promotedPartitionId = new ResultPartitionID();
        ResultPartitionID jobPartitionId = new ResultPartitionID();

        TaskExecutorPartitionTrackerImpl partitionTracker =
                new TaskExecutorPartitionTrackerImpl(shuffleEnvironment);
        partitionTracker.startTrackingPartition(
                new JobID(), newPartitionInfo(promotedPartitionId));
        partitionTracker.startTrackingPartition(new JobID(), newPartitionInfo(jobPartitionId));

        partitionTracker.promoteJobPartitions(Collections.singleton(promotedPartitionId));
        partitionTracker.stopTrackingAndReleaseAllClusterPartitions();

        FlinkAssertions.assertThatFuture(releaseFuture)
                .eventuallySucceeds()
                .satisfies(
                        released ->
                                Assertions.assertThat(released).contains(promotedPartitionId));
    }

    /**
     * Releasing cluster partitions by data set id must include the promoted partition that belongs
     * to that data set.
     */
    @Test
    void stopTrackingAndReleaseClusterPartitions() throws Exception {
        TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment();
        CompletableFuture<Collection<ResultPartitionID>> releaseFuture = new CompletableFuture<>();
        shuffleEnvironment.releasePartitionsLocallyFuture = releaseFuture;

        ResultPartitionID promotedPartitionId = new ResultPartitionID();
        ResultPartitionID jobPartitionId = new ResultPartitionID();
        IntermediateDataSetID promotedDataSetId = new IntermediateDataSetID();
        IntermediateDataSetID otherDataSetId = new IntermediateDataSetID();

        TaskExecutorPartitionTrackerImpl partitionTracker =
                new TaskExecutorPartitionTrackerImpl(shuffleEnvironment);
        partitionTracker.startTrackingPartition(
                new JobID(),
                new TaskExecutorPartitionInfo(
                        new TestingShuffleDescriptor(promotedPartitionId), promotedDataSetId, 1));
        partitionTracker.startTrackingPartition(
                new JobID(),
                new TaskExecutorPartitionInfo(
                        new TestingShuffleDescriptor(jobPartitionId), otherDataSetId, 1));

        partitionTracker.promoteJobPartitions(Collections.singleton(promotedPartitionId));
        partitionTracker.stopTrackingAndReleaseClusterPartitions(
                Collections.singleton(promotedDataSetId));

        FlinkAssertions.assertThatFuture(releaseFuture)
                .eventuallySucceeds()
                .satisfies(
                        released ->
                                Assertions.assertThat(released).contains(promotedPartitionId));
    }
}
