/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.connector.source.lib.util;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class RateLimitedSourceReaderITCase
extends TestLogger {
    private static final int PARALLELISM = 4;
    @RegisterExtension
    private static final MiniClusterExtension miniClusterExtension = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());

    @Test
    @DisplayName(value="Rate limiter is used correctly.")
    public void testRateLimitingParallelExecution() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        int count = 10;
        MockRateLimiterStrategy rateLimiterStrategy = new MockRateLimiterStrategy();
        DataGeneratorSource dataGeneratorSource = new DataGeneratorSource((GeneratorFunction & Serializable)index -> index, 10L, (RateLimiterStrategy)rateLimiterStrategy, Types.LONG);
        DataStreamSource stream = env.fromSource((Source)dataGeneratorSource, WatermarkStrategy.noWatermarks(), "generator source");
        List result = stream.executeAndCollect(10000);
        int rateLimiterCallCount = MockRateLimiterStrategy.getRateLimitersCallCount();
        Assertions.assertThat((List)result).containsExactlyInAnyOrderElementsOf(this.range(0, 9));
        Assertions.assertThat((int)rateLimiterCallCount).isGreaterThan(10);
    }

    private List<Long> range(int startInclusive, int endInclusive) {
        return LongStream.rangeClosed(startInclusive, endInclusive).boxed().collect(Collectors.toList());
    }

    private static class MockRateLimiterStrategy
    implements RateLimiterStrategy {
        private static final List<MockRateLimiter> rateLimiters = Collections.synchronizedList(new ArrayList());

        private MockRateLimiterStrategy() {
        }

        public RateLimiter createRateLimiter(int parallelism) {
            MockRateLimiter mockRateLimiter = new MockRateLimiter();
            rateLimiters.add(mockRateLimiter);
            return mockRateLimiter;
        }

        public static int getRateLimitersCallCount() {
            return rateLimiters.stream().mapToInt(MockRateLimiter::getCallCount).sum();
        }
    }

    private static final class MockRateLimiter
    implements RateLimiter {
        int callCount;

        private MockRateLimiter() {
        }

        public CompletableFuture<Void> acquire() {
            ++this.callCount;
            return CompletableFuture.completedFuture(null);
        }

        public int getCallCount() {
            return this.callCount;
        }
    }
}

