/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexInputInfoComputerTestUtil;
import org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link AllToAllVertexInputInfoComputer}.
 *
 * <p>Every test builds one or more lists of {@link BlockingInputInfo}, runs
 * {@code compute(new JobVertexID(), inputInfos, int, int, int, long)} and verifies the returned
 * vertex inputs with the helpers in {@link VertexInputInfoComputerTestUtil}.
 *
 * <p>NOTE(review): the three {@code int} arguments of {@code compute} are presumably the
 * parallelism, the minimum parallelism and the maximum parallelism, and the trailing {@code long}
 * the data volume per task - confirm against {@link AllToAllVertexInputInfoComputer}.
 */
class AllToAllVertexInputInfoComputerTest {

    /** Runs the intra-input-key-correlated scenario with 1 and with 10 input infos per type. */
    @Test
    void testComputeInputsWithIntraInputKeyCorrelation() {
        testComputeInputsWithIntraInputKeyCorrelation(1);
        testComputeInputsWithIntraInputKeyCorrelation(10);
    }

    /**
     * Two input types, both created with the intra- and inter-input correlation flags set and
     * without any skewed subpartition. Expects 3 subtasks, each consuming {@code 10 *
     * numInputInfos} from either input type.
     *
     * @param numInputInfos number of input infos created per input type
     */
    void testComputeInputsWithIntraInputKeyCorrelation(int numInputInfos) {
        AllToAllVertexInputInfoComputer computer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> leftInputInfos =
                createBlockingInputInfos(1, numInputInfos, 10, true, true, List.of());
        List<BlockingInputInfo> rightInputInfos =
                createBlockingInputInfos(2, numInputInfos, 10, true, true, List.of());
        List<BlockingInputInfo> inputInfos = new ArrayList<>(leftInputInfos);
        inputInfos.addAll(rightInputInfos);

        var vertexInputs = computer.compute(new JobVertexID(), inputInfos, 10, 1, 10, 10L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs(
                vertexInputs, inputInfos, 3, 3);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(
                expectedOneIndexPerSubtaskGroups(), inputInfos, vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 10L, 10L, 10L), leftInputInfos, vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 10L, 10L, 10L), rightInputInfos, vertexInputs);
    }

    /**
     * Mixes inputs of the same type that were created with different {@code numPartitions} (2 and
     * 3) with a second input type (2 partitions). Expects 2 subtasks whose consumed groups cover
     * the full range of each input (0-1 or 0-2) mapped to the ranges 0-1 and 2-2.
     */
    @Test
    void testInputsUnionWithDifferentNumPartitions() {
        AllToAllVertexInputInfoComputer computer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> leftInputInfos1 =
                createBlockingInputInfos(1, 1, 2, true, true, List.of());
        List<BlockingInputInfo> leftInputInfos2 =
                createBlockingInputInfos(1, 1, 3, true, true, List.of());
        List<BlockingInputInfo> rightInputInfos =
                createBlockingInputInfos(2, 1, 2, true, true, List.of());
        List<BlockingInputInfo> inputInfos = new ArrayList<>(leftInputInfos1);
        inputInfos.addAll(leftInputInfos2);
        inputInfos.addAll(rightInputInfos);

        var vertexInputs = computer.compute(new JobVertexID(), inputInfos, 2, 1, 2, 10L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs(
                vertexInputs, inputInfos, 2, 3);

        List<Map<IndexRange, IndexRange>> left1TargetConsumedSubpartitionGroups =
                List.of(
                        Map.of(new IndexRange(0, 1), new IndexRange(0, 1)),
                        Map.of(new IndexRange(0, 1), new IndexRange(2, 2)));
        List<Map<IndexRange, IndexRange>> left2TargetConsumedSubpartitionGroups =
                List.of(
                        Map.of(new IndexRange(0, 2), new IndexRange(0, 1)),
                        Map.of(new IndexRange(0, 2), new IndexRange(2, 2)));
        List<Map<IndexRange, IndexRange>> rightTargetConsumedSubpartitionGroups =
                List.of(
                        Map.of(new IndexRange(0, 1), new IndexRange(0, 1)),
                        Map.of(new IndexRange(0, 1), new IndexRange(2, 2)));

        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(
                left1TargetConsumedSubpartitionGroups, leftInputInfos1, vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(
                left2TargetConsumedSubpartitionGroups, leftInputInfos2, vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(
                rightTargetConsumedSubpartitionGroups, rightInputInfos, vertexInputs);
    }

    /** Runs the skewed, intra-input-key-correlated scenario with 1 and with 10 input infos. */
    @Test
    void testComputeSkewedInputWithIntraInputKeyCorrelation() {
        testComputeSkewedInputWithIntraInputKeyCorrelation(1);
        testComputeSkewedInputWithIntraInputKeyCorrelation(10);
    }

    /**
     * The left input is created with skewed subpartition index 0 and with the intra-input
     * correlation flag set. Expects the same 3 subtasks and consumed groups as the non-skewed
     * scenario; the first subtask consumes {@code 100 * numInputInfos} from the left input.
     *
     * @param numInputInfos number of input infos created per input type
     */
    void testComputeSkewedInputWithIntraInputKeyCorrelation(int numInputInfos) {
        AllToAllVertexInputInfoComputer computer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> leftInputInfos =
                createBlockingInputInfos(1, numInputInfos, 10, true, true, List.of(0));
        List<BlockingInputInfo> rightInputInfos =
                createBlockingInputInfos(2, numInputInfos, 10, true, true, List.of());
        List<BlockingInputInfo> inputInfos = new ArrayList<>(leftInputInfos);
        inputInfos.addAll(rightInputInfos);

        var vertexInputs = computer.compute(new JobVertexID(), inputInfos, 10, 1, 10, 10L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs(
                vertexInputs, inputInfos, 3, 3);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(
                expectedOneIndexPerSubtaskGroups(), inputInfos, vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 100L, 10L, 10L), leftInputInfos, vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 10L, 10L, 10L), rightInputInfos, vertexInputs);
    }

    /** Runs the skewed, non-intra-correlated scenario with 1 and with 10 input infos. */
    @Test
    void testComputeSkewedInputWithoutIntraInputKeyCorrelation() {
        testComputeSkewedInputWithoutIntraInputKeyCorrelation(1);
        testComputeSkewedInputWithoutIntraInputKeyCorrelation(10);
    }

    /**
     * The left input is created with skewed subpartition index 0 and with the intra-input
     * correlation flag cleared. Expects 7 subtasks: the first five consume {@code 20 *
     * numInputInfos} from the left input, the last two {@code 10 * numInputInfos}; every subtask
     * consumes {@code 10 * numInputInfos} from the right input.
     *
     * @param numInputInfos number of input infos created per input type
     */
    void testComputeSkewedInputWithoutIntraInputKeyCorrelation(int numInputInfos) {
        AllToAllVertexInputInfoComputer computer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> leftInputInfos =
                createBlockingInputInfos(1, numInputInfos, 10, false, true, List.of(0));
        List<BlockingInputInfo> rightInputInfos =
                createBlockingInputInfos(2, numInputInfos, 10, true, true, List.of());
        List<BlockingInputInfo> inputInfos = new ArrayList<>(leftInputInfos);
        inputInfos.addAll(rightInputInfos);

        var vertexInputs = computer.compute(new JobVertexID(), inputInfos, 10, 1, 10, 10L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs(
                vertexInputs, inputInfos, 7, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 20L, 20L, 20L, 20L, 20L, 10L, 10L),
                leftInputInfos,
                vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 10L, 10L, 10L, 10L, 10L, 10L, 10L),
                rightInputInfos,
                vertexInputs);
    }

    /** Runs the scenario with both inputs skewed, with 1 and with 10 input infos per type. */
    @Test
    void testComputeMultipleSkewedInputsWithoutIntraInputKeyCorrelation() {
        testComputeMultipleSkewedInputsWithoutIntraInputKeyCorrelation(1);
        testComputeMultipleSkewedInputsWithoutIntraInputKeyCorrelation(10);
    }

    /**
     * Both input types are created with 2 partitions, skewed subpartition index 1 and the
     * intra-input correlation flag cleared. The same inputs are computed four times with
     * different {@code int} arguments, and the expected number of subtasks (2, 1, 4, 5) and the
     * per-subtask data volumes are verified for each run.
     *
     * @param numInputInfos number of input infos created per input type
     */
    void testComputeMultipleSkewedInputsWithoutIntraInputKeyCorrelation(int numInputInfos) {
        AllToAllVertexInputInfoComputer computer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> leftInputInfos =
                createBlockingInputInfos(1, numInputInfos, 2, false, true, List.of(1));
        List<BlockingInputInfo> rightInputInfos =
                createBlockingInputInfos(2, numInputInfos, 2, false, true, List.of(1));
        List<BlockingInputInfo> inputInfos = new ArrayList<>(leftInputInfos);
        inputInfos.addAll(rightInputInfos);

        // compute(..., 1, 1, 2, ...): two subtasks expected.
        var vertexInputsWith2Subtasks =
                computer.compute(new JobVertexID(), inputInfos, 1, 1, 2, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs(
                vertexInputsWith2Subtasks, inputInfos, 2, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 12L, 12L), leftInputInfos, vertexInputsWith2Subtasks);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 22L, 22L), rightInputInfos, vertexInputsWith2Subtasks);

        // compute(..., 1, 1, 1, ...): a single subtask expected.
        var vertexInputsWith1Subtask =
                computer.compute(new JobVertexID(), inputInfos, 1, 1, 1, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs(
                vertexInputsWith1Subtask, inputInfos, 1, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 24L), leftInputInfos, vertexInputsWith1Subtask);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 24L), rightInputInfos, vertexInputsWith1Subtask);

        // compute(..., 1, 1, 5, ...): four subtasks expected.
        var vertexInputsWith4Subtasks =
                computer.compute(new JobVertexID(), inputInfos, 1, 1, 5, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs(
                vertexInputsWith4Subtasks, inputInfos, 4, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 12L, 10L, 10L, 12L),
                leftInputInfos,
                vertexInputsWith4Subtasks);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 12L, 10L, 10L, 12L),
                rightInputInfos,
                vertexInputsWith4Subtasks);

        // compute(..., 5, 5, 5, ...): five subtasks expected.
        var vertexInputsWith5Subtasks =
                computer.compute(new JobVertexID(), inputInfos, 5, 5, 5, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs(
                vertexInputsWith5Subtasks, inputInfos, 5, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 2L, 10L, 10L, 10L, 12L),
                leftInputInfos,
                vertexInputsWith5Subtasks);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                scaled(numInputInfos, 2L, 10L, 10L, 10L, 12L),
                rightInputInfos,
                vertexInputsWith5Subtasks);
    }

    /**
     * Skewed inputs (subpartition index 1, intra-input correlation flag cleared) where the left
     * type combines an input with 2 partitions and one with 3 partitions. Expects 2 subtasks
     * consuming 55 each from the left inputs and 12 each from the right input.
     */
    @Test
    void testComputeSkewedInputsWithDifferentNumPartitions() {
        AllToAllVertexInputInfoComputer computer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> leftInputInfos = new ArrayList<>();
        leftInputInfos.addAll(createBlockingInputInfos(1, 1, 2, false, true, List.of(1)));
        leftInputInfos.addAll(createBlockingInputInfos(1, 1, 3, false, true, List.of(1)));
        List<BlockingInputInfo> rightInputInfos =
                createBlockingInputInfos(2, 1, 2, false, true, List.of(1));
        List<BlockingInputInfo> inputInfos = new ArrayList<>(leftInputInfos);
        inputInfos.addAll(rightInputInfos);

        var vertexInputs = computer.compute(new JobVertexID(), inputInfos, 2, 1, 2, 10L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs(
                vertexInputs, inputInfos, 2, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {55L, 55L}, leftInputInfos, vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {12L, 12L}, rightInputInfos, vertexInputs);
    }

    /**
     * Single inputs created with the inter-input correlation flag cleared, once with and once
     * without the intra-input correlation flag. Verifies the expected subtask count and the
     * per-subtask data volume for three different {@code compute} argument combinations.
     */
    @Test
    void testComputeRebalancedWithAndWithoutCorrelations() {
        AllToAllVertexInputInfoComputer computer = createAllToAllVertexInputInfoComputer();

        List<BlockingInputInfo> inputInfoWithCorrelations =
                createBlockingInputInfos(1, 1, 10, true, false, List.of());
        var vertexInputs1 =
                computer.compute(new JobVertexID(), inputInfoWithCorrelations, 2, 1, 5, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs1, inputInfoWithCorrelations.get(0), 2);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {20L, 10L}, inputInfoWithCorrelations, vertexInputs1);

        List<BlockingInputInfo> inputInfoWithoutCorrelations =
                createBlockingInputInfos(1, 1, 10, false, false, List.of());
        var vertexInputs2 =
                computer.compute(new JobVertexID(), inputInfoWithoutCorrelations, 2, 1, 5, 1L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs2, inputInfoWithoutCorrelations.get(0), 2);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {15L, 15L}, inputInfoWithoutCorrelations, vertexInputs2);

        var vertexInputs3 =
                computer.compute(new JobVertexID(), inputInfoWithoutCorrelations, 3, 1, 5, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs3, inputInfoWithoutCorrelations.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {10L, 10L, 10L}, inputInfoWithoutCorrelations, vertexInputs3);
    }

    /**
     * Combines an input created with both correlation flags cleared and an input created with
     * both flags set. Expects 3 subtasks, each consuming 10 from either input.
     */
    @Test
    void testComputeInputsWithDifferentCorrelations() {
        AllToAllVertexInputInfoComputer computer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> rebalancedInputInfos =
                createBlockingInputInfos(1, 1, 10, false, false, List.of());
        List<BlockingInputInfo> normalInputInfos =
                createBlockingInputInfos(1, 1, 10, true, true, List.of());
        List<BlockingInputInfo> inputInfos = new ArrayList<>(rebalancedInputInfos);
        inputInfos.addAll(normalInputInfos);

        var vertexInputs = computer.compute(new JobVertexID(), inputInfos, 10, 1, 10, 10L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs, normalInputInfos.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {10L, 10L, 10L}, normalInputInfos, vertexInputs);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs, rebalancedInputInfos.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {10L, 10L, 10L}, rebalancedInputInfos, vertexInputs);
    }

    /**
     * Passes 100 as the trailing {@code long} argument of {@code compute} for a single input with
     * 10 partitions. Expects a single subtask that consumes 30.
     */
    @Test
    void testComputeWithLargeDataVolumePerTask() {
        AllToAllVertexInputInfoComputer computer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> inputInfos =
                createBlockingInputInfos(1, 1, 10, true, true, List.of());

        var vertexInputs = computer.compute(new JobVertexID(), inputInfos, 10, 1, 10, 100L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs, inputInfos.get(0), 1);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {30L}, inputInfos, vertexInputs);
    }

    /**
     * Creates input infos by delegating to {@link
     * VertexInputInfoComputerTestUtil#createBlockingInputInfos}, forwarding the six arguments of
     * this method and fixing the remaining ones to {@code 3}, {@code 1}, {@code 10.0}, an empty
     * list and {@code false}.
     *
     * <p>NOTE(review): the fixed {@code 3} is presumably the number of subpartitions (the tests
     * pass 3 as last argument of {@code checkCorrectnessForCorrelatedInputs}) - confirm against
     * the utility's signature.
     *
     * @param typeNumber forwarded as the input type number
     * @param numInputInfos forwarded as the number of input infos to create
     * @param numPartitions forwarded as the number of partitions
     * @param isIntraInputKeyCorrelated forwarded intra-input key correlation flag
     * @param areInterInputsKeysCorrelated forwarded inter-input key correlation flag
     * @param skewedSubpartitionIndex forwarded list of skewed subpartition indices
     * @return the list returned by the test utility
     */
    public static List<BlockingInputInfo> createBlockingInputInfos(
            int typeNumber,
            int numInputInfos,
            int numPartitions,
            boolean isIntraInputKeyCorrelated,
            boolean areInterInputsKeysCorrelated,
            List<Integer> skewedSubpartitionIndex) {
        return VertexInputInfoComputerTestUtil.createBlockingInputInfos(
                typeNumber,
                numInputInfos,
                numPartitions,
                3,
                isIntraInputKeyCorrelated,
                areInterInputsKeysCorrelated,
                1,
                10.0,
                List.of(),
                skewedSubpartitionIndex,
                false);
    }

    /**
     * Creates the computer under test with the constructor arguments {@code 4.0} and {@code 10}.
     *
     * <p>NOTE(review): these are presumably the skew factor and the skew threshold - confirm
     * against the {@link AllToAllVertexInputInfoComputer} constructor.
     */
    private static AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer() {
        return new AllToAllVertexInputInfoComputer(4.0, 10L);
    }

    /**
     * Expected consumed groups for three subtasks: subtask {@code k} maps the index range 0-9 to
     * the single-element range {@code k-k}.
     *
     * <p>NOTE(review): keys presumably are partition ranges and values subpartition ranges -
     * confirm against {@code checkConsumedSubpartitionGroups}.
     */
    private static List<Map<IndexRange, IndexRange>> expectedOneIndexPerSubtaskGroups() {
        return List.of(
                Map.of(new IndexRange(0, 9), new IndexRange(0, 0)),
                Map.of(new IndexRange(0, 9), new IndexRange(1, 1)),
                Map.of(new IndexRange(0, 9), new IndexRange(2, 2)));
    }

    /**
     * Builds an expected-volume array by multiplying each given per-input volume by the number of
     * input infos.
     *
     * @param numInputInfos multiplier applied to every element
     * @param volumesPerInput expected volume per subtask for a single input info
     * @return a new array of the same length holding the multiplied values
     */
    private static long[] scaled(int numInputInfos, long... volumesPerInput) {
        long[] totals = new long[volumesPerInput.length];
        for (int i = 0; i < volumesPerInput.length; i++) {
            totals[i] = volumesPerInput[i] * (long) numInputInfos;
        }
        return totals;
    }
}

