package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactoryTest;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests that the {@link ShuffleDescriptor}s cached on an {@link IntermediateResult} are removed
 * again, both when consuming tasks reach {@link ExecutionState#FINISHED} and when tasks are
 * restarted after a failure.
 *
 * <p>Every scenario builds a two-vertex batch job ({@code v1 -> v2}, connected through a {@link
 * ResultPartitionType#BLOCKING} data set) and is executed twice: once with a {@link
 * TestingBlobWriter} created with {@link Integer#MAX_VALUE} (the "non-offloaded" variants, which
 * expect no blobs at all) and once with a writer created with {@code 0} (the "offloaded" variants,
 * which expect blobs to be written and later deleted).
 */
public class RemoveCachedShuffleDescriptorTest extends TestLogger {

    /** Parallelism used for both the producing and the consuming job vertex. */
    private static final int PARALLELISM = 4;

    /** Maximum time, in milliseconds, to wait for a restarted vertex to reach DEPLOYING. */
    private static final long RESTART_TIMEOUT_MILLIS = 1000L;

    private ScheduledExecutorService scheduledExecutorService;
    private ComponentMainThreadExecutor mainThreadExecutor;
    private ManuallyTriggeredScheduledExecutorService ioExecutor;

    /** Creates a fresh single-threaded main-thread executor and a manually triggered I/O executor. */
    @Before
    public void setup() {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.mainThreadExecutor =
                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                        this.scheduledExecutorService);
        this.ioExecutor = new ManuallyTriggeredScheduledExecutorService();
    }

    /** Shuts down the executor backing the main thread, if {@link #setup()} created one. */
    @After
    public void teardown() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    @Test
    public void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFinished() throws Exception {
        // No blob is expected before or after the consumers finish.
        testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
    }

    @Test
    public void testRemoveOffloadedCacheForAllToAllEdgeAfterFinished() throws Exception {
        // 4 blobs after deployment, 3 once the cache has been removed.
        // NOTE(review): presumably job information (1) + task information (2) + one descriptor
        // cache for the ALL_TO_ALL edge (1) - confirm against the deployment code.
        testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(0), 4, 3);
    }

    /**
     * Deploys an ALL_TO_ALL job, finishes every consumer task and verifies that the cached
     * descriptors of the consumed partitions are gone afterwards.
     *
     * @param blobWriter blob writer handed to the scheduler and used to read the cache back
     * @param expectedBlobsBefore expected number of blobs right after deployment
     * @param expectedBlobsAfter expected number of blobs after the cache has been removed
     */
    private void testRemoveCacheForAllToAllEdgeAfterFinished(
            TestingBlobWriter blobWriter, int expectedBlobsBefore, int expectedBlobsAfter)
            throws Exception {
        final JobID jobId = new JobID();
        final JobVertex producer = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM);
        final JobVertex consumer = ExecutionGraphTestUtils.createNoOpVertex("v2", PARALLELISM);

        final ExecutionGraph executionGraph =
                createSchedulerAndDeploy(
                                jobId,
                                producer,
                                consumer,
                                DistributionPattern.ALL_TO_ALL,
                                blobWriter)
                        .getExecutionGraph();

        // After deployment the cache holds one descriptor per producer subtask.
        assertCachedShuffleDescriptorCount(
                PARALLELISM, executionGraph, consumer, 0, jobId, blobWriter);
        Assert.assertEquals(expectedBlobsBefore, blobWriter.numberOfBlobs());

        // Finish all consumers on the main thread, then run the pending I/O executor tasks.
        CompletableFuture.runAsync(
                        () -> transitionTasksToFinished(executionGraph, consumer.getID()),
                        this.mainThreadExecutor)
                .join();
        this.ioExecutor.triggerAll();

        Assert.assertNull(getConsumedCachedShuffleDescriptor(executionGraph, consumer));
        Assert.assertEquals(expectedBlobsAfter, blobWriter.numberOfBlobs());
    }

    @Test
    public void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFailover() throws Exception {
        // No blob is expected before or after the failover.
        testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
    }

    @Test
    public void testRemoveOffloadedCacheForAllToAllEdgeAfterFailover() throws Exception {
        // 4 blobs after deployment, 3 once the cache has been removed.
        // NOTE(review): presumably job information (1) + task information (2) + one descriptor
        // cache for the ALL_TO_ALL edge (1) - confirm against the deployment code.
        testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(0), 4, 3);
    }

    /**
     * Deploys an ALL_TO_ALL job, triggers a global failover and verifies that the cached
     * descriptors of the consumed partitions are gone once the producers have been restarted.
     *
     * @param blobWriter blob writer handed to the scheduler and used to read the cache back
     * @param expectedBlobsBefore expected number of blobs right after deployment
     * @param expectedBlobsAfter expected number of blobs after the cache has been removed
     */
    private void testRemoveCacheForAllToAllEdgeAfterFailover(
            TestingBlobWriter blobWriter, int expectedBlobsBefore, int expectedBlobsAfter)
            throws Exception {
        final JobID jobId = new JobID();
        final JobVertex producer = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM);
        final JobVertex consumer = ExecutionGraphTestUtils.createNoOpVertex("v2", PARALLELISM);

        final DefaultScheduler scheduler =
                createSchedulerAndDeploy(
                        jobId, producer, consumer, DistributionPattern.ALL_TO_ALL, blobWriter);
        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();

        // After deployment the cache holds one descriptor per producer subtask.
        assertCachedShuffleDescriptorCount(
                PARALLELISM, executionGraph, consumer, 0, jobId, blobWriter);
        Assert.assertEquals(expectedBlobsBefore, blobWriter.numberOfBlobs());

        triggerGlobalFailoverAndComplete(scheduler, producer);
        this.ioExecutor.triggerAll();

        Assert.assertNull(getConsumedCachedShuffleDescriptor(executionGraph, consumer));
        Assert.assertEquals(expectedBlobsAfter, blobWriter.numberOfBlobs());
    }

    @Test
    public void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception {
        // No blob is expected before or after the first consumer finishes.
        testRemoveCacheForPointwiseEdgeAfterFinished(
                new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
    }

    @Test
    public void testRemoveOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception {
        // 7 blobs after deployment, 6 once a single cache entry has been removed.
        // NOTE(review): presumably job information (1) + task information (2) + one descriptor
        // cache per POINTWISE consumer (4) - confirm against the deployment code.
        testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(0), 7, 6);
    }

    /**
     * Deploys a POINTWISE job, finishes only the first consumer task and verifies that only the
     * cache entry consumed by that task is removed while the entry of the second task remains.
     *
     * @param blobWriter blob writer handed to the scheduler and used to read the cache back
     * @param expectedBlobsBefore expected number of blobs right after deployment
     * @param expectedBlobsAfter expected number of blobs after the first entry has been removed
     */
    private void testRemoveCacheForPointwiseEdgeAfterFinished(
            TestingBlobWriter blobWriter, int expectedBlobsBefore, int expectedBlobsAfter)
            throws Exception {
        final JobID jobId = new JobID();
        final JobVertex producer = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM);
        final JobVertex consumer = ExecutionGraphTestUtils.createNoOpVertex("v2", PARALLELISM);

        final ExecutionGraph executionGraph =
                createSchedulerAndDeploy(
                                jobId,
                                producer,
                                consumer,
                                DistributionPattern.POINTWISE,
                                blobWriter)
                        .getExecutionGraph();

        // With a pointwise edge each consumer subtask reads exactly one partition.
        assertCachedShuffleDescriptorCount(1, executionGraph, consumer, 0, jobId, blobWriter);
        Assert.assertEquals(expectedBlobsBefore, blobWriter.numberOfBlobs());

        final ExecutionVertex firstConsumerTask =
                Objects.requireNonNull(executionGraph.getJobVertex(consumer.getID()))
                        .getTaskVertices()[0];
        CompletableFuture.runAsync(
                        () -> transitionTaskToFinished(executionGraph, firstConsumerTask),
                        this.mainThreadExecutor)
                .join();
        this.ioExecutor.triggerAll();

        // Only the entry belonging to the finished task is dropped ...
        Assert.assertNull(getConsumedCachedShuffleDescriptor(executionGraph, consumer, 0));
        // ... while the entry of the second consumer task is still cached.
        assertCachedShuffleDescriptorCount(1, executionGraph, consumer, 1, jobId, blobWriter);
        Assert.assertEquals(expectedBlobsAfter, blobWriter.numberOfBlobs());
    }

    @Test
    public void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFailover() throws Exception {
        // No blob is expected before or after the failover.
        testRemoveCacheForPointwiseEdgeAfterFailover(
                new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
    }

    @Test
    public void testRemoveOffloadedCacheForPointwiseEdgeAfterFailover() throws Exception {
        // 7 blobs after deployment, 6 once a single cache entry has been removed.
        // NOTE(review): presumably job information (1) + task information (2) + one descriptor
        // cache per POINTWISE consumer (4) - confirm against the deployment code.
        testRemoveCacheForPointwiseEdgeAfterFailover(new TestingBlobWriter(0), 7, 6);
    }

    /**
     * Deploys a POINTWISE job, fails the first consumer task with a {@link
     * PartitionNotFoundException} and verifies that only the cache entry consumed by that task is
     * removed while the entry of the second task remains.
     *
     * @param blobWriter blob writer handed to the scheduler and used to read the cache back
     * @param expectedBlobsBefore expected number of blobs right after deployment
     * @param expectedBlobsAfter expected number of blobs after the first entry has been removed
     */
    private void testRemoveCacheForPointwiseEdgeAfterFailover(
            TestingBlobWriter blobWriter, int expectedBlobsBefore, int expectedBlobsAfter)
            throws Exception {
        final JobID jobId = new JobID();
        final JobVertex producer = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM);
        final JobVertex consumer = ExecutionGraphTestUtils.createNoOpVertex("v2", PARALLELISM);

        final ExecutionGraph executionGraph =
                createSchedulerAndDeploy(
                                jobId,
                                producer,
                                consumer,
                                DistributionPattern.POINTWISE,
                                blobWriter)
                        .getExecutionGraph();

        // With a pointwise edge each consumer subtask reads exactly one partition.
        assertCachedShuffleDescriptorCount(1, executionGraph, consumer, 0, jobId, blobWriter);
        Assert.assertEquals(expectedBlobsBefore, blobWriter.numberOfBlobs());

        triggerExceptionAndComplete(executionGraph, producer, consumer);
        this.ioExecutor.triggerAll();

        // Only the entry belonging to the failed task is dropped ...
        Assert.assertNull(getConsumedCachedShuffleDescriptor(executionGraph, consumer, 0));
        // ... while the entry of the second consumer task is still cached.
        assertCachedShuffleDescriptorCount(1, executionGraph, consumer, 1, jobId, blobWriter);
        Assert.assertEquals(expectedBlobsAfter, blobWriter.numberOfBlobs());
    }

    /**
     * Asserts how many shuffle descriptors are cached for the first consumed partition group of
     * the given consumer subtask.
     *
     * @param expectedCount expected length of the deserialized descriptor array
     * @param executionGraph graph holding the cache
     * @param consumer consuming job vertex
     * @param subtaskIndex index of the consumer subtask whose consumed group is looked up
     * @param jobId job id passed to the deserialization helper
     * @param blobWriter blob writer passed to the deserialization helper
     */
    private static void assertCachedShuffleDescriptorCount(
            int expectedCount,
            ExecutionGraph executionGraph,
            JobVertex consumer,
            int subtaskIndex,
            JobID jobId,
            TestingBlobWriter blobWriter)
            throws Exception {
        Assert.assertEquals(
                expectedCount,
                TaskDeploymentDescriptorFactoryTest.deserializeShuffleDescriptors(
                                getConsumedCachedShuffleDescriptor(
                                        executionGraph, consumer, subtaskIndex),
                                jobId,
                                blobWriter)
                        .length);
    }

    /**
     * Connects {@code jobVertex2} to {@code jobVertex} through a BLOCKING data set, builds a
     * scheduler for the resulting job and, on the main thread executor, deploys the producer
     * tasks, transitions them to FINISHED and deploys the consumer tasks.
     *
     * @param jobID id of the job to create
     * @param jobVertex producing vertex
     * @param jobVertex2 consuming vertex
     * @param distributionPattern pattern used to connect the two vertices
     * @param blobWriter blob writer handed to the scheduler
     * @return the scheduler whose execution graph has both vertices deployed
     */
    private DefaultScheduler createSchedulerAndDeploy(
            JobID jobID,
            JobVertex jobVertex,
            JobVertex jobVertex2,
            DistributionPattern distributionPattern,
            BlobWriter blobWriter)
            throws Exception {
        jobVertex2.connectNewDataSetAsInput(
                jobVertex, distributionPattern, ResultPartitionType.BLOCKING);

        final DefaultScheduler scheduler =
                createScheduler(
                        jobID,
                        new ArrayList<>(Arrays.asList(jobVertex, jobVertex2)),
                        blobWriter,
                        this.mainThreadExecutor,
                        this.ioExecutor);
        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        final TestingLogicalSlotBuilder slotBuilder = new TestingLogicalSlotBuilder();

        CompletableFuture.runAsync(
                        () -> {
                            try {
                                deployTasks(executionGraph, jobVertex.getID(), slotBuilder);
                                transitionTasksToFinished(executionGraph, jobVertex.getID());
                                deployTasks(executionGraph, jobVertex2.getID(), slotBuilder);
                            } catch (Exception e) {
                                throw new RuntimeException("Exceptions shouldn't happen here.", e);
                            }
                        },
                        this.mainThreadExecutor)
                .join();
        return scheduler;
    }

    /**
     * Reports a global failure to the scheduler, completes the cancellation of every execution
     * vertex and then waits until all tasks of {@code jobVertex} are DEPLOYING again.
     *
     * @param defaultScheduler scheduler to fail
     * @param jobVertex vertex whose tasks are awaited after the restart
     * @throws TimeoutException if a task does not reach DEPLOYING in time
     */
    private void triggerGlobalFailoverAndComplete(
            DefaultScheduler defaultScheduler, JobVertex jobVertex) throws TimeoutException {
        final Exception failure = new Exception();
        final ExecutionGraph executionGraph = defaultScheduler.getExecutionGraph();

        CompletableFuture.runAsync(
                        () -> {
                            defaultScheduler.handleGlobalFailure(failure);
                            for (ExecutionVertex vertex :
                                    executionGraph.getAllExecutionVertices()) {
                                vertex.getCurrentExecutionAttempt().completeCancelling();
                            }
                        },
                        this.mainThreadExecutor)
                .join();

        for (ExecutionVertex restarted :
                Objects.requireNonNull(executionGraph.getJobVertex(jobVertex.getID()))
                        .getTaskVertices()) {
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(
                    restarted, ExecutionState.DEPLOYING, RESTART_TIMEOUT_MILLIS);
        }
    }

    /**
     * Marks the first task of {@code jobVertex2} as failed with a {@link
     * PartitionNotFoundException} and waits until the first task of {@code jobVertex} is
     * DEPLOYING.
     *
     * @param executionGraph graph containing both vertices
     * @param jobVertex producing vertex whose first task is awaited
     * @param jobVertex2 consuming vertex whose first task is failed
     * @throws TimeoutException if the producer task does not reach DEPLOYING in time
     */
    private void triggerExceptionAndComplete(
            ExecutionGraph executionGraph, JobVertex jobVertex, JobVertex jobVertex2)
            throws TimeoutException {
        final ExecutionVertex firstProducerTask =
                Objects.requireNonNull(executionGraph.getJobVertex(jobVertex.getID()))
                        .getTaskVertices()[0];
        final ExecutionVertex firstConsumerTask =
                Objects.requireNonNull(executionGraph.getJobVertex(jobVertex2.getID()))
                        .getTaskVertices()[0];

        CompletableFuture.runAsync(
                        () ->
                                firstConsumerTask.markFailed(
                                        new PartitionNotFoundException(new ResultPartitionID())),
                        this.mainThreadExecutor)
                .join();

        ExecutionGraphTestUtils.waitUntilExecutionVertexState(
                firstProducerTask, ExecutionState.DEPLOYING, RESTART_TIMEOUT_MILLIS);
    }

    /**
     * Builds a scheduler for a batch job graph made of the given vertices, configured with a
     * {@code TestRestartBackoffTimeStrategy(true, 0L)}, the given blob writer and I/O executor.
     */
    private static DefaultScheduler createScheduler(
            JobID jobID,
            List<JobVertex> list,
            BlobWriter blobWriter,
            ComponentMainThreadExecutor componentMainThreadExecutor,
            ScheduledExecutorService scheduledExecutorService)
            throws Exception {
        return SchedulerTestingUtils.newSchedulerBuilder(
                        JobGraphBuilder.newBatchJobGraphBuilder()
                                .setJobId(jobID)
                                .addJobVertices(list)
                                .build(),
                        componentMainThreadExecutor)
                .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L))
                .setBlobWriter(blobWriter)
                .setIoExecutor(scheduledExecutorService)
                .build();
    }

    /**
     * For every task of the given job vertex: registers its produced partitions at a fresh testing
     * slot's location, assigns the slot and deploys the task.
     */
    private static void deployTasks(
            ExecutionGraph executionGraph,
            JobVertexID jobVertexID,
            TestingLogicalSlotBuilder testingLogicalSlotBuilder)
            throws JobException, ExecutionException, InterruptedException {
        for (ExecutionVertex task :
                Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID))
                        .getTaskVertices()) {
            final TestingLogicalSlot slot = testingLogicalSlotBuilder.createTestingLogicalSlot();
            // Block until the partitions are registered before assigning the slot and deploying.
            task.getCurrentExecutionAttempt()
                    .registerProducedPartitions(slot.getTaskManagerLocation(), true)
                    .get();
            task.tryAssignResource(slot);
            task.deploy();
        }
    }

    /**
     * Transitions every task of the given job vertex to FINISHED.
     *
     * <p>NOTE(review): the decompiler reported widening this method from {@code private}; the
     * visibility is kept as found - consider narrowing it once external callers are ruled out.
     */
    public static void transitionTasksToFinished(
            ExecutionGraph executionGraph, JobVertexID jobVertexID) {
        for (ExecutionVertex task :
                Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID))
                        .getTaskVertices()) {
            transitionTaskToFinished(executionGraph, task);
        }
    }

    /**
     * Reports state FINISHED for the current execution attempt of the given vertex to the graph.
     *
     * <p>NOTE(review): the decompiler reported widening this method from {@code private}; the
     * visibility is kept as found - consider narrowing it once external callers are ruled out.
     */
    public static void transitionTaskToFinished(
            ExecutionGraph executionGraph, ExecutionVertex executionVertex) {
        final TaskExecutionState finished =
                new TaskExecutionState(
                        executionVertex.getCurrentExecutionAttempt().getAttemptId(),
                        ExecutionState.FINISHED);
        executionGraph.updateState(new TaskExecutionStateTransition(finished));
    }

    /** Same as the three-argument overload, looking at the first consumer subtask (index 0). */
    private static TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]>
            getConsumedCachedShuffleDescriptor(ExecutionGraph executionGraph, JobVertex jobVertex) {
        return getConsumedCachedShuffleDescriptor(executionGraph, jobVertex, 0);
    }

    /**
     * Looks up, on the first input of the given job vertex, the descriptors cached for the first
     * consumed partition group of the subtask at index {@code i}.
     *
     * @return the cached value as returned by {@code getCachedShuffleDescriptors}; the tests in
     *     this class expect {@code null} once the cache entry has been removed
     */
    private static TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]>
            getConsumedCachedShuffleDescriptor(
                    ExecutionGraph executionGraph, JobVertex jobVertex, int i) {
        final ExecutionJobVertex consumer =
                Objects.requireNonNull(executionGraph.getJobVertex(jobVertex.getID()));
        final IntermediateResult consumedResult = consumer.getInputs().get(0);
        return consumedResult.getCachedShuffleDescriptors(
                consumer.getTaskVertices()[i].getConsumedPartitionGroup(0));
    }
}
