package org.apache.flink.runtime.scheduler.adaptivebatch.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexInputInfoComputerTestUtil;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/util/PointwiseVertexInputInfoComputerTest.class */
/**
 * Unit tests for {@link PointwiseVertexInputInfoComputer}.
 *
 * <p>Every test builds one or more {@link BlockingInputInfo} fixtures, invokes {@code compute}
 * with a target parallelism, and delegates the verification of the result to
 * {@link VertexInputInfoComputerTestUtil}.
 *
 * <p>NOTE(review): the value returned by {@code compute} is held in a raw {@code Map} because its
 * key and value types are not imported by this file. Parameterize these locals once the exact
 * type arguments have been confirmed against the signature of {@code compute}.
 */
class PointwiseVertexInputInfoComputerTest {

    /**
     * A single input without skewed partitions is checked for parallelism 2 and for parallelism
     * 3; the expected consumed volume is identical for every subtask in both cases.
     */
    @Test
    void testComputeNormalInput() {
        PointwiseVertexInputInfoComputer computer = createPointwiseVertexInputInfoComputer();
        List<BlockingInputInfo> inputInfos = createBlockingInputInfos(2, List.of(), false);

        Map resultForTwoSubtasks = computer.compute(inputInfos, 2, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                resultForTwoSubtasks, inputInfos.get(0), 2);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {3, 3}, inputInfos, resultForTwoSubtasks);

