package org.apache.flink.connector.base.sink.writer;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FlinkRuntimeException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTimeoutTest.class */
public class AsyncSinkWriterTimeoutTest {
    /**
     * Pool that runs the latch-gated submissions issued by {@code TimeoutWriter}.
     *
     * <p>NOTE(review): nothing in this class shuts the pool down; consider adding a teardown
     * that calls {@code shutdownNow()} once the test has finished.
     */
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);

    /**
     * Entries that {@code TimeoutWriter} has written successfully.
     *
     * <p>Wrapped in a synchronized list because the entries are appended from pool threads, and
     * every pending submission is released by the same latch, so appends may overlap.
     */
    private final List<Long> destination = Collections.synchronizedList(new ArrayList<>());

    /* loaded from: input_file:org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTimeoutTest$TimeoutWriter.class */
    private class TimeoutWriter extends AsyncSinkWriter<String, Long> {
        private Exception fatalError;
        private final CountDownLatch completionLatch;
        private Future<?> submitFuture;
        private boolean shouldFailRequest;

        public TimeoutWriter(WriterInitContext writerInitContext, int i, long j, long j2, boolean z) {
            super((str, context) -> {
                return Long.valueOf(Long.parseLong(str));
            }, writerInitContext, AsyncSinkWriterConfiguration.builder().setMaxBatchSize(i).setMaxBatchSizeInBytes(Long.MAX_VALUE).setMaxInFlightRequests(Integer.MAX_VALUE).setMaxBufferedRequests(Integer.MAX_VALUE).setMaxTimeInBufferMS(j).setMaxRecordSizeInBytes(Long.MAX_VALUE).setRequestTimeoutMS(j2).setFailOnTimeout(z).build(), Collections.emptyList());
            this.shouldFailRequest = false;
            this.completionLatch = new CountDownLatch(1);
        }

        protected void submitRequestEntries(List<Long> list, ResultHandler<Long> resultHandler) {
            this.submitFuture = AsyncSinkWriterTimeoutTest.this.executorService.submit(() -> {
                while (this.completionLatch.getCount() > 0) {
                    try {
                        this.completionLatch.await();
                    } catch (InterruptedException e) {
                        Assertions.fail("Submission thread must not be interrupted.");
                    }
                }
                submitRequestEntriesSync(list, resultHandler);
            });
        }

        private void submitRequestEntriesSync(List<Long> list, ResultHandler<Long> resultHandler) {
            if (this.fatalError != null) {
                resultHandler.completeExceptionally(this.fatalError);
            } else if (this.shouldFailRequest) {
                this.shouldFailRequest = false;
                resultHandler.retryForEntries(list);
            } else {
                AsyncSinkWriterTimeoutTest.this.destination.addAll(list);
                resultHandler.complete();
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public long getSizeInBytes(Long l) {
            return 8L;
        }

        public void setFatalError(Exception exc) {
            this.fatalError = exc;
        }

        public void setShouldFailRequest(boolean z) {
            this.shouldFailRequest = z;
        }

        public void deliverMessage() throws InterruptedException, ExecutionException {
            this.completionLatch.countDown();
            this.submitFuture.get();
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 1943254906:
                    if (implMethodName.equals("lambda$new$896ba39f$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/base/sink/writer/ElementConverter") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/api/connector/sink2/SinkWriter$Context;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTimeoutTest$TimeoutWriter") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/api/connector/sink2/SinkWriter$Context;)Ljava/lang/Long;")) {
                        return (str, context) -> {
                            return Long.valueOf(Long.parseLong(str));
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /** Package-private no-argument constructor; it performs no work beyond the field initializers. */
    AsyncSinkWriterTimeoutTest() {
    }

    /**
     * Writes one element at time 0, releases the pending request at time 10 (well before the
     * 100 ms request timeout), then moves the clock to 120 and flushes. The element is expected
     * to appear in {@code destination} exactly once.
     */
    @Test
    void writerShouldNotRetryIfRequestIsProcessedBeforeTimeout() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService clock = context.getTestProcessingTimeService();
        // Arguments: batch size 1, 10 ms in buffer, 100 ms request timeout, failOnTimeout off.
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, false);

        clock.setCurrentTime(0L);
        writer.write("1", null);

        clock.setCurrentTime(10L);
        writer.deliverMessage();

        // Move past the timeout only after the request has already been delivered.
        clock.setCurrentTime(120L);
        writer.flush(false);

        org.assertj.core.api.Assertions.assertThat(this.destination).containsExactly(1L);
    }

    /**
     * Writes one element at time 0 and moves the clock to 110, past the 100 ms request timeout,
     * before releasing the pending request, with failOnTimeout disabled. The element is expected
     * to appear in {@code destination} twice (presumably once from the late original request and
     * once from the retry; the retry logic itself lives in {@code AsyncSinkWriter}).
     */
    @Test
    void writerShouldRetryOnTimeoutIfFailOnErrorIsFalse() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService clock = context.getTestProcessingTimeService();
        // Arguments: batch size 1, 10 ms in buffer, 100 ms request timeout, failOnTimeout off.
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, false);

        clock.setCurrentTime(0L);
        writer.write("1", null);

        // The clock passes the timeout while the request is still blocked on the latch.
        clock.setCurrentTime(110L);
        writer.deliverMessage();
        writer.flush(false);

        org.assertj.core.api.Assertions.assertThat(this.destination).containsExactly(1L, 1L);
    }

    /**
     * Writes one element at time 0 and moves the clock to 110, past the 100 ms request timeout,
     * with failOnTimeout enabled. The following flush is expected to throw a
     * {@code FlinkRuntimeException} caused by a {@code TimeoutException} carrying the timeout
     * message.
     */
    @Test
    void writerShouldFailOnTimeoutIfFailOnErrorIsTrue() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService clock = context.getTestProcessingTimeService();
        // Arguments: batch size 1, 10 ms in buffer, 100 ms request timeout, failOnTimeout on.
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, true);

        clock.setCurrentTime(0L);
        writer.write("1", null);

        clock.setCurrentTime(110L);
        writer.deliverMessage();

        org.assertj.core.api.Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
                .isThrownBy(() -> writer.flush(false))
                .withCauseInstanceOf(TimeoutException.class)
                .havingCause()
                .withMessageContaining(
                        "Request timed out after 100ms with failOnTimeout set to true.");
    }

    /**
     * Arms the writer so that the first released request asks for a retry of its entries, writes
     * one element at time 0 and moves the clock to 110, past the 100 ms request timeout, before
     * releasing. After the flush the element is expected to appear in {@code destination}
     * exactly once.
     */
    @Test
    void writerShouldDiscardRetriedEntriesOnTimeout() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService clock = context.getTestProcessingTimeService();
        // Arguments: batch size 1, 10 ms in buffer, 100 ms request timeout, failOnTimeout off.
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, false);
        writer.setShouldFailRequest(true);

        clock.setCurrentTime(0L);
        writer.write("1", null);

        clock.setCurrentTime(110L);
        writer.deliverMessage();
        writer.flush(false);

        org.assertj.core.api.Assertions.assertThat(this.destination).containsExactly(1L);
    }

    /**
     * Configures the writer with a fatal error before writing one element and releasing the
     * request. The following flush is expected to throw a {@code FlinkRuntimeException} with the
     * message "Fatal error".
     */
    @Test
    void writerShouldFailOnFatalError() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService clock = context.getTestProcessingTimeService();
        // Arguments: batch size 1, 10 ms in buffer, 100 ms request timeout, failOnTimeout on.
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, true);

        clock.setCurrentTime(0L);
        writer.setFatalError(new FlinkRuntimeException("Fatal error"));
        writer.write("1", null);
        writer.deliverMessage();

        org.assertj.core.api.Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
                .isThrownBy(() -> writer.flush(false))
                .withMessage("Fatal error");
    }
}
