package org.apache.flink.runtime.shuffle;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/shuffle/ShuffleMasterTest.class */
class ShuffleMasterTest {
    private static final String STOP_TRACKING_PARTITION_KEY = "stop_tracking_partition_key";
    private static final String PARTITION_REGISTRATION_EVENT = "registerPartitionWithProducer";
    private static final String EXTERNAL_PARTITION_RELEASE_EVENT = "releasePartitionExternally";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/shuffle/ShuffleMasterTest$TestShuffleMaster.class */
    public static class TestShuffleMaster extends NettyShuffleMaster {
        private static final AtomicReference<TestShuffleMaster> currentInstance = new AtomicReference<>();
        private static final BlockingQueue<String> partitionEvents = new LinkedBlockingQueue();
        private final AtomicBoolean started;
        private final AtomicBoolean closed;
        private final BlockingQueue<ResultPartitionID> partitions;
        private final AtomicReference<JobShuffleContext> jobContext;
        private final boolean stopTrackingPartition;

        public TestShuffleMaster(Configuration configuration) {
            super(new ShuffleMasterContextImpl(configuration, th -> {
            }));
            this.started = new AtomicBoolean();
            this.closed = new AtomicBoolean();
            this.partitions = new LinkedBlockingQueue();
            this.jobContext = new AtomicReference<>();
            this.stopTrackingPartition = ((Boolean) configuration.get(ConfigurationUtils.getBooleanConfigOption(ShuffleMasterTest.STOP_TRACKING_PARTITION_KEY), false)).booleanValue();
            currentInstance.set(this);
        }

        public void start() throws Exception {
            Assertions.assertThat(this.started).isFalse();
            Assertions.assertThat(this.closed).isFalse();
            this.started.set(true);
            super.start();
        }

        public void close() throws Exception {
            assertShuffleMasterAlive();
            this.closed.set(true);
            super.close();
        }

        public void registerJob(JobShuffleContext jobShuffleContext) {
            assertShuffleMasterAlive();
            Assertions.assertThat(this.jobContext.compareAndSet(null, jobShuffleContext)).isTrue();
            super.registerJob(jobShuffleContext);
        }

        public void unregisterJob(JobID jobID) {
            assertJobRegistered();
            this.jobContext.set(null);
            super.unregisterJob(jobID);
        }

        public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            assertJobRegistered();
            partitionEvents.add(ShuffleMasterTest.PARTITION_REGISTRATION_EVENT);
            CompletableFuture<NettyShuffleDescriptor> completableFuture = new CompletableFuture<>();
            try {
                NettyShuffleDescriptor nettyShuffleDescriptor = (NettyShuffleDescriptor) super.registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor).get();
                if (this.partitions.size() == 1 && this.stopTrackingPartition) {
                    this.jobContext.get().stopTrackingAndReleasePartitions(Collections.singletonList(this.partitions.peek())).thenRun(() -> {
                        completableFuture.completeExceptionally(new Exception("Test"));
                    });
                } else {
                    completableFuture.complete(nettyShuffleDescriptor);
                }
                this.partitions.add(nettyShuffleDescriptor.getResultPartitionID());
            } catch (Throwable th) {
                completableFuture.completeExceptionally(th);
            }
            return completableFuture;
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            assertJobRegistered();
            partitionEvents.add(ShuffleMasterTest.EXTERNAL_PARTITION_RELEASE_EVENT);
            super.releasePartitionExternally(shuffleDescriptor);
        }

        private void assertShuffleMasterAlive() {
            Assertions.assertThat(this.closed).isFalse();
            Assertions.assertThat(this.started).isTrue();
        }

        private void assertJobRegistered() {
            assertShuffleMasterAlive();
            Assertions.assertThat(this.jobContext).isNotNull();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/shuffle/ShuffleMasterTest$TestShuffleServiceFactory.class */
    public static class TestShuffleServiceFactory extends NettyShuffleServiceFactory {
        /* renamed from: createShuffleMaster, reason: merged with bridge method [inline-methods] */
        public NettyShuffleMaster m572createShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
            return new TestShuffleMaster(shuffleMasterContext.getConfiguration());
        }
    }

    ShuffleMasterTest() {
    }

    @BeforeEach
    void before() {
        TestShuffleMaster.partitionEvents.clear();
    }

    @Test
    void testShuffleMasterLifeCycle() throws Exception {
        MiniCluster miniCluster = new MiniCluster(createClusterConfiguration(false));
        try {
            miniCluster.start();
            miniCluster.executeJobBlocking(createJobGraph());
            miniCluster.close();
            Assertions.assertThat(TestShuffleMaster.currentInstance.get().closed).isTrue();
            Assertions.assertThat(TestShuffleMaster.partitionEvents).containsExactly(new String[]{PARTITION_REGISTRATION_EVENT, PARTITION_REGISTRATION_EVENT, EXTERNAL_PARTITION_RELEASE_EVENT, EXTERNAL_PARTITION_RELEASE_EVENT});
        } catch (Throwable th) {
            try {
                miniCluster.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testStopTrackingPartition() throws Exception {
        MiniCluster miniCluster = new MiniCluster(createClusterConfiguration(true));
        try {
            miniCluster.start();
            miniCluster.executeJobBlocking(createJobGraph());
            miniCluster.close();
            Assertions.assertThat(TestShuffleMaster.currentInstance.get().closed).isTrue();
            Assertions.assertThat(TestShuffleMaster.partitionEvents).containsExactly(new String[]{PARTITION_REGISTRATION_EVENT, PARTITION_REGISTRATION_EVENT, PARTITION_REGISTRATION_EVENT, PARTITION_REGISTRATION_EVENT, EXTERNAL_PARTITION_RELEASE_EVENT, EXTERNAL_PARTITION_RELEASE_EVENT});
        } catch (Throwable th) {
            try {
                miniCluster.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private MiniClusterConfiguration createClusterConfiguration(boolean z) {
        Configuration configuration = new Configuration();
        configuration.set(ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS, TestShuffleServiceFactory.class.getName());
        configuration.set(ConfigurationUtils.getBooleanConfigOption(STOP_TRACKING_PARTITION_KEY), Boolean.valueOf(z));
        return new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(1).setConfiguration(configuration).build();
    }

    private JobGraph createJobGraph() throws Exception {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setParallelism(2);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex2.setParallelism(2);
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        JobVertexConnectionUtils.connectNewDataSetAsInput(jobVertex2, jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        JobGraph batchJobGraph = JobGraphTestUtils.batchJobGraph(jobVertex, jobVertex2);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(batchJobGraph, 2, Duration.ofSeconds(2L));
        return batchJobGraph;
    }
}
