package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer;
import org.apache.flink.runtime.scheduler.adaptivebatch.util.PointwiseVertexInputInfoComputer;
import org.apache.flink.runtime.scheduler.adaptivebatch.util.VertexParallelismAndInputInfosDeciderUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.class */
public class DefaultVertexParallelismAndInputInfosDecider implements VertexParallelismAndInputInfosDecider {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultVertexParallelismAndInputInfosDecider.class);
    private final int globalMaxParallelism;
    private final int globalMinParallelism;
    private final long dataVolumePerTask;
    private final int globalDefaultSourceParallelism;
    private final AllToAllVertexInputInfoComputer allToAllVertexInputInfoComputer;
    private final PointwiseVertexInputInfoComputer pointwiseVertexInputInfoComputer;

    private DefaultVertexParallelismAndInputInfosDecider(int i, int i2, MemorySize memorySize, int i3, double d, long j) {
        Preconditions.checkArgument(i2 > 0, "The minimum parallelism must be larger than 0.");
        Preconditions.checkArgument(i >= i2, "Maximum parallelism should be greater than or equal to the minimum parallelism.");
        Preconditions.checkArgument(i3 > 0, "The default source parallelism must be larger than 0.");
        Preconditions.checkNotNull(memorySize);
        Preconditions.checkArgument(d > 0.0d, "The default skewed partition factor must be larger than 0.");
        Preconditions.checkArgument(j > 0, "The default skewed threshold must be larger than 0.");
        this.globalMaxParallelism = i;
        this.globalMinParallelism = i2;
        this.dataVolumePerTask = memorySize.getBytes();
        this.globalDefaultSourceParallelism = i3;
        this.allToAllVertexInputInfoComputer = new AllToAllVertexInputInfoComputer(d, j);
        this.pointwiseVertexInputInfoComputer = new PointwiseVertexInputInfoComputer();
    }

    @Override // org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider
    public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(JobVertexID jobVertexID, List<BlockingInputInfo> list, int i, int i2, int i3) {
        Preconditions.checkArgument(i == -1 || i > 0);
        Preconditions.checkArgument(i2 == -1 || i2 > 0);
        Preconditions.checkArgument(i3 > 0 && i3 >= i && i3 >= i2);
        if (list.isEmpty()) {
            return new ParallelismAndInputInfos(i > 0 ? i : computeSourceParallelismUpperBound(jobVertexID, i3), Collections.emptyMap());
        }
        int max = Math.max(this.globalMinParallelism, i2);
        int i4 = this.globalMaxParallelism;
        if (i == -1 && i3 < max) {
            LOG.info("The vertex maximum parallelism {} is smaller than the minimum parallelism {}. Use {} as the lower bound to decide parallelism of job vertex {}.", new Object[]{Integer.valueOf(i3), Integer.valueOf(max), Integer.valueOf(i3), jobVertexID});
            max = i3;
        }
        if (i == -1 && i3 < i4) {
            LOG.info("The vertex maximum parallelism {} is smaller than the global maximum parallelism {}. Use {} as the upper bound to decide parallelism of job vertex {}.", new Object[]{Integer.valueOf(i3), Integer.valueOf(i4), Integer.valueOf(i3), jobVertexID});
            i4 = i3;
        }
        Preconditions.checkState(i4 >= max);
        return decideParallelismAndInputInfosForNonSource(jobVertexID, list, i, max, i4);
    }

    @Override // org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider
    public int computeSourceParallelismUpperBound(JobVertexID jobVertexID, int i) {
        if (this.globalDefaultSourceParallelism <= i) {
            return this.globalDefaultSourceParallelism;
        }
        LOG.info("The global default source parallelism {} is larger than the maximum parallelism {}. Use {} as the upper bound parallelism of source job vertex {}.", new Object[]{Integer.valueOf(this.globalDefaultSourceParallelism), Integer.valueOf(i), Integer.valueOf(i), jobVertexID});
        return i;
    }

    @Override // org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider
    public long getDataVolumePerTask() {
        return this.dataVolumePerTask;
    }

    private ParallelismAndInputInfos decideParallelismAndInputInfosForNonSource(JobVertexID jobVertexID, List<BlockingInputInfo> list, int i, int i2, int i3) {
        int decideParallelism = i > 0 ? i : decideParallelism(jobVertexID, list, i2, i3);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        list.forEach(blockingInputInfo -> {
            if (blockingInputInfo.isPointwise()) {
                arrayList.add(blockingInputInfo);
            } else {
                arrayList2.add(blockingInputInfo);
            }
        });
        if (i > 0 || !arrayList.isEmpty()) {
            i2 = decideParallelism;
            i3 = decideParallelism;
        }
        HashMap hashMap = new HashMap();
        if (!arrayList2.isEmpty()) {
            hashMap.putAll(this.allToAllVertexInputInfoComputer.compute(jobVertexID, arrayList2, decideParallelism, i2, i3, VertexParallelismAndInputInfosDeciderUtils.calculateDataVolumePerTaskForInputsGroup(this.dataVolumePerTask, arrayList2, list)));
        }
        if (!arrayList.isEmpty()) {
            hashMap.putAll(this.pointwiseVertexInputInfoComputer.compute(arrayList, decideParallelism, VertexParallelismAndInputInfosDeciderUtils.calculateDataVolumePerTaskForInputsGroup(this.dataVolumePerTask, arrayList, list)));
        }
        return new ParallelismAndInputInfos(VertexParallelismAndInputInfosDeciderUtils.checkAndGetParallelism(hashMap.values()), hashMap);
    }

    int decideParallelism(JobVertexID jobVertexID, List<BlockingInputInfo> list, int i, int i2) {
        Preconditions.checkArgument(!list.isEmpty());
        List<BlockingInputInfo> nonBroadcastInputInfos = VertexParallelismAndInputInfosDeciderUtils.getNonBroadcastInputInfos(list);
        if (nonBroadcastInputInfos.isEmpty()) {
            return i;
        }
        long sum = nonBroadcastInputInfos.stream().mapToLong((v0) -> {
            return v0.getNumBytesProduced();
        }).sum();
        int ceil = (int) Math.ceil(sum / this.dataVolumePerTask);
        LOG.debug("The total size of non-broadcast data is {}, the initially decided parallelism of job vertex {} is {}.", new Object[]{new MemorySize(sum), jobVertexID, Integer.valueOf(ceil)});
        if (ceil < i) {
            LOG.info("The initially decided parallelism {} is smaller than the minimum parallelism {}. Use {} as the finally decided parallelism of job vertex {}.", new Object[]{Integer.valueOf(ceil), Integer.valueOf(i), Integer.valueOf(i), jobVertexID});
            ceil = i;
        } else if (ceil > i2) {
            LOG.info("The initially decided parallelism {} is larger than the maximum parallelism {}. Use {} as the finally decided parallelism of job vertex {}.", new Object[]{Integer.valueOf(ceil), Integer.valueOf(i2), Integer.valueOf(i2), jobVertexID});
            ceil = i2;
        }
        return ceil;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static DefaultVertexParallelismAndInputInfosDecider from(int i, double d, long j, Configuration configuration) {
        return new DefaultVertexParallelismAndInputInfosDecider(i, ((Integer) configuration.get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM)).intValue(), (MemorySize) configuration.get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK), ((Integer) configuration.get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, Integer.valueOf(i))).intValue(), d, j);
    }
}
