/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.sink.writer.strategy;

import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.BasicRequestInfo;
import org.apache.flink.connector.base.sink.writer.strategy.BasicResultInfo;
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
import org.apache.flink.connector.base.sink.writer.strategy.ResultInfo;
import org.apache.flink.connector.base.sink.writer.strategy.ScalingStrategy;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link CongestionControlRateLimitingStrategy}.
 *
 * <p>The tests drive the strategy through {@code shouldBlock}, {@code registerInFlightRequest}
 * and {@code registerCompletedRequest} and check when new requests are blocked, both for the
 * in-flight request limit and for the in-flight message limit that is adjusted by an
 * {@link AIMDScalingStrategy}. Builder argument validation is covered as well.
 */
class CongestionControlRateLimitingStrategyTest {

    /**
     * Requests are admitted until {@code maxInFlightRequests} of them are in flight; the next one
     * is blocked until a request completes.
     */
    @Test
    void testMaxInFlightRequestsRespected() {
        final int maxInFlightRequests = 2;
        CongestionControlRateLimitingStrategy strategy =
                CongestionControlRateLimitingStrategy.builder()
                        .setMaxInFlightRequests(maxInFlightRequests)
                        .setInitialMaxInFlightMessages(10)
                        .setScalingStrategy(AIMDScalingStrategy.builder(10).build())
                        .build();

        final RequestInfo emptyRequest = new BasicRequestInfo(0);
        final ResultInfo emptyResult = new BasicResultInfo(0, 0);

        // Fill the strategy up to the configured request limit; none of these may block.
        for (int i = 0; i < maxInFlightRequests; i++) {
            Assertions.assertThat(strategy.shouldBlock(emptyRequest)).isFalse();
            strategy.registerInFlightRequest(emptyRequest);
        }

        // The limit is reached, so one more request has to wait ...
        Assertions.assertThat(strategy.shouldBlock(emptyRequest)).isTrue();

        // ... until a completion frees a slot.
        strategy.registerCompletedRequest(emptyResult);
        Assertions.assertThat(strategy.shouldBlock(emptyRequest)).isFalse();
    }

    /**
     * A completion that is registered before any request was in flight must not grant extra
     * capacity: afterwards the strategy still blocks once {@code maxInFlightRequests} requests
     * have been registered.
     */
    @Test
    void testMaxInFlightRequestsDoesNotGoBelowZero() {
        final int maxInFlightRequests = 1;
        CongestionControlRateLimitingStrategy strategy =
                CongestionControlRateLimitingStrategy.builder()
                        .setMaxInFlightRequests(maxInFlightRequests)
                        .setInitialMaxInFlightMessages(10)
                        .setScalingStrategy(AIMDScalingStrategy.builder(10).build())
                        .build();

        final RequestInfo emptyRequest = new BasicRequestInfo(0);
        final ResultInfo emptyResult = new BasicResultInfo(0, 0);

        // Spurious completion: nothing has been registered as in flight yet.
        strategy.registerCompletedRequest(emptyResult);

        for (int i = 0; i < maxInFlightRequests; i++) {
            Assertions.assertThat(strategy.shouldBlock(emptyRequest)).isFalse();
            strategy.registerInFlightRequest(emptyRequest);
        }

        // If the spurious completion had been counted, this request would not be blocked.
        Assertions.assertThat(strategy.shouldBlock(emptyRequest)).isTrue();

        strategy.registerCompletedRequest(emptyResult);
        Assertions.assertThat(strategy.shouldBlock(emptyRequest)).isFalse();
    }

    /**
     * With an initial limit of four in-flight messages, a two-message request is admitted while
     * two messages are in flight, blocked while four are in flight, and admitted again after a
     * two-message result has been registered as completed.
     */
    @Test
    void testInitialMaxInFlightMessagesRespected() {
        final RequestInfo requestWithTwoMessages = new BasicRequestInfo(2);
        // NOTE(review): arguments are presumably (failedMessages, batchSize) - confirm against
        // BasicResultInfo.
        final ResultInfo resultWithTwoMessages = new BasicResultInfo(0, 2);

        CongestionControlRateLimitingStrategy strategy =
                CongestionControlRateLimitingStrategy.builder()
                        .setMaxInFlightRequests(100)
                        .setInitialMaxInFlightMessages(4)
                        .setScalingStrategy(
                                AIMDScalingStrategy.builder(4).setIncreaseRate(1).build())
                        .build();

        strategy.registerInFlightRequest(requestWithTwoMessages);
        Assertions.assertThat(strategy.shouldBlock(requestWithTwoMessages)).isFalse();

        strategy.registerInFlightRequest(requestWithTwoMessages);
        Assertions.assertThat(strategy.shouldBlock(requestWithTwoMessages)).isTrue();

        strategy.registerCompletedRequest(resultWithTwoMessages);
        Assertions.assertThat(strategy.shouldBlock(requestWithTwoMessages)).isFalse();
    }

