package org.apache.flink.runtime.io.network.partition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.class */
public class JobMasterPartitionTrackerImplTest extends TestLogger {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest$ReleaseCall.class */
    public static class ReleaseCall {
        private final ResourceID taskExecutorId;
        private final JobID jobId;
        private final Collection<ResultPartitionID> releasedPartitions;
        private final Collection<ResultPartitionID> promotedPartitions;

        private ReleaseCall(ResourceID resourceID, JobID jobID, Collection<ResultPartitionID> collection, Collection<ResultPartitionID> collection2) {
            this.taskExecutorId = resourceID;
            this.jobId = jobID;
            this.releasedPartitions = collection;
            this.promotedPartitions = collection2;
        }

        public ResourceID getTaskExecutorId() {
            return this.taskExecutorId;
        }

        public JobID getJobId() {
            return this.jobId;
        }

        public Collection<ResultPartitionID> getReleasedPartitions() {
            return this.releasedPartitions;
        }

        public Collection<ResultPartitionID> getPromotedPartitions() {
            return this.promotedPartitions;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest$TestingResourceManagerGateway.class */
    private static class TestingResourceManagerGateway extends org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway {
        private final List<IntermediateDataSetID> requestedIntermediateDataSetIds;

        private TestingResourceManagerGateway() {
            this.requestedIntermediateDataSetIds = new ArrayList();
        }

        @Override // org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway
        public CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors(IntermediateDataSetID intermediateDataSetID) {
            this.requestedIntermediateDataSetIds.add(intermediateDataSetID);
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest$TestingShuffleMaster.class */
    public static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
        final Queue<ResultPartitionID> externallyReleasedPartitions;

        private TestingShuffleMaster() {
            this.externallyReleasedPartitions = new ArrayBlockingQueue(4);
        }

        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return null;
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            this.externallyReleasedPartitions.add(shuffleDescriptor.getResultPartitionID());
        }
    }

    @Test
    public void testPipelinedPartitionIsNotTracked() {
        testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testBlockingPartitionIsTracked() {
        testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
    }

    @Test
    public void testPipelinedApproximatePartitionIsTracked() {
        testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED_APPROXIMATE);
    }

    private static void testReleaseOnConsumptionHandling(ResultPartitionType resultPartitionType) {
        JobMasterPartitionTrackerImpl jobMasterPartitionTrackerImpl = new JobMasterPartitionTrackerImpl(new JobID(), new TestingShuffleMaster(), resourceID -> {
            return Optional.empty();
        });
        ResourceID generate = ResourceID.generate();
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(new ResultPartitionID(), resultPartitionType, true));
        MatcherAssert.assertThat(Boolean.valueOf(jobMasterPartitionTrackerImpl.isTrackingPartitionsFor(generate)), Matchers.is(Boolean.valueOf(resultPartitionType.isReleaseByScheduler())));
    }

