/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for the {@link TaskDeploymentDescriptorFactory}.
 *
 * <p>The tests build a two-vertex job ({@code v1 -> v2}, all-to-all), create a {@link
 * TaskDeploymentDescriptor} for consumer subtasks and then inspect the shuffle descriptors cached
 * on the consumed {@link IntermediateResult}.
 */
class TaskDeploymentDescriptorFactoryTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /** Parallelism of every job vertex created by this test. */
    private static final int PARALLELISM = 4;

    /**
     * Runs the cache check with the offload threshold set to {@link Integer#MAX_VALUE}; judging by
     * the test name this is meant to keep the shuffle descriptors non-offloaded.
     */
    @Test
    void testCacheShuffleDescriptorAsNonOffloaded() throws Exception {
        Configuration jobMasterConfig = new Configuration();
        jobMasterConfig.set(
                TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD,
                Integer.MAX_VALUE);
        testCacheShuffleDescriptor(jobMasterConfig);
    }

    /**
     * Runs the cache check with the offload threshold set to {@code 0}; judging by the test name
     * this is meant to force the shuffle descriptors to be offloaded.
     */
    @Test
    void testCacheShuffleDescriptorAsOffloaded() throws Exception {
        Configuration jobMasterConfig = new Configuration();
        jobMasterConfig.set(
                TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, 0);
        testCacheShuffleDescriptor(jobMasterConfig);
    }

    /**
     * Creates a deployment descriptor for the first consumer subtask and verifies that the shuffle
     * descriptors cached on the consumed result match, in number and in order, the partitions of
     * the subtask's first consumed partition group.
     *
     * @param jobMasterConfig job master configuration the execution graph is built with
     */
    private void testCacheShuffleDescriptor(Configuration jobMasterConfig) throws Exception {
        JobID jobId = new JobID();
        TestingBlobWriter blobWriter = new TestingBlobWriter();
        Tuple2<ExecutionJobVertex, ExecutionJobVertex> executionJobVertices =
                setupExecutionGraphAndGetVertices(jobId, blobWriter, jobMasterConfig);
        ExecutionJobVertex consumerVertex = executionJobVertices.f1;

        ExecutionVertex ev21 = consumerVertex.getTaskVertices()[0];
        // The return value is not needed here; the call is made before the cache is inspected.
        createTaskDeploymentDescriptor(ev21);

        IntermediateResult consumedResult = consumerVertex.getInputs().get(0);
        // Passed inline so the generic element type is carried through without a raw List local.
        ShuffleDescriptor[] cachedShuffleDescriptors =
                TaskDeploymentDescriptorTestUtils.deserializeShuffleDescriptors(
                        consumedResult
                                .getCachedShuffleDescriptors(ev21.getConsumedPartitionGroup(0))
                                .getAllSerializedShuffleDescriptorGroups(),
                        jobId,
                        blobWriter);

        Assertions.assertThat(ev21.getConsumedPartitionGroup(0))
                .hasSize(cachedShuffleDescriptors.length);

        int idx = 0;
        for (IntermediateResultPartitionID consumedPartitionId :
                ev21.getConsumedPartitionGroup(0)) {
            Assertions.assertThat(
                            cachedShuffleDescriptors[idx++]
                                    .getResultPartitionID()
                                    .getPartitionId())
                    .isEqualTo(consumedPartitionId);
        }
    }

    /**
     * With a {@code HYBRID_FULL} edge, checks the number of cached serialized shuffle descriptor
     * groups after more producer subtasks have had {@code finishPartitionsIfNeeded()} called and
     * further consumer subtasks have had their deployment descriptors created: 2 groups after the
     * second consumer, 3 groups after the third.
     */
    @Test
    void testHybridVertexFinish() throws Exception {
        Tuple2<ExecutionJobVertex, ExecutionJobVertex> executionJobVertices =
                buildExecutionGraph();
        ExecutionJobVertex ejv1 = executionJobVertices.f0;
        ExecutionJobVertex ejv2 = executionJobVertices.f1;

        // First consumer is deployed before any producer partition was finished.
        ExecutionVertex ev21 = ejv2.getTaskVertices()[0];
        createTaskDeploymentDescriptor(ev21);

        ExecutionVertex ev11 = ejv1.getTaskVertices()[0];
        ExecutionVertex ev12 = ejv1.getTaskVertices()[1];
        ev11.finishPartitionsIfNeeded();
        ev12.finishPartitionsIfNeeded();

        ExecutionVertex ev22 = ejv2.getTaskVertices()[1];
        createTaskDeploymentDescriptor(ev22);
        IntermediateResult consumedResult = ejv2.getInputs().get(0);
        List<?> serializedShuffleDescriptors =
                consumedResult
                        .getCachedShuffleDescriptors(ev22.getConsumedPartitionGroup(0))
                        .getAllSerializedShuffleDescriptorGroups();
        Assertions.assertThat(serializedShuffleDescriptors).hasSize(2);

        ExecutionVertex ev13 = ejv1.getTaskVertices()[2];
        ev13.finishPartitionsIfNeeded();

        ExecutionVertex ev23 = ejv2.getTaskVertices()[2];
        createTaskDeploymentDescriptor(ev23);
        consumedResult = ejv2.getInputs().get(0);
        serializedShuffleDescriptors =
                consumedResult
                        .getCachedShuffleDescriptors(ev23.getConsumedPartitionGroup(0))
                        .getAllSerializedShuffleDescriptorGroups();
        Assertions.assertThat(serializedShuffleDescriptors).hasSize(3);
    }

    /**
     * Verifies that reading the shuffle descriptors of an input gate straight from a freshly
     * created deployment descriptor fails with an {@link IllegalStateException} when the offload
     * threshold is {@code 0}.
     */
    @Test
    void testGetOffloadedShuffleDescriptorBeforeLoading() throws Exception {
        // NOTE(review): the meaning of the constructor argument 0 is defined by TestingBlobWriter
        // and is not visible from this file.
        TestingBlobWriter blobWriter = new TestingBlobWriter(0);
        JobID jobId = new JobID();
        Configuration jobMasterConfig = new Configuration();
        jobMasterConfig.set(
                TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, 0);

        Tuple2<ExecutionJobVertex, ExecutionJobVertex> executionJobVertices =
                setupExecutionGraphAndGetVertices(jobId, blobWriter, jobMasterConfig);
        ExecutionVertex ev21 = executionJobVertices.f1.getTaskVertices()[0];
        TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(ev21);

        Assertions.assertThatThrownBy(() -> tdd.getInputGates().get(0).getShuffleDescriptors())
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Builds the {@code v1 -> v2} execution graph with a {@code BLOCKING} result partition and
     * {@code ResultPartitionType::isBlockingOrBlockingPersistentResultPartition} as the
     * mark-partition-finished strategy.
     *
     * @return the execution job vertices of {@code v1} (f0) and {@code v2} (f1)
     */
    private Tuple2<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraphAndGetVertices(
            JobID jobId, BlobWriter blobWriter, Configuration jobMasterConfig) throws Exception {
        return setupExecutionGraphAndGetVertices(
                jobId,
                blobWriter,
                ResultPartitionType.BLOCKING,
                ResultPartitionType::isBlockingOrBlockingPersistentResultPartition,
                jobMasterConfig);
    }

    /**
     * Builds an execution graph of two job vertices {@code v1} and {@code v2}, each with {@link
     * #PARALLELISM} subtasks, connected all-to-all with {@code v1} as the input of {@code v2}.
     *
     * @param jobId id of the job graph
     * @param blobWriter blob writer handed to the execution graph builder
     * @param resultPartitionType partition type of the data set connecting the vertices
     * @param markPartitionFinishedStrategy strategy handed to the execution graph builder
     * @param jobMasterConfig job master configuration handed to the execution graph builder
     * @return the execution job vertices of {@code v1} (f0) and {@code v2} (f1)
     */
    private Tuple2<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraphAndGetVertices(
            JobID jobId,
            BlobWriter blobWriter,
            ResultPartitionType resultPartitionType,
            MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
            Configuration jobMasterConfig)
            throws Exception {
        JobVertex v1 = createJobVertex("v1", PARALLELISM);
        JobVertex v2 = createJobVertex("v2", PARALLELISM);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.ALL_TO_ALL, resultPartitionType);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
        ExecutionGraph executionGraph =
                createExecutionGraph(
                        jobId, ordered, blobWriter, markPartitionFinishedStrategy, jobMasterConfig);

        return Tuple2.of(
                executionGraph.getJobVertex(v1.getID()), executionGraph.getJobVertex(v2.getID()));
    }

    /**
     * Builds a {@code v1 -> v2} batch job connected all-to-all through a {@code HYBRID_FULL}
     * result, starts scheduling it with an adaptive batch scheduler configured with the {@code
     * ONLY_FINISHED_PRODUCERS} consume constraint, and returns the resulting execution job
     * vertices.
     *
     * @return the execution job vertices of the producer (f0) and the consumer (f1)
     */
    private Tuple2<ExecutionJobVertex, ExecutionJobVertex> buildExecutionGraph() throws Exception {
        JobVertex producer = createJobVertex("v1", PARALLELISM);
        JobVertex consumer = createJobVertex("v2", PARALLELISM);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                consumer, producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL);

        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producer, consumer);
        AdaptiveBatchScheduler scheduler =
                new DefaultSchedulerBuilder(
                                jobGraph,
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setHybridPartitionDataConsumeConstraint(
                                JobManagerOptions.HybridPartitionDataConsumeConstraint
                                        .ONLY_FINISHED_PRODUCERS)
                        .buildAdaptiveBatchJobScheduler();
        scheduler.startScheduling();

        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        return Tuple2.of(
                executionGraph.getJobVertex(producer.getID()),
                executionGraph.getJobVertex(consumer.getID()));
    }

    /** Creates a job vertex with the given name and parallelism and a placeholder invokable. */
    private static JobVertex createJobVertex(String vertexName, int parallelism) {
        JobVertex jobVertex = new JobVertex(vertexName);
        jobVertex.setParallelism(parallelism);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        return jobVertex;
    }

    /**
     * Builds a batch job graph from the given vertices and turns it into an execution graph that
     * runs on the shared test executor.
     */
    private static ExecutionGraph createExecutionGraph(
            JobID jobId,
            List<JobVertex> jobVertices,
            BlobWriter blobWriter,
            MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
            Configuration jobMasterConfig)
            throws JobException, JobExecutionException {
        JobGraph jobGraph =
                JobGraphBuilder.newBatchJobGraphBuilder()
                        .setJobId(jobId)
                        .addJobVertices(jobVertices)
                        .build();
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setJobMasterConfig(jobMasterConfig)
                .setJobGraph(jobGraph)
                .setBlobWriter(blobWriter)
                .setMarkPartitionFinishedStrategy(markPartitionFinishedStrategy)
                .build(EXECUTOR_RESOURCE.getExecutor());
    }

    /**
     * Creates the deployment descriptor for the current execution attempt of the given vertex,
     * using a fresh {@link AllocationID}, a {@code null} third argument and an empty list as the
     * fourth argument of {@code createDeploymentDescriptor}.
     */
    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev)
            throws IOException, ClusterDatasetCorruptedException {
        return ev.getExecutionGraphAccessor()
                .getTaskDeploymentDescriptorFactory()
                .createDeploymentDescriptor(
                        ev.getCurrentExecutionAttempt(),
                        new AllocationID(),
                        null,
                        Collections.emptyList());
    }
}

