package org.apache.flink.runtime.scheduler.adaptivebatch.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/util/PointwiseVertexInputInfoComputer.class */
public class PointwiseVertexInputInfoComputer {
    private static final Logger LOG = LoggerFactory.getLogger(PointwiseVertexInputInfoComputer.class);

    public Map<IntermediateDataSetID, JobVertexInputInfo> compute(List<BlockingInputInfo> list, int i, long j) {
        long sum = list.stream().mapToLong((v0) -> {
            return v0.getNumBytesProduced();
        }).sum();
        HashMap hashMap = new HashMap();
        for (BlockingInputInfo blockingInputInfo : list) {
            Preconditions.checkState(!blockingInputInfo.areInterInputsKeysCorrelated());
            if (blockingInputInfo.isIntraInputKeyCorrelated()) {
                Preconditions.checkState(i <= blockingInputInfo.getNumPartitions());
            }
            hashMap.put(blockingInputInfo.getResultId(), computeVertexInputInfo(blockingInputInfo, i, VertexParallelismAndInputInfosDeciderUtils.calculateDataVolumePerTaskForInput(j, blockingInputInfo.getNumBytesProduced(), sum)));
        }
        return hashMap;
    }

    private static JobVertexInputInfo computeVertexInputInfo(BlockingInputInfo blockingInputInfo, int i, long j) {
        List<SubpartitionSlice> createSubpartitionSlices = createSubpartitionSlices(blockingInputInfo);
        Optional<List<IndexRange>> tryComputeSubpartitionSliceRange = VertexParallelismAndInputInfosDeciderUtils.tryComputeSubpartitionSliceRange(i, i, j, Map.of(Integer.valueOf(blockingInputInfo.getInputTypeNumber()), createSubpartitionSlices));
        if (!tryComputeSubpartitionSliceRange.isEmpty()) {
            List<IndexRange> list = tryComputeSubpartitionSliceRange.get();
            Preconditions.checkState(VertexParallelismAndInputInfosDeciderUtils.isLegalParallelism(list.size(), i, i));
            return createJobVertexInputInfo(blockingInputInfo, list, createSubpartitionSlices);
        }
        LOG.info("Cannot find a legal parallelism to evenly distribute data amount for input {}, fallback to compute a parallelism that can evenly distribute num subpartitions.", blockingInputInfo.getResultId());
        int numPartitions = blockingInputInfo.getNumPartitions();
        Objects.requireNonNull(blockingInputInfo);
        return VertexInputInfoComputationUtils.computeVertexInputInfoForPointwise(numPartitions, i, (v1) -> {
            return r2.getNumSubpartitions(v1);
        }, true);
    }

    private static List<SubpartitionSlice> createSubpartitionSlices(BlockingInputInfo blockingInputInfo) {
        ArrayList arrayList = new ArrayList();
        if (blockingInputInfo.isIntraInputKeyCorrelated()) {
            for (int i = 0; i < blockingInputInfo.getNumPartitions(); i++) {
                IndexRange indexRange = new IndexRange(i, i);
                IndexRange indexRange2 = new IndexRange(0, blockingInputInfo.getNumSubpartitions(i) - 1);
                arrayList.add(SubpartitionSlice.createSubpartitionSlice(indexRange, indexRange2, blockingInputInfo.getNumBytesProduced(indexRange, indexRange2)));
            }
        } else {
            for (int i2 = 0; i2 < blockingInputInfo.getNumPartitions(); i2++) {
                IndexRange indexRange3 = new IndexRange(i2, i2);
                for (int i3 = 0; i3 < blockingInputInfo.getNumSubpartitions(i2); i3++) {
                    IndexRange indexRange4 = new IndexRange(i3, i3);
                    arrayList.add(SubpartitionSlice.createSubpartitionSlice(indexRange3, indexRange4, blockingInputInfo.getNumBytesProduced(indexRange3, indexRange4)));
                }
            }
        }
        return arrayList;
    }

    private static JobVertexInputInfo createJobVertexInputInfo(BlockingInputInfo blockingInputInfo, List<IndexRange> list, List<SubpartitionSlice> list2) {
        Preconditions.checkState(!blockingInputInfo.isBroadcast());
        return VertexParallelismAndInputInfosDeciderUtils.createdJobVertexInputInfoForNonBroadcast(blockingInputInfo, list, list2);
    }
}