        Map resultForThreeSubtasks = computer.compute(inputInfos, 3, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                resultForThreeSubtasks, inputInfos.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {2, 2, 2}, inputInfos, resultForThreeSubtasks);
    }

    /**
     * Two inputs that differ only in the skewed-partition list ({@code [0]} versus {@code [1]})
     * are computed together and verified independently.
     */
    @Test
    void testComputeSkewedInputsWithDifferentSkewedPartitions() {
        PointwiseVertexInputInfoComputer computer = createPointwiseVertexInputInfoComputer();
        BlockingInputInfo firstInput = createBlockingInputInfo(3, 3, List.of(0), false);
        BlockingInputInfo secondInput = createBlockingInputInfo(3, 3, List.of(1), false);
        List<BlockingInputInfo> inputInfos = new ArrayList<>();
        inputInfos.add(firstInput);
        inputInfos.add(secondInput);

        Map vertexInputs = computer.compute(inputInfos, 3, 10L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs, firstInput, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {10, 10, 16}, List.of(firstInput), vertexInputs);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs, secondInput, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {13, 10, 13}, List.of(secondInput), vertexInputs);
    }

    /**
     * Two inputs that differ only in the first fixture argument (3 versus 2) are computed
     * together and verified independently.
     */
    @Test
    void testComputeSkewedInputsWithDifferentNumPartitions() {
        PointwiseVertexInputInfoComputer computer = createPointwiseVertexInputInfoComputer();
        BlockingInputInfo firstInput = createBlockingInputInfo(3, 3, List.of(1), false);
        BlockingInputInfo secondInput = createBlockingInputInfo(2, 3, List.of(1), false);
        List<BlockingInputInfo> inputInfos = new ArrayList<>();
        inputInfos.add(firstInput);
        inputInfos.add(secondInput);

        Map vertexInputs = computer.compute(inputInfos, 3, 10L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs, firstInput, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {13, 10, 13}, List.of(firstInput), vertexInputs);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs, secondInput, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {13, 10, 10}, List.of(secondInput), vertexInputs);
    }

    /**
     * Two inputs that differ only in the second fixture argument (3 versus 5) are computed
     * together and verified independently.
     */
    @Test
    void testComputeSkewedInputsWithDifferentNumSubpartitions() {
        PointwiseVertexInputInfoComputer computer = createPointwiseVertexInputInfoComputer();
        BlockingInputInfo firstInput = createBlockingInputInfo(3, 3, List.of(1), false);
        BlockingInputInfo secondInput = createBlockingInputInfo(3, 5, List.of(1), false);
        List<BlockingInputInfo> inputInfos = new ArrayList<>();
        inputInfos.add(firstInput);
        inputInfos.add(secondInput);

        Map vertexInputs = computer.compute(inputInfos, 3, 10L);

        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs, firstInput, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {13, 10, 13}, List.of(firstInput), vertexInputs);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                vertexInputs, secondInput, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(
                new long[] {25, 20, 15}, List.of(secondInput), vertexInputs);
    }

    /**
     * A single input created with the correlation flag set to {@code true} is checked for
     * parallelism 3 and for parallelism 2; every expected group pairs a range of the three
     * partitions with the full subpartition range {@code [0, 2]}.
     */
    @Test
    void testComputeInputWithIntraCorrelation() {
        PointwiseVertexInputInfoComputer computer = createPointwiseVertexInputInfoComputer();
        List<BlockingInputInfo> inputInfos = createBlockingInputInfos(3, List.of(), true);

        Map resultForThreeSubtasks = computer.compute(inputInfos, 3, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                resultForThreeSubtasks, inputInfos.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(
                List.of(
                        Map.of(new IndexRange(0, 0), new IndexRange(0, 2)),
                        Map.of(new IndexRange(1, 1), new IndexRange(0, 2)),
                        Map.of(new IndexRange(2, 2), new IndexRange(0, 2))),
                inputInfos,
                resultForThreeSubtasks);

        Map resultForTwoSubtasks = computer.compute(inputInfos, 2, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(
                resultForTwoSubtasks, inputInfos.get(0), 2);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(
                List.of(
                        Map.of(new IndexRange(0, 1), new IndexRange(0, 2)),
                        Map.of(new IndexRange(2, 2), new IndexRange(0, 2))),
                inputInfos,
                resultForTwoSubtasks);
    }

    /**
     * Creates an immutable single-element list holding one input whose second fixture argument is
     * fixed to 3.
     *
     * @param numPartitions forwarded as the first fixture argument; presumably the number of
     *     partitions, judging by the test names (confirm against the test utility)
     * @param skewedPartitionIndexes forwarded unchanged; presumably the indexes of the skewed
     *     partitions (confirm against the test utility)
     * @param intraInputKeyCorrelated forwarded unchanged; presumably whether intra-input key
     *     correlation exists (confirm against the test utility)
     * @return a list containing exactly the created input
     */
    private static List<BlockingInputInfo> createBlockingInputInfos(
            int numPartitions,
            List<Integer> skewedPartitionIndexes,
            boolean intraInputKeyCorrelated) {
        return List.of(
                createBlockingInputInfo(
                        numPartitions, 3, skewedPartitionIndexes, intraInputKeyCorrelated));
    }

    /**
     * Creates one input through {@link VertexInputInfoComputerTestUtil#createBlockingInputInfos}
     * and returns the first element of the list it produces. All arguments not exposed here are
     * passed as the fixed literals visible in the call below.
     *
     * @param numPartitions presumably the number of partitions (confirm against the test utility)
     * @param numSubpartitions presumably the number of subpartitions per partition (confirm
     *     against the test utility)
     * @param skewedPartitionIndexes presumably the indexes of the skewed partitions (confirm
     *     against the test utility)
     * @param intraInputKeyCorrelated presumably whether intra-input key correlation exists
     *     (confirm against the test utility)
     * @return the first input produced by the test utility
     */
    private static BlockingInputInfo createBlockingInputInfo(
            int numPartitions,
            int numSubpartitions,
            List<Integer> skewedPartitionIndexes,
            boolean intraInputKeyCorrelated) {
        List<BlockingInputInfo> created =
                VertexInputInfoComputerTestUtil.createBlockingInputInfos(
                        1,
                        1,
                        numPartitions,
                        numSubpartitions,
                        intraInputKeyCorrelated,
                        false,
                        1,
                        10.0d,
                        skewedPartitionIndexes,
                        List.of(),
                        true);
        return created.get(0);
    }

    /** Creates the computer under test via its no-argument constructor. */
    private static PointwiseVertexInputInfoComputer createPointwiseVertexInputInfoComputer() {
        return new PointwiseVertexInputInfoComputer();
    }
}
