package org.apache.flink.connector.base.sink.writer.strategy;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/connector/base/sink/writer/strategy/CongestionControlRateLimitingStrategy.class */
public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
    private final int maxInFlightRequests;
    private final ScalingStrategy<Integer> scalingStrategy;
    private int maxInFlightMessages;
    private int currentInFlightRequests;
    private int currentInFlightMessages;

    @PublicEvolving
    /* loaded from: input_file:org/apache/flink/connector/base/sink/writer/strategy/CongestionControlRateLimitingStrategy$CongestionControlRateLimitingStrategyBuilder.class */
    public static class CongestionControlRateLimitingStrategyBuilder {
        private int maxInFlightRequests;
        private int initialMaxInFlightMessages;
        private ScalingStrategy<Integer> scalingStrategy;

        public CongestionControlRateLimitingStrategyBuilder setMaxInFlightRequests(int i) {
            this.maxInFlightRequests = i;
            return this;
        }

        public CongestionControlRateLimitingStrategyBuilder setInitialMaxInFlightMessages(int i) {
            this.initialMaxInFlightMessages = i;
            return this;
        }

        public CongestionControlRateLimitingStrategyBuilder setScalingStrategy(ScalingStrategy<Integer> scalingStrategy) {
            this.scalingStrategy = scalingStrategy;
            return this;
        }

        public CongestionControlRateLimitingStrategy build() {
            return new CongestionControlRateLimitingStrategy(this.maxInFlightRequests, this.initialMaxInFlightMessages, this.scalingStrategy);
        }
    }

    private CongestionControlRateLimitingStrategy(int i, int i2, ScalingStrategy<Integer> scalingStrategy) {
        Preconditions.checkArgument(i > 0, "maxInFlightRequests must be a positive integer.");
        Preconditions.checkArgument(i2 > 0, "initialMaxInFlightMessages must be a positive integer.");
        Preconditions.checkNotNull(scalingStrategy, "scalingStrategy must be provided.");
        this.maxInFlightRequests = i;
        this.maxInFlightMessages = i2;
        this.scalingStrategy = scalingStrategy;
    }

    @Override // org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy
    public void registerInFlightRequest(RequestInfo requestInfo) {
        this.currentInFlightRequests++;
        this.currentInFlightMessages += requestInfo.getBatchSize();
    }

    @Override // org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy
    public void registerCompletedRequest(ResultInfo resultInfo) {
        this.currentInFlightRequests = Math.max(0, this.currentInFlightRequests - 1);
        this.currentInFlightMessages -= resultInfo.getBatchSize();
        if (resultInfo.getFailedMessages() > 0) {
            this.maxInFlightMessages = this.scalingStrategy.scaleDown(Integer.valueOf(this.maxInFlightMessages)).intValue();
        } else {
            this.maxInFlightMessages = this.scalingStrategy.scaleUp(Integer.valueOf(this.maxInFlightMessages)).intValue();
        }
    }

    @Override // org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy
    public boolean shouldBlock(RequestInfo requestInfo) {
        return this.currentInFlightRequests >= this.maxInFlightRequests || this.currentInFlightMessages + requestInfo.getBatchSize() > this.maxInFlightMessages;
    }

    @Override // org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy
    public int getMaxBatchSize() {
        return this.maxInFlightMessages;
    }

    @PublicEvolving
    public static CongestionControlRateLimitingStrategyBuilder builder() {
        return new CongestionControlRateLimitingStrategyBuilder();
    }
}