    /**
     * Starting from a limit of one in-flight message, a two-message request is blocked; after one
     * successful completion with an AIMD increase rate of 10 it is admitted.
     */
    @Test
    void testAimdScalingStrategyScaleUpOnSuccess() {
        final RequestInfo emptyRequest = new BasicRequestInfo(0);
        final ResultInfo emptySuccessfulResult = new BasicResultInfo(0, 0);
        final RequestInfo requestWithTwoMessages = new BasicRequestInfo(2);

        AIMDScalingStrategy aimdScalingStrategy =
                AIMDScalingStrategy.builder(100)
                        .setIncreaseRate(10)
                        .setDecreaseFactor(0.5)
                        .build();
        CongestionControlRateLimitingStrategy strategy =
                CongestionControlRateLimitingStrategy.builder()
                        .setMaxInFlightRequests(100)
                        .setInitialMaxInFlightMessages(1)
                        .setScalingStrategy(aimdScalingStrategy)
                        .build();

        // Before scaling: the empty request fits, the two-message request does not.
        Assertions.assertThat(strategy.shouldBlock(emptyRequest)).isFalse();
        Assertions.assertThat(strategy.shouldBlock(requestWithTwoMessages)).isTrue();

        // One successful round trip triggers the scale-up.
        strategy.registerInFlightRequest(emptyRequest);
        strategy.registerCompletedRequest(emptySuccessfulResult);

        Assertions.assertThat(strategy.shouldBlock(requestWithTwoMessages)).isFalse();
    }

    /**
     * Starting from a limit of two in-flight messages, a two-message request is admitted; after a
     * completion that reports a failed message with an AIMD decrease factor of 0.5 it is blocked.
     */
    @Test
    void testAimdScalingStrategyScaleDownOnFailure() {
        final RequestInfo requestWithOneMessage = new BasicRequestInfo(1);
        // NOTE(review): arguments are presumably (failedMessages, batchSize) - confirm against
        // BasicResultInfo.
        final ResultInfo resultWithOneFailedMessage = new BasicResultInfo(1, 1);
        final RequestInfo requestWithTwoMessages = new BasicRequestInfo(2);

        AIMDScalingStrategy aimdScalingStrategy =
                AIMDScalingStrategy.builder(100).setDecreaseFactor(0.5).build();
        CongestionControlRateLimitingStrategy strategy =
                CongestionControlRateLimitingStrategy.builder()
                        .setMaxInFlightRequests(100)
                        .setInitialMaxInFlightMessages(2)
                        .setScalingStrategy(aimdScalingStrategy)
                        .build();

        // Before scaling: both request sizes fit into the initial limit.
        Assertions.assertThat(strategy.shouldBlock(requestWithOneMessage)).isFalse();
        Assertions.assertThat(strategy.shouldBlock(requestWithTwoMessages)).isFalse();

        // One failed round trip triggers the scale-down.
        strategy.registerInFlightRequest(requestWithOneMessage);
        strategy.registerCompletedRequest(resultWithOneFailedMessage);

        Assertions.assertThat(strategy.shouldBlock(requestWithTwoMessages)).isTrue();
    }

    /** Building with {@code maxInFlightRequests = 0} is rejected. */
    @Test
    void testInvalidMaxInFlightRequests() {
        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
                .isThrownBy(
                        () ->
                                CongestionControlRateLimitingStrategy.builder()
                                        .setMaxInFlightRequests(0)
                                        .setInitialMaxInFlightMessages(10)
                                        .setScalingStrategy(
                                                AIMDScalingStrategy.builder(10).build())
                                        .build())
                .withMessageContaining("maxInFlightRequests must be a positive integer.");
    }

    /** Building with {@code initialMaxInFlightMessages = 0} is rejected. */
    @Test
    void testInvalidMaxInFlightMessages() {
        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
                .isThrownBy(
                        () ->
                                CongestionControlRateLimitingStrategy.builder()
                                        .setMaxInFlightRequests(10)
                                        .setInitialMaxInFlightMessages(0)
                                        .setScalingStrategy(
                                                AIMDScalingStrategy.builder(10).build())
                                        .build())
                .withMessageContaining("initialMaxInFlightMessages must be a positive integer.");
    }

    /** Building without a scaling strategy is rejected. */
    @Test
    void testInvalidAimdStrategy() {
        Assertions.assertThatExceptionOfType(NullPointerException.class)
                .isThrownBy(
                        () ->
                                CongestionControlRateLimitingStrategy.builder()
                                        .setMaxInFlightRequests(10)
                                        .setInitialMaxInFlightMessages(10)
                                        .build())
                .withMessageContaining("scalingStrategy must be provided.");
    }
}

