/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class ForwardGroupComputeUtilTest
extends TestLogger {
    @Test
    public void testIsolatedVertices() throws Exception {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        Set<ForwardGroup> groups = ForwardGroupComputeUtilTest.computeForwardGroups(v1, v2, v3);
        ForwardGroupComputeUtilTest.checkGroupSize(groups, 0, new int[0]);
    }

    @Test
    public void testVariousResultPartitionTypesBetweenVertices() throws Exception {
        this.testThreeVerticesConnectSequentially(false, true, 1, 2);
        this.testThreeVerticesConnectSequentially(false, false, 0, new int[0]);
        this.testThreeVerticesConnectSequentially(true, true, 1, 3);
    }

    private void testThreeVerticesConnectSequentially(boolean isForward1, boolean isForward2, int numOfGroups, int ... groupSizes) throws Exception {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        if (isForward1) {
            ((IntermediateDataSet)v1.getProducedDataSets().get(0)).getConsumer().setForward(true);
        }
        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        if (isForward2) {
            ((IntermediateDataSet)v2.getProducedDataSets().get(0)).getConsumer().setForward(true);
        }
        Set<ForwardGroup> groups = ForwardGroupComputeUtilTest.computeForwardGroups(v1, v2, v3);
        ForwardGroupComputeUtilTest.checkGroupSize(groups, numOfGroups, groupSizes);
    }

    @Test
    public void testTwoInputsMergesIntoOne() throws Exception {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        JobVertex v4 = new JobVertex("v4");
        v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        ((IntermediateDataSet)v1.getProducedDataSets().get(0)).getConsumer().setForward(true);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        ((IntermediateDataSet)v2.getProducedDataSets().get(0)).getConsumer().setForward(true);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        Set<ForwardGroup> groups = ForwardGroupComputeUtilTest.computeForwardGroups(v1, v2, v3, v4);
        ForwardGroupComputeUtilTest.checkGroupSize(groups, 1, 3);
    }

    @Test
    public void testOneInputSplitsIntoTwo() throws Exception {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        JobVertex v4 = new JobVertex("v4");
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        ((IntermediateDataSet)v2.getProducedDataSets().get(0)).getConsumer().setForward(true);
        ((IntermediateDataSet)v2.getProducedDataSets().get(1)).getConsumer().setForward(true);
        Set<ForwardGroup> groups = ForwardGroupComputeUtilTest.computeForwardGroups(v1, v2, v3, v4);
        ForwardGroupComputeUtilTest.checkGroupSize(groups, 1, 3);
    }

    private static Set<ForwardGroup> computeForwardGroups(JobVertex ... vertices) throws Exception {
        Arrays.asList(vertices).forEach(vertex -> vertex.setInvokableClass(NoOpInvokable.class));
        DefaultExecutionGraph executionGraph = ForwardGroupComputeUtilTest.createDynamicGraph(vertices);
        return new HashSet<ForwardGroup>(ForwardGroupComputeUtil.computeForwardGroups(Arrays.asList(vertices), arg_0 -> ((ExecutionGraph)executionGraph).getJobVertex(arg_0)).values());
    }

    private static void checkGroupSize(Set<ForwardGroup> groups, int numOfGroups, int ... sizes) {
        Assert.assertEquals((long)numOfGroups, (long)groups.size());
        Matchers.containsInAnyOrder((Object[])new Object[]{groups.stream().map(ForwardGroup::size).collect(Collectors.toList()), sizes});
    }

    private static DefaultExecutionGraph createDynamicGraph(JobVertex ... vertices) throws Exception {
        TestingDefaultExecutionGraphBuilder builder = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(new JobGraph(new JobID(), "TestJob", vertices)).setVertexParallelismStore(AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(Arrays.asList(vertices), (int)10));
        return builder.buildDynamicGraph();
    }
}