    @Test
    public void testReleaseCallsWithLocalResources() {
        TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        JobID jobID = new JobID();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl jobMasterPartitionTrackerImpl = new JobMasterPartitionTrackerImpl(jobID, testingShuffleMaster, resourceID -> {
            return Optional.of(createTaskExecutorGateway(resourceID, arrayBlockingQueue));
        });
        ResourceID generate = ResourceID.generate();
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionID, true));
        MatcherAssert.assertThat(Boolean.valueOf(jobMasterPartitionTrackerImpl.isTrackingPartitionsFor(generate)), Matchers.is(true));
        jobMasterPartitionTrackerImpl.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionID));
        Assert.assertEquals(1L, arrayBlockingQueue.size());
        ReleaseCall releaseCall = (ReleaseCall) arrayBlockingQueue.remove();
        Assert.assertEquals(generate, releaseCall.getTaskExecutorId());
        Assert.assertEquals(jobID, releaseCall.getJobId());
        MatcherAssert.assertThat(releaseCall.getReleasedPartitions(), Matchers.contains(new ResultPartitionID[]{resultPartitionID}));
        MatcherAssert.assertThat(releaseCall.getPromotedPartitions(), Matchers.is(Matchers.empty()));
        Assert.assertEquals(1L, testingShuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals(resultPartitionID, testingShuffleMaster.externallyReleasedPartitions.remove());
        MatcherAssert.assertThat(Boolean.valueOf(jobMasterPartitionTrackerImpl.isTrackingPartitionsFor(generate)), Matchers.is(false));
    }

    @Test
    public void testReleaseCallsWithoutLocalResources() {
        TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl jobMasterPartitionTrackerImpl = new JobMasterPartitionTrackerImpl(new JobID(), testingShuffleMaster, resourceID -> {
            return Optional.of(createTaskExecutorGateway(resourceID, arrayBlockingQueue));
        });
        ResourceID generate = ResourceID.generate();
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionID, false));
        MatcherAssert.assertThat(Boolean.valueOf(jobMasterPartitionTrackerImpl.isTrackingPartitionsFor(generate)), Matchers.is(false));
        jobMasterPartitionTrackerImpl.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionID));
        Assert.assertEquals(0L, arrayBlockingQueue.size());
        Assert.assertEquals(1L, testingShuffleMaster.externallyReleasedPartitions.size());
        MatcherAssert.assertThat(testingShuffleMaster.externallyReleasedPartitions, Matchers.contains(new ResultPartitionID[]{resultPartitionID}));
    }

    @Test
    public void testStopTrackingIssuesNoReleaseCalls() {
        TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl jobMasterPartitionTrackerImpl = new JobMasterPartitionTrackerImpl(new JobID(), testingShuffleMaster, resourceID -> {
            return Optional.of(createTaskExecutorGateway(resourceID, arrayBlockingQueue));
        });
        ResourceID generate = ResourceID.generate();
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(new ResultPartitionID(), true));
        jobMasterPartitionTrackerImpl.stopTrackingPartitionsFor(generate);
        Assert.assertEquals(0L, arrayBlockingQueue.size());
        Assert.assertEquals(0L, testingShuffleMaster.externallyReleasedPartitions.size());
    }

    @Test
    public void testTrackingInternalAndExternalPartitionsByTmId() {
        TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl jobMasterPartitionTrackerImpl = new JobMasterPartitionTrackerImpl(new JobID(), testingShuffleMaster, resourceID -> {
            return Optional.of(createTaskExecutorGateway(resourceID, arrayBlockingQueue));
        });
        ResourceID generate = ResourceID.generate();
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        ResultPartitionID resultPartitionID2 = new ResultPartitionID();
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionID2, false));
        MatcherAssert.assertThat(Boolean.valueOf(jobMasterPartitionTrackerImpl.isTrackingPartitionsFor(generate)), Matchers.is(false));
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionID, true));
        MatcherAssert.assertThat(Boolean.valueOf(jobMasterPartitionTrackerImpl.isTrackingPartitionsFor(generate)), Matchers.is(true));
        MatcherAssert.assertThat(jobMasterPartitionTrackerImpl.getAllTrackedPartitions().stream().map(resultPartitionDeploymentDescriptor -> {
            return resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
        }).collect(Collectors.toList()), Matchers.containsInAnyOrder(new ResultPartitionID[]{resultPartitionID, resultPartitionID2}));
        jobMasterPartitionTrackerImpl.stopTrackingPartitionsFor(generate);
        MatcherAssert.assertThat(Boolean.valueOf(jobMasterPartitionTrackerImpl.isTrackingPartitionsFor(generate)), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(jobMasterPartitionTrackerImpl.isPartitionTracked(resultPartitionID)), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(jobMasterPartitionTrackerImpl.isPartitionTracked(resultPartitionID2)), Matchers.is(true));
        MatcherAssert.assertThat(((ResultPartitionDeploymentDescriptor) Iterables.getOnlyElement(jobMasterPartitionTrackerImpl.getAllTrackedPartitions())).getShuffleDescriptor().getResultPartitionID(), Matchers.is(resultPartitionID2));
    }

    @Test
    public void testReleaseOrPromote() {
        TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl jobMasterPartitionTrackerImpl = new JobMasterPartitionTrackerImpl(new JobID(), testingShuffleMaster, resourceID -> {
            return Optional.of(createTaskExecutorGateway(resourceID, arrayBlockingQueue));
        });
        ResourceID generate = ResourceID.generate();
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        ResultPartitionID resultPartitionID2 = new ResultPartitionID();
        ResultPartitionID resultPartitionID3 = new ResultPartitionID();
        ResultPartitionID resultPartitionID4 = new ResultPartitionID();
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionID, ResultPartitionType.BLOCKING, true));
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionID2, ResultPartitionType.BLOCKING, false));
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionID3, ResultPartitionType.BLOCKING_PERSISTENT, true));
        jobMasterPartitionTrackerImpl.startTrackingPartition(generate, AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(resultPartitionID4, ResultPartitionType.BLOCKING_PERSISTENT, false));
        jobMasterPartitionTrackerImpl.stopTrackingAndReleaseOrPromotePartitions(Arrays.asList(resultPartitionID, resultPartitionID2, resultPartitionID3, resultPartitionID4));
        Assert.assertEquals(1L, arrayBlockingQueue.size());
        ReleaseCall releaseCall = (ReleaseCall) arrayBlockingQueue.remove();
        Assert.assertEquals(resultPartitionID, Iterables.getOnlyElement(releaseCall.getReleasedPartitions()));
        Assert.assertEquals(resultPartitionID3, Iterables.getOnlyElement(releaseCall.getPromotedPartitions()));
        MatcherAssert.assertThat(new ArrayList(testingShuffleMaster.externallyReleasedPartitions), Matchers.containsInAnyOrder(new ResultPartitionID[]{resultPartitionID, resultPartitionID2}));
    }

    @Test
    public void testGetShuffleDescriptors() {
        TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(4);
        JobMasterPartitionTrackerImpl jobMasterPartitionTrackerImpl = new JobMasterPartitionTrackerImpl(new JobID(), testingShuffleMaster, resourceID -> {
            return Optional.of(createTaskExecutorGateway(resourceID, arrayBlockingQueue));
        });
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        jobMasterPartitionTrackerImpl.connectToResourceManager(testingResourceManagerGateway);
        jobMasterPartitionTrackerImpl.getClusterPartitionShuffleDescriptors(intermediateDataSetID);
        MatcherAssert.assertThat(testingResourceManagerGateway.requestedIntermediateDataSetIds, Matchers.contains(new IntermediateDataSetID[]{intermediateDataSetID}));
    }

    @Test(expected = NullPointerException.class)
    public void testGetShuffleDescriptorsBeforeConnectToResourceManager() {
        TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(4);
        new JobMasterPartitionTrackerImpl(new JobID(), testingShuffleMaster, resourceID -> {
            return Optional.of(createTaskExecutorGateway(resourceID, arrayBlockingQueue));
        }).getClusterPartitionShuffleDescriptors(intermediateDataSetID);
    }

    private static TaskExecutorGateway createTaskExecutorGateway(ResourceID resourceID, Collection<ReleaseCall> collection) {
        return new TestingTaskExecutorGatewayBuilder().setReleaseOrPromotePartitionsConsumer((jobID, set, set2) -> {
            collection.add(new ReleaseCall(resourceID, jobID, set, set2));
        }).createTestingTaskExecutorGateway();
    }
}
