package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.class */
class TaskDeploymentDescriptorFactoryTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private static final int PARALLELISM = 4;

    TaskDeploymentDescriptorFactoryTest() {
    }

    @Test
    void testCacheShuffleDescriptorAsNonOffloaded() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, Integer.MAX_VALUE);
        testCacheShuffleDescriptor(configuration);
    }

    @Test
    void testCacheShuffleDescriptorAsOffloaded() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, 0);
        testCacheShuffleDescriptor(configuration);
    }

    private void testCacheShuffleDescriptor(Configuration configuration) throws Exception {
        JobID jobID = new JobID();
        TestingBlobWriter testingBlobWriter = new TestingBlobWriter();
        Tuple2<ExecutionJobVertex, ExecutionJobVertex> tuple2 = setupExecutionGraphAndGetVertices(jobID, testingBlobWriter, configuration);
        ExecutionVertex executionVertex = ((ExecutionJobVertex) tuple2.f1).getTaskVertices()[0];
        createTaskDeploymentDescriptor(executionVertex);
        ShuffleDescriptor[] deserializeShuffleDescriptors = TaskDeploymentDescriptorTestUtils.deserializeShuffleDescriptors(((IntermediateResult) ((ExecutionJobVertex) tuple2.f1).getInputs().get(0)).getCachedShuffleDescriptors(executionVertex.getConsumedPartitionGroup(0)).getAllSerializedShuffleDescriptorGroups(), jobID, testingBlobWriter);
        Assertions.assertThat(executionVertex.getConsumedPartitionGroup(0)).hasSize(deserializeShuffleDescriptors.length);
        int i = 0;
        Iterator it = executionVertex.getConsumedPartitionGroup(0).iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            Assertions.assertThat(deserializeShuffleDescriptors[i2].getResultPartitionID().getPartitionId()).isEqualTo((IntermediateResultPartitionID) it.next());
        }
    }

    @Test
    void testHybridVertexFinish() throws Exception {
        Tuple2<ExecutionJobVertex, ExecutionJobVertex> buildExecutionGraph = buildExecutionGraph();
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) buildExecutionGraph.f0;
        ExecutionJobVertex executionJobVertex2 = (ExecutionJobVertex) buildExecutionGraph.f1;
        createTaskDeploymentDescriptor(executionJobVertex2.getTaskVertices()[0]);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex.getTaskVertices()[1];
        executionVertex.finishPartitionsIfNeeded();
        executionVertex2.finishPartitionsIfNeeded();
        ExecutionVertex executionVertex3 = executionJobVertex2.getTaskVertices()[1];
        createTaskDeploymentDescriptor(executionVertex3);
        Assertions.assertThat(((IntermediateResult) executionJobVertex2.getInputs().get(0)).getCachedShuffleDescriptors(executionVertex3.getConsumedPartitionGroup(0)).getAllSerializedShuffleDescriptorGroups()).hasSize(2);
        executionJobVertex.getTaskVertices()[2].finishPartitionsIfNeeded();
        ExecutionVertex executionVertex4 = executionJobVertex2.getTaskVertices()[2];
        createTaskDeploymentDescriptor(executionVertex4);
        Assertions.assertThat(((IntermediateResult) executionJobVertex2.getInputs().get(0)).getCachedShuffleDescriptors(executionVertex4.getConsumedPartitionGroup(0)).getAllSerializedShuffleDescriptorGroups()).hasSize(3);
    }

    @Test
    void testGetOffloadedShuffleDescriptorBeforeLoading() throws Exception {
        TestingBlobWriter testingBlobWriter = new TestingBlobWriter(0);
        JobID jobID = new JobID();
        Configuration configuration = new Configuration();
        configuration.set(TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, 0);
        TaskDeploymentDescriptor createTaskDeploymentDescriptor = createTaskDeploymentDescriptor(((ExecutionJobVertex) setupExecutionGraphAndGetVertices(jobID, testingBlobWriter, configuration).f1).getTaskVertices()[0]);
        Assertions.assertThatThrownBy(() -> {
            ((InputGateDeploymentDescriptor) createTaskDeploymentDescriptor.getInputGates().get(0)).getShuffleDescriptors();
        }).isInstanceOf(IllegalStateException.class);
    }

    private Tuple2<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraphAndGetVertices(JobID jobID, BlobWriter blobWriter, Configuration configuration) throws Exception {
        return setupExecutionGraphAndGetVertices(jobID, blobWriter, ResultPartitionType.BLOCKING, (v0) -> {
            return v0.isBlockingOrBlockingPersistentResultPartition();
        }, configuration);
    }

    private Tuple2<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraphAndGetVertices(JobID jobID, BlobWriter blobWriter, ResultPartitionType resultPartitionType, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, Configuration configuration) throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 4);
        JobVertex createJobVertex2 = createJobVertex("v2", 4);
        JobVertexConnectionUtils.connectNewDataSetAsInput(createJobVertex2, createJobVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
        ExecutionGraph createExecutionGraph = createExecutionGraph(jobID, new ArrayList(Arrays.asList(createJobVertex, createJobVertex2)), blobWriter, markPartitionFinishedStrategy, configuration);
        return Tuple2.of(createExecutionGraph.getJobVertex(createJobVertex.getID()), createExecutionGraph.getJobVertex(createJobVertex2.getID()));
    }

    private Tuple2<ExecutionJobVertex, ExecutionJobVertex> buildExecutionGraph() throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 4);
        JobVertex createJobVertex2 = createJobVertex("v2", 4);
        JobVertexConnectionUtils.connectNewDataSetAsInput(createJobVertex2, createJobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL);
        AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler = new DefaultSchedulerBuilder(JobGraphTestUtils.batchJobGraph(createJobVertex, createJobVertex2), ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).setHybridPartitionDataConsumeConstraint(JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS).buildAdaptiveBatchJobScheduler();
        buildAdaptiveBatchJobScheduler.startScheduling();
        ExecutionGraph executionGraph = buildAdaptiveBatchJobScheduler.getExecutionGraph();
        return Tuple2.of(executionGraph.getJobVertex(createJobVertex.getID()), executionGraph.getJobVertex(createJobVertex2.getID()));
    }

    private static JobVertex createJobVertex(String str, int i) {
        JobVertex jobVertex = new JobVertex(str);
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        return jobVertex;
    }

    private static ExecutionGraph createExecutionGraph(JobID jobID, List<JobVertex> list, BlobWriter blobWriter, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, Configuration configuration) throws JobException, JobExecutionException {
        return TestingDefaultExecutionGraphBuilder.newBuilder().setJobMasterConfig(configuration).setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().setJobId(jobID).addJobVertices(list).build()).setBlobWriter(blobWriter).setMarkPartitionFinishedStrategy(markPartitionFinishedStrategy).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
    }

    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex executionVertex) throws IOException, ClusterDatasetCorruptedException {
        return executionVertex.getExecutionGraphAccessor().getTaskDeploymentDescriptorFactory().createDeploymentDescriptor(executionVertex.getCurrentExecutionAttempt(), new AllocationID(), (JobManagerTaskRestore) null, Collections.emptyList());
    }
}
