/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertexTest;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link IntermediateResultPartition}: when partitions are reported as consumable, how
 * the owning {@link ConsumedPartitionGroup} tracks unfinished partitions, and how many
 * subpartitions each partition reports for different graph shapes.
 */
public class IntermediateResultPartitionTest extends TestLogger {

    /**
     * A pipelined partition is consumable once it has produced data, independently of its sibling
     * partition, and is no longer consumable after the result is reset for a new execution.
     */
    @Test
    public void testPipelinedPartitionConsumable() throws Exception {
        final IntermediateResult result = createResult(ResultPartitionType.PIPELINED, 2);
        final IntermediateResultPartition partition1 = result.getPartitions()[0];
        final IntermediateResultPartition partition2 = result.getPartitions()[1];

        // Nothing has been produced yet.
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());

        // Producing data on one partition only affects that partition.
        partition1.markDataProduced();
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());

        // Resetting the result clears the consumable state again.
        result.resetForNewExecution();
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
    }

    /**
     * A blocking partition is consumable once it is marked finished; the consumed partition group
     * reports all partitions finished only after both partitions have finished, and a reset
     * reverts everything.
     */
    @Test
    public void testBlockingPartitionConsumable() throws Exception {
        final IntermediateResult result = createResult(ResultPartitionType.BLOCKING, 2);
        final IntermediateResultPartition partition1 = result.getPartitions()[0];
        final IntermediateResultPartition partition2 = result.getPartitions()[1];
        final ConsumedPartitionGroup consumedPartitionGroup =
                partition1.getConsumedPartitionGroups().get(0);

        // Initially nothing is finished.
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(consumedPartitionGroup.areAllPartitionsFinished());

        // Only the first partition finishes.
        partition1.markFinished();
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(consumedPartitionGroup.areAllPartitionsFinished());

        // Both partitions are finished now.
        partition2.markFinished();
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertTrue(partition2.isConsumable());
        Assert.assertTrue(consumedPartitionGroup.areAllPartitionsFinished());

        // A reset makes every partition unfinished again.
        result.resetForNewExecution();
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
    }

    /**
     * Verifies that the unfinished-partition counter of the consumed partition group is restored
     * to the full partition count on every reset, both after a partial and after a complete run.
     */
    @Test
    public void testBlockingPartitionResetting() throws Exception {
        final IntermediateResult result = createResult(ResultPartitionType.BLOCKING, 2);
        final IntermediateResultPartition partition1 = result.getPartitions()[0];
        final IntermediateResultPartition partition2 = result.getPartitions()[1];
        final ConsumedPartitionGroup consumedPartitionGroup =
                partition1.getConsumedPartitionGroups().get(0);

        // Initially nothing is finished.
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());

        // Finish the first partition only.
        partition1.markFinished();
        Assert.assertEquals(1, consumedPartitionGroup.getNumberOfUnfinishedPartitions());
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(consumedPartitionGroup.areAllPartitionsFinished());

        // Reset after a partial run: the counter goes back to the full count.
        result.resetForNewExecution();
        Assert.assertEquals(2, consumedPartitionGroup.getNumberOfUnfinishedPartitions());

        // This time the second partition finishes first.
        partition2.markFinished();
        Assert.assertEquals(1, consumedPartitionGroup.getNumberOfUnfinishedPartitions());
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertTrue(partition2.isConsumable());
        Assert.assertFalse(consumedPartitionGroup.areAllPartitionsFinished());

        // Then the first one finishes as well.
        partition1.markFinished();
        Assert.assertEquals(0, consumedPartitionGroup.getNumberOfUnfinishedPartitions());
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertTrue(partition2.isConsumable());
        Assert.assertTrue(consumedPartitionGroup.areAllPartitionsFinished());

        // Reset after a complete run.
        result.resetForNewExecution();
        Assert.assertEquals(2, consumedPartitionGroup.getNumberOfUnfinishedPartitions());
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
    }

    @Test
    public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception {
        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7));
    }

    @Test
    public void testGetNumberOfSubpartitionsForNonDynamicPointwiseGraph() throws Exception {
        testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false, Arrays.asList(4, 3));
    }

    @Test
    public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
            throws Exception {
        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(7, 7));
    }

    @Test
    public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointwiseGraph()
            throws Exception {
        testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true, Arrays.asList(4, 4));
    }

    @Test
    public void testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicAllToAllGraph()
            throws Exception {
        // -1 leaves the consumer parallelism unset (see createExecutionGraph).
        testGetNumberOfSubpartitions(
                -1, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(13, 13));
    }

    @Test
    public void testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicPointwiseGraph()
            throws Exception {
        // -1 leaves the consumer parallelism unset (see createExecutionGraph).
        testGetNumberOfSubpartitions(-1, DistributionPattern.POINTWISE, true, Arrays.asList(7, 7));
    }

    /**
     * Builds a producer/consumer graph with producer parallelism 2 and consumer max parallelism
     * 13, then checks the number of subpartitions reported by each produced partition.
     *
     * @param consumerParallelism parallelism of the consumer vertex; a non-positive value leaves
     *     it unset
     * @param distributionPattern how the consumer is connected to the producer
     * @param isDynamicGraph whether a dynamic execution graph is built; if so, the producer vertex
     *     is explicitly initialized before inspection
     * @param expectedNumSubpartitions expected subpartition count per producer partition, in
     *     partition order
     */
    private void testGetNumberOfSubpartitions(
            int consumerParallelism,
            DistributionPattern distributionPattern,
            boolean isDynamicGraph,
            List<Integer> expectedNumSubpartitions)
            throws Exception {
        final int producerParallelism = 2;
        final int consumerMaxParallelism = 13;

        final ExecutionGraph eg =
                createExecutionGraph(
                        producerParallelism,
                        consumerParallelism,
                        consumerMaxParallelism,
                        distributionPattern,
                        isDynamicGraph);

        // The producer is the first vertex in topological order.
        final Iterator<ExecutionJobVertex> vertexIterator =
                eg.getVerticesTopologically().iterator();
        final ExecutionJobVertex producer = vertexIterator.next();

        if (isDynamicGraph) {
            ExecutionJobVertexTest.initializeVertex(producer);
        }

        final IntermediateResult result = producer.getProducedDataSets()[0];

        // Sanity check on the test input: one expectation per producer partition.
        MatcherAssert.assertThat(
                expectedNumSubpartitions.size(), CoreMatchers.is(producerParallelism));
        MatcherAssert.assertThat(
                Arrays.stream(result.getPartitions())
                        .map(IntermediateResultPartition::getNumberOfSubpartitions)
                        .collect(Collectors.toList()),
                Matchers.equalTo(expectedNumSubpartitions));
    }

    /**
     * Creates an execution graph for a batch job of two vertices, "v1" feeding "v2" through a
     * blocking data set.
     *
     * @param producerParallelism parallelism of the producer vertex "v1"
     * @param consumerParallelism parallelism of the consumer vertex "v2"; only applied when
     *     positive
     * @param consumerMaxParallelism max parallelism of "v2"; only applied to the vertex when
     *     positive, and always passed on as the default max parallelism for the parallelism store
     * @param distributionPattern connection pattern between producer and consumer
     * @param isDynamicGraph whether to build a dynamic graph instead of a regular one
     * @return the built execution graph
     */
    public static ExecutionGraph createExecutionGraph(
            int producerParallelism,
            int consumerParallelism,
            int consumerMaxParallelism,
            DistributionPattern distributionPattern,
            boolean isDynamicGraph)
            throws Exception {
        final JobVertex v1 = new JobVertex("v1");
        v1.setInvokableClass(NoOpInvokable.class);
        v1.setParallelism(producerParallelism);

        final JobVertex v2 = new JobVertex("v2");
        v2.setInvokableClass(NoOpInvokable.class);
        if (consumerParallelism > 0) {
            v2.setParallelism(consumerParallelism);
        }
        if (consumerMaxParallelism > 0) {
            v2.setMaxParallelism(consumerMaxParallelism);
        }

        v2.connectNewDataSetAsInput(v1, distributionPattern, ResultPartitionType.BLOCKING);

        final JobGraph jobGraph =
                JobGraphBuilder.newBatchJobGraphBuilder()
                        .addJobVertices(Arrays.asList(v1, v2))
                        .build();

        final Configuration configuration = new Configuration();

        final TestingDefaultExecutionGraphBuilder builder =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(jobGraph)
                        .setJobMasterConfig(configuration)
                        .setVertexParallelismStore(
                                computeVertexParallelismStoreConsideringDynamicGraph(
                                        jobGraph.getVertices(),
                                        isDynamicGraph,
                                        consumerMaxParallelism));

        if (isDynamicGraph) {
            return builder.buildDynamicGraph();
        }
        return builder.build();
    }

    /**
     * Computes the vertex parallelism store, delegating to {@link AdaptiveBatchScheduler} for
     * dynamic graphs and to {@link SchedulerBase} otherwise.
     *
     * @param vertices the job vertices to compute the store for
     * @param isDynamicGraph selects which of the two computations is used
     * @param defaultMaxParallelism default max parallelism; only used for dynamic graphs
     * @return the computed parallelism store
     */
    public static VertexParallelismStore computeVertexParallelismStoreConsideringDynamicGraph(
            Iterable<JobVertex> vertices, boolean isDynamicGraph, int defaultMaxParallelism) {
        if (isDynamicGraph) {
            return AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
                    vertices, defaultMaxParallelism);
        }
        return SchedulerBase.computeVertexParallelismStore(vertices);
    }

    /**
     * Builds a scheduler for a two-vertex batch job (source "v1" connected all-to-all to sink
     * "v2") and returns the first data set produced by the source.
     *
     * @param resultPartitionType partition type of the data set between source and sink
     * @param parallelism parallelism used for both vertices
     * @return the first intermediate result produced by the source vertex
     */
    private static IntermediateResult createResult(
            ResultPartitionType resultPartitionType, int parallelism) throws Exception {
        final JobVertex source = new JobVertex("v1");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(parallelism);

        final JobVertex sink = new JobVertex("v2");
        sink.setInvokableClass(NoOpInvokable.class);
        sink.setParallelism(parallelism);

        sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, resultPartitionType);

        // The same executor instance is used as both IO executor and future executor.
        final DirectScheduledExecutorService executorService =
                new DirectScheduledExecutorService();

        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
        final DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setIoExecutor(executorService)
                        .setFutureExecutor(executorService)
                        .build();

        final ExecutionJobVertex ejv = scheduler.getExecutionJobVertex(source.getID());
        return ejv.getProducedDataSets()[0];
    }
}

