package org.apache.flink.runtime.scheduler.adaptivebatch.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexInputInfoComputerTestUtil;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/util/AllToAllVertexInputInfoComputerTest.class */
class AllToAllVertexInputInfoComputerTest {
    AllToAllVertexInputInfoComputerTest() {
    }

    @Test
    void testComputeInputsWithIntraInputKeyCorrelation() {
        testComputeInputsWithIntraInputKeyCorrelation(1);
        testComputeInputsWithIntraInputKeyCorrelation(10);
    }

    void testComputeInputsWithIntraInputKeyCorrelation(int i) {
        AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer = createAllToAllVertexInputInfoComputer();
        ArrayList arrayList = new ArrayList();
        List<BlockingInputInfo> createBlockingInputInfos = createBlockingInputInfos(1, i, 10, true, true, List.of());
        List<BlockingInputInfo> createBlockingInputInfos2 = createBlockingInputInfos(2, i, 10, true, true, List.of());
        arrayList.addAll(createBlockingInputInfos);
        arrayList.addAll(createBlockingInputInfos2);
        Map compute = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 10, 1, 10, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs((Map<IntermediateDataSetID, JobVertexInputInfo>) compute, arrayList, 3, 3);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(List.of(Map.of(new IndexRange(0, 9), new IndexRange(0, 0)), Map.of(new IndexRange(0, 9), new IndexRange(1, 1)), Map.of(new IndexRange(0, 9), new IndexRange(2, 2))), arrayList, compute);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{10 * i, 10 * i, 10 * i}, createBlockingInputInfos, compute);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{10 * i, 10 * i, 10 * i}, createBlockingInputInfos2, compute);
    }

    @Test
    void testInputsUnionWithDifferentNumPartitions() {
        AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer = createAllToAllVertexInputInfoComputer();
        ArrayList arrayList = new ArrayList();
        List<BlockingInputInfo> createBlockingInputInfos = createBlockingInputInfos(1, 1, 2, true, true, List.of());
        List<BlockingInputInfo> createBlockingInputInfos2 = createBlockingInputInfos(1, 1, 3, true, true, List.of());
        List<BlockingInputInfo> createBlockingInputInfos3 = createBlockingInputInfos(2, 1, 2, true, true, List.of());
        arrayList.addAll(createBlockingInputInfos);
        arrayList.addAll(createBlockingInputInfos2);
        arrayList.addAll(createBlockingInputInfos3);
        Map compute = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 2, 1, 2, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs((Map<IntermediateDataSetID, JobVertexInputInfo>) compute, arrayList, 2, 3);
        List of = List.of(Map.of(new IndexRange(0, 1), new IndexRange(0, 1)), Map.of(new IndexRange(0, 1), new IndexRange(2, 2)));
        List of2 = List.of(Map.of(new IndexRange(0, 2), new IndexRange(0, 1)), Map.of(new IndexRange(0, 2), new IndexRange(2, 2)));
        List of3 = List.of(Map.of(new IndexRange(0, 1), new IndexRange(0, 1)), Map.of(new IndexRange(0, 1), new IndexRange(2, 2)));
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(of, createBlockingInputInfos, compute);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(of2, createBlockingInputInfos2, compute);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(of3, createBlockingInputInfos3, compute);
    }

    @Test
    void testComputeSkewedInputWithIntraInputKeyCorrelation() {
        testComputeSkewedInputWithIntraInputKeyCorrelation(1);
        testComputeSkewedInputWithIntraInputKeyCorrelation(10);
    }

    void testComputeSkewedInputWithIntraInputKeyCorrelation(int i) {
        AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer = createAllToAllVertexInputInfoComputer();
        ArrayList arrayList = new ArrayList();
        List<BlockingInputInfo> createBlockingInputInfos = createBlockingInputInfos(1, i, 10, true, true, List.of(0));
        List<BlockingInputInfo> createBlockingInputInfos2 = createBlockingInputInfos(2, i, 10, true, true, List.of());
        arrayList.addAll(createBlockingInputInfos);
        arrayList.addAll(createBlockingInputInfos2);
        Map compute = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 10, 1, 10, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs((Map<IntermediateDataSetID, JobVertexInputInfo>) compute, arrayList, 3, 3);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(List.of(Map.of(new IndexRange(0, 9), new IndexRange(0, 0)), Map.of(new IndexRange(0, 9), new IndexRange(1, 1)), Map.of(new IndexRange(0, 9), new IndexRange(2, 2))), arrayList, compute);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{100 * i, 10 * i, 10 * i}, createBlockingInputInfos, compute);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{10 * i, 10 * i, 10 * i}, createBlockingInputInfos2, compute);
    }

    @Test
    void testComputeSkewedInputWithoutIntraInputKeyCorrelation() {
        testComputeSkewedInputWithoutIntraInputKeyCorrelation(1);
        testComputeSkewedInputWithoutIntraInputKeyCorrelation(10);
    }

    void testComputeSkewedInputWithoutIntraInputKeyCorrelation(int i) {
        AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer = createAllToAllVertexInputInfoComputer();
        ArrayList arrayList = new ArrayList();
        List<BlockingInputInfo> createBlockingInputInfos = createBlockingInputInfos(1, i, 10, false, true, List.of(0));
        List<BlockingInputInfo> createBlockingInputInfos2 = createBlockingInputInfos(2, i, 10, true, true, List.of());
        arrayList.addAll(createBlockingInputInfos);
        arrayList.addAll(createBlockingInputInfos2);
        Map compute = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 10, 1, 10, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs((Map<IntermediateDataSetID, JobVertexInputInfo>) compute, arrayList, 7, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{20 * i, 20 * i, 20 * i, 20 * i, 20 * i, 10 * i, 10 * i}, createBlockingInputInfos, compute);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{10 * i, 10 * i, 10 * i, 10 * i, 10 * i, 10 * i, 10 * i}, createBlockingInputInfos2, compute);
    }

    @Test
    void testComputeMultipleSkewedInputsWithoutIntraInputKeyCorrelation() {
        testComputeMultipleSkewedInputsWithoutIntraInputKeyCorrelation(1);
        testComputeMultipleSkewedInputsWithoutIntraInputKeyCorrelation(10);
    }

    void testComputeMultipleSkewedInputsWithoutIntraInputKeyCorrelation(int i) {
        AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer = createAllToAllVertexInputInfoComputer();
        ArrayList arrayList = new ArrayList();
        List<BlockingInputInfo> createBlockingInputInfos = createBlockingInputInfos(1, i, 2, false, true, List.of(1));
        List<BlockingInputInfo> createBlockingInputInfos2 = createBlockingInputInfos(2, i, 2, false, true, List.of(1));
        arrayList.addAll(createBlockingInputInfos);
        arrayList.addAll(createBlockingInputInfos2);
        Map compute = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 1, 1, 2, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs((Map<IntermediateDataSetID, JobVertexInputInfo>) compute, arrayList, 2, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{12 * i, 12 * i}, createBlockingInputInfos, compute);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{22 * i, 22 * i}, createBlockingInputInfos2, compute);
        Map compute2 = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 1, 1, 1, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs((Map<IntermediateDataSetID, JobVertexInputInfo>) compute2, arrayList, 1, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{24 * i}, createBlockingInputInfos, compute2);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{24 * i}, createBlockingInputInfos2, compute2);
        Map compute3 = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 1, 1, 5, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs((Map<IntermediateDataSetID, JobVertexInputInfo>) compute3, arrayList, 4, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{12 * i, 10 * i, 10 * i, 12 * i}, createBlockingInputInfos, compute3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{12 * i, 10 * i, 10 * i, 12 * i}, createBlockingInputInfos2, compute3);
        Map compute4 = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 5, 5, 5, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs((Map<IntermediateDataSetID, JobVertexInputInfo>) compute4, arrayList, 5, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{2 * i, 10 * i, 10 * i, 10 * i, 12 * i}, createBlockingInputInfos, compute4);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{2 * i, 10 * i, 10 * i, 10 * i, 12 * i}, createBlockingInputInfos2, compute4);
    }

    @Test
    void testComputeSkewedInputsWithDifferentNumPartitions() {
        AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer = createAllToAllVertexInputInfoComputer();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.addAll(createBlockingInputInfos(1, 1, 2, false, true, List.of(1)));
        arrayList2.addAll(createBlockingInputInfos(1, 1, 3, false, true, List.of(1)));
        List<BlockingInputInfo> createBlockingInputInfos = createBlockingInputInfos(2, 1, 2, false, true, List.of(1));
        arrayList.addAll(arrayList2);
        arrayList.addAll(createBlockingInputInfos);
        Map compute = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 2, 1, 2, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForCorrelatedInputs((Map<IntermediateDataSetID, JobVertexInputInfo>) compute, arrayList, 2, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{55, 55}, arrayList2, compute);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{12, 12}, createBlockingInputInfos, compute);
    }

    @Test
    void testComputeRebalancedWithAndWithoutCorrelations() {
        AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> createBlockingInputInfos = createBlockingInputInfos(1, 1, 10, true, false, List.of());
        Map compute = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), createBlockingInputInfos, 2, 1, 5, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(compute, createBlockingInputInfos.get(0), 2);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{20, 10}, createBlockingInputInfos, compute);
        List<BlockingInputInfo> createBlockingInputInfos2 = createBlockingInputInfos(1, 1, 10, false, false, List.of());
        Map compute2 = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), createBlockingInputInfos2, 2, 1, 5, 1L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(compute2, createBlockingInputInfos2.get(0), 2);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{15, 15}, createBlockingInputInfos2, compute2);
        Map compute3 = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), createBlockingInputInfos2, 3, 1, 5, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(compute3, createBlockingInputInfos2.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{10, 10, 10}, createBlockingInputInfos2, compute3);
    }

    @Test
    void testComputeInputsWithDifferentCorrelations() {
        AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer = createAllToAllVertexInputInfoComputer();
        ArrayList arrayList = new ArrayList();
        List<BlockingInputInfo> createBlockingInputInfos = createBlockingInputInfos(1, 1, 10, false, false, List.of());
        List<BlockingInputInfo> createBlockingInputInfos2 = createBlockingInputInfos(1, 1, 10, true, true, List.of());
        arrayList.addAll(createBlockingInputInfos);
        arrayList.addAll(createBlockingInputInfos2);
        Map compute = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), arrayList, 10, 1, 10, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(compute, createBlockingInputInfos2.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{10, 10, 10}, createBlockingInputInfos2, compute);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(compute, createBlockingInputInfos.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{10, 10, 10}, createBlockingInputInfos, compute);
    }

    @Test
    void testComputeWithLargeDataVolumePerTask() {
        AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer = createAllToAllVertexInputInfoComputer();
        List<BlockingInputInfo> createBlockingInputInfos = createBlockingInputInfos(1, 1, 10, true, true, List.of());
        Map compute = createAllToAllVertexInputInfoComputer.compute(new JobVertexID(), createBlockingInputInfos, 10, 1, 10, 100L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(compute, createBlockingInputInfos.get(0), 1);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{30}, createBlockingInputInfos, compute);
    }

    public static List<BlockingInputInfo> createBlockingInputInfos(int i, int i2, int i3, boolean z, boolean z2, List<Integer> list) {
        return VertexInputInfoComputerTestUtil.createBlockingInputInfos(i, i2, i3, 3, z, z2, 1, 10.0d, List.of(), list, false);
    }

    private static AllToAllVertexInputInfoComputer createAllToAllVertexInputInfoComputer() {
        return new AllToAllVertexInputInfoComputer(4.0d, 10L);
    }
}
