package org.apache.flink.api.connector.source.lib.util;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.class */
public class RateLimitedSourceReaderITCase extends TestLogger {
    private static final int PARALLELISM = 4;

    @RegisterExtension
    private static final MiniClusterExtension miniClusterExtension = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARALLELISM).build());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase$MockRateLimiter.class */
    public static final class MockRateLimiter implements RateLimiter {
        int callCount;

        private MockRateLimiter() {
        }

        /* renamed from: acquire, reason: merged with bridge method [inline-methods] */
        public CompletableFuture<Void> m2acquire() {
            this.callCount++;
            return CompletableFuture.completedFuture(null);
        }

        public int getCallCount() {
            return this.callCount;
        }
    }

    /* loaded from: input_file:org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase$MockRateLimiterStrategy.class */
    private static class MockRateLimiterStrategy implements RateLimiterStrategy {
        private static final List<MockRateLimiter> rateLimiters = Collections.synchronizedList(new ArrayList());

        private MockRateLimiterStrategy() {
        }

        public RateLimiter createRateLimiter(int i) {
            MockRateLimiter mockRateLimiter = new MockRateLimiter();
            rateLimiters.add(mockRateLimiter);
            return mockRateLimiter;
        }

        public static int getRateLimitersCallCount() {
            return rateLimiters.stream().mapToInt((v0) -> {
                return v0.getCallCount();
            }).sum();
        }
    }

    @DisplayName("Rate limiter is used correctly.")
    @Test
    public void testRateLimitingParallelExecution() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        List executeAndCollect = executionEnvironment.fromSource(new DataGeneratorSource(l -> {
            return l;
        }, 10L, new MockRateLimiterStrategy(), Types.LONG), WatermarkStrategy.noWatermarks(), "generator source").executeAndCollect(10000);
        int rateLimitersCallCount = MockRateLimiterStrategy.getRateLimitersCallCount();
        Assertions.assertThat(executeAndCollect).containsExactlyInAnyOrderElementsOf(range(0, 9));
        Assertions.assertThat(rateLimitersCallCount).isGreaterThan(10);
    }

    private List<Long> range(int i, int i2) {
        return (List) LongStream.rangeClosed(i, i2).boxed().collect(Collectors.toList());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1806371872:
                if (implMethodName.equals("lambda$testRateLimitingParallelExecution$b5586b19$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/datagen/source/GeneratorFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return l;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
