package org.apache.flink.connector.base.sink.writer;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.class */
public class AsyncSinkWriterThrottlingTest {

    /* loaded from: input_file:org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest$ThrottlingWriter.class */
    /**
     * Test-only {@link AsyncSinkWriter} that simulates a destination with limited throughput.
     *
     * <p>It keeps a sliding window of the batch sizes submitted during the last
     * {@value #WINDOW_MILLIS} ms of processing time. Once the window holds more than
     * {@code maxBatchSize} entries, any batch with more than one entry is handed back to the
     * result consumer in full; otherwise an empty list is handed back.
     */
    private static class ThrottlingWriter extends AsyncSinkWriter<String, Long> {
        /** Width of the sliding throughput window, in milliseconds of processing time. */
        private static final long WINDOW_MILLIS = 100L;

        private final ProcessingTimeService timeService;
        private final int maxBatchSize;
        /** (submission timestamp, batch size) pairs still inside the sliding window, oldest first. */
        private final Queue<Tuple2<Long, Integer>> requestsData;
        /** Sum of the batch sizes currently held in {@link #requestsData}. */
        private long sizeOfLast100ms;
        /** Size of the batch most recently passed to {@link #submitRequestEntries}. */
        private int inflightMessagesLimit;

        /**
         * @param elementConverter converts incoming {@code String} elements into {@code Long} entries
         * @param initContext sink init context; also supplies the processing time service
         * @param maxBatchSize forwarded to the parent and used as the per-window throughput limit
         * @param maxInFlightRequests forwarded to the parent as its fourth constructor argument
         *     (presumably the in-flight request cap; confirm against {@code AsyncSinkWriter})
         */
        public ThrottlingWriter(
                ElementConverter<String, Long> elementConverter,
                Sink.InitContext initContext,
                int maxBatchSize,
                int maxInFlightRequests) {
            // NOTE(review): the four trailing literals are passed through unchanged; their meaning
            // is defined by the AsyncSinkWriter constructor, which is not visible here.
            super(
                    elementConverter,
                    initContext,
                    maxBatchSize,
                    maxInFlightRequests,
                    10000,
                    10000L,
                    100L,
                    1000L);
            this.maxBatchSize = maxBatchSize;
            this.timeService = initContext.getProcessingTimeService();
            this.requestsData = new ArrayDeque<>();
            this.inflightMessagesLimit = maxBatchSize;
            this.sizeOfLast100ms = 0L;
        }

        /**
         * Convenience overload that writes an element with a {@code null} writer context.
         *
         * @param str element to write
         */
        public void write(String str) throws IOException, InterruptedException {
            super.write(str, (SinkWriter.Context) null);
        }

        /** Returns the size of the batch most recently submitted (initially {@code maxBatchSize}). */
        public int getInflightMessagesLimit() {
            return this.inflightMessagesLimit;
        }

        /**
         * Records the batch in the sliding window and reports a result to {@code consumer}.
         *
         * @param list entries of the batch being submitted
         * @param consumer receives the whole batch when the window is over the limit and the
         *     batch has more than one entry, otherwise an empty list
         */
        protected void submitRequestEntries(List<Long> list, Consumer<List<Long>> consumer) {
            final long now = this.timeService.getCurrentProcessingTime();
            final int batchSize = list.size();
            this.inflightMessagesLimit = batchSize;
            addRequestDataToQueue(batchSize, now);

            final boolean overLimit = this.sizeOfLast100ms > this.maxBatchSize && batchSize > 1;
            if (overLimit) {
                // Hand every entry back to the consumer.
                consumer.accept(list);
            } else {
                consumer.accept(new ArrayList<>());
            }
        }

        /** Reports a constant size of 8 bytes ({@link Long#BYTES}) for every entry. */
        protected long getSizeInBytes(Long l) {
            return Long.BYTES;
        }

        /**
         * Adds a batch to the sliding window and evicts batches older than
         * {@link #WINDOW_MILLIS} relative to {@code timestamp}.
         *
         * @param batchSize number of entries in the batch
         * @param timestamp processing time at which the batch was submitted
         */
        private void addRequestDataToQueue(int batchSize, long timestamp) {
            this.requestsData.add(Tuple2.of(timestamp, batchSize));
            this.sizeOfLast100ms += batchSize;

            final long oldestAllowed = timestamp - WINDOW_MILLIS;
            while (!this.requestsData.isEmpty() && this.requestsData.peek().f0 < oldestAllowed) {
                this.sizeOfLast100ms -= this.requestsData.remove().f1;
            }
        }
    }

    /**
     * Writes 1000 batches' worth of elements (batch size 32) through a {@code ThrottlingWriter}
     * and checks that the size of the last submitted batch ended up between a quarter of the
     * batch size and half the batch size plus 10.
     */
    @Test
    public void testSinkThroughputShouldThrottleToHalfBatchSize() throws Exception {
        final int maxBatchSize = 32;
        final int maxInFlightRequests = 10;
        final int numberOfBatchesToSend = 1000;
        final int totalElements = numberOfBatchesToSend * maxBatchSize;

        ThrottlingWriter writer =
                new ThrottlingWriter(
                        (element, context) -> Long.valueOf(element),
                        new TestSinkInitContextAnyThreadMailbox(),
                        maxBatchSize,
                        maxInFlightRequests);

        int next = 0;
        while (next < totalElements) {
            writer.write(String.valueOf(next));
            next++;
        }

        final int lowerBound = maxBatchSize / 4;
        final int upperBound = maxBatchSize / 2 + 10;
        Assertions.assertThat(writer.getInflightMessagesLimit()).isGreaterThanOrEqualTo(lowerBound);
        Assertions.assertThat(writer.getInflightMessagesLimit()).isLessThanOrEqualTo(upperBound);
    }

    /**
     * Recreates the serializable {@code ElementConverter} lambda used by
     * {@code testSinkThroughputShouldThrottleToHalfBatchSize} from its serialized form.
     *
     * <p>NOTE(review): this mirrors the method the Java compiler normally generates for
     * serializable lambdas; a hand-written copy is probably redundant in real sources. Confirm
     * whether it should be removed instead.
     *
     * @param serializedLambda serialized description of the lambda to restore
     * @return a converter equivalent to the lambda in the test method
     * @throws IllegalArgumentException if {@code serializedLambda} does not describe that lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Checks run in the same order as the decompiled switch: name first, then the descriptor.
        boolean describesConverterLambda =
                implMethodName.equals("lambda$testSinkThroughputShouldThrottleToHalfBatchSize$79b7b30b$1")
                        // 6 is the method-handle kind for a static method (REF_invokeStatic).
                        && serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/base/sink/writer/ElementConverter")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/api/connector/sink2/SinkWriter$Context;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest")
                        && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/api/connector/sink2/SinkWriter$Context;)Ljava/lang/Long;");
        if (describesConverterLambda) {
            // A lambda needs a functional-interface target type; it cannot be returned as Object directly.
            ElementConverter<String, Long> converter = (str, context) -> Long.valueOf(str);
            return converter;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
