package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.class */
public class HsSelectiveSpillingStrategy implements HsSpillingStrategy {
    private final float spillBufferRatio;
    private final float spillThreshold;

    public HsSelectiveSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) {
        this.spillThreshold = hybridShuffleConfiguration.getSelectiveStrategySpillThreshold();
        this.spillBufferRatio = hybridShuffleConfiguration.getSelectiveStrategySpillBufferRatio();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy
    public Optional<HsSpillingStrategy.Decision> onBufferFinished(int i) {
        return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy
    public Optional<HsSpillingStrategy.Decision> onBufferConsumed(BufferIndexAndChannel bufferIndexAndChannel) {
        return Optional.of(HsSpillingStrategy.Decision.builder().addBufferToRelease(bufferIndexAndChannel).build());
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy
    public Optional<HsSpillingStrategy.Decision> onMemoryUsageChanged(int i, int i2) {
        return ((float) i) < ((float) i2) * this.spillThreshold ? Optional.of(HsSpillingStrategy.Decision.NO_ACTION) : Optional.empty();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy
    public HsSpillingStrategy.Decision decideActionWithGlobalInfo(HsSpillingInfoProvider hsSpillingInfoProvider) {
        if (hsSpillingInfoProvider.getNumTotalRequestedBuffers() < hsSpillingInfoProvider.getPoolSize() * this.spillThreshold) {
            return HsSpillingStrategy.Decision.NO_ACTION;
        }
        int poolSize = (int) (hsSpillingInfoProvider.getPoolSize() * this.spillBufferRatio);
        TreeMap treeMap = new TreeMap();
        for (int i = 0; i < hsSpillingInfoProvider.getNumSubpartitions(); i++) {
            treeMap.put(Integer.valueOf(i), hsSpillingInfoProvider.getBuffersInOrder(i, HsSpillingInfoProvider.SpillStatus.NOT_SPILL, HsSpillingInfoProvider.ConsumeStatus.NOT_CONSUMED));
        }
        TreeMap<Integer, List<BufferIndexAndChannel>> buffersByConsumptionPriorityInOrder = HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder(hsSpillingInfoProvider.getNextBufferIndexToConsume(), treeMap, poolSize);
        HsSpillingStrategy.Decision.Builder builder = HsSpillingStrategy.Decision.builder();
        buffersByConsumptionPriorityInOrder.forEach((num, list) -> {
            builder.addBufferToSpill(num.intValue(), (List<BufferIndexAndChannel>) list);
            builder.addBufferToRelease(num.intValue(), list);
        });
        return builder.build();
    }
}
