/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.sink.writer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils;
import org.apache.flink.connector.base.sink.writer.BatchCreator;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.DequeRequestBuffer;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RequestBuffer;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.SimpleBatchCreator;
import org.apache.flink.connector.base.sink.writer.TestElementConverter;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.ScalingStrategy;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class AsyncSinkWriterTest {
    // Records the tests expect to have reached the destination; cleared in before() and asserted on throughout.
    // NOTE(review): presumably filled by the writer implementation declared elsewhere in this class — confirm.
    private final List<Integer> res = new ArrayList<Integer>();
    // Init context re-created in before() for every test.
    private TestSinkInitContext sinkInitContext;
    // Any-thread-mailbox init context re-created in before(); passed to the writers of the interleaving-thread tests.
    private TestSinkInitContextAnyThreadMailbox sinkInitContextAnyThreadMailbox;

    /** Creates the test instance; per-test fixtures are assigned in {@link #before()}. */
    AsyncSinkWriterTest() {
        super();
    }

    /** Empties the shared result list and replaces both init contexts with fresh instances. */
    @BeforeEach
    void before() {
        res.clear();
        sinkInitContext = new TestSinkInitContext();
        sinkInitContextAnyThreadMailbox = new TestSinkInitContextAnyThreadMailbox();
    }

    /**
     * Builds a writer on the shared init context with otherwise default builder settings and
     * writes the strings "0" through "79" to it, in order.
     *
     * @throws IOException if a write fails
     * @throws InterruptedException if a write is interrupted
     */
    private void performNormalWriteOfEightyRecordsToMock() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();
        int next = 0;
        while (next < 80) {
            writer.write(String.valueOf(next));
            next++;
        }
    }

    /** Expects the element converter's open-call count to go from 0 to 1 when a writer is built with it. */
    @Test
    void testElementConvertOpenIsInvoked() {
        TestElementConverter converter = new TestElementConverter();
        Assertions.assertThat((int) converter.getOpenCallCount()).isEqualTo(0);

        // The built writer itself is not needed; only the side effect of building it is checked.
        new AsyncSinkWriterImplBuilder()
                .elementConverter(converter)
                .context(sinkInitContext)
                .build();

        Assertions.assertThat((int) converter.getOpenCallCount()).isEqualTo(1);
    }

    /** After the eighty-record write helper has run, all 80 records are expected in the result list. */
    @Test
    void testNumberOfRecordsIsAMultipleOfBatchSizeResultsInThatNumberOfRecordsBeingWritten() throws IOException, InterruptedException {
        performNormalWriteOfEightyRecordsToMock();
        Assertions.assertThat(res.size()).isEqualTo(80);
    }

    /**
     * After the eighty-record write helper has run, expects 80 records out, 320 bytes out and a
     * current-send-time gauge value in the range [0, 1000).
     */
    @Test
    void testMetricsGroupHasLoggedNumberOfRecordsAndNumberOfBytesCorrectly() throws IOException, InterruptedException {
        performNormalWriteOfEightyRecordsToMock();

        Assertions.assertThat((long) sinkInitContext.getNumRecordsOutCounter().getCount())
                .isEqualTo(80L);
        Assertions.assertThat((long) sinkInitContext.getNumBytesOutCounter().getCount())
                .isEqualTo(320L);

        // The gauge is read once per assertion, exactly as many times as before.
        Assertions.assertThat((Long) sinkInitContext.getCurrentSendTimeGauge().get().getValue())
                .isGreaterThanOrEqualTo(0L);
        Assertions.assertThat((Long) sinkInitContext.getCurrentSendTimeGauge().get().getValue())
                .isLessThan(1000L);
    }

    /**
     * Writes one record to a writer built with {@code maxBatchSize(2)} and {@code delay(50)},
     * force-flushes it, and expects the send-time gauge to be at least 50 and at most the wall-clock
     * duration measured around the flush call.
     */
    @Test
    void checkLoggedSendTimesAreWithinBounds() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(2)
                .delay(50)
                .build();
        writer.write(String.valueOf(1));
        Thread.sleep(50L);

        long flushStartedAt = System.currentTimeMillis();
        writer.flush(true);
        long flushFinishedAt = System.currentTimeMillis();

        Assertions.assertThat((Long) sinkInitContext.getCurrentSendTimeGauge().get().getValue())
                .isGreaterThanOrEqualTo(50L);
        Assertions.assertThat((Long) sinkInitContext.getCurrentSendTimeGauge().get().getValue())
                .isLessThanOrEqualTo(flushFinishedAt - flushStartedAt);
    }

    /**
     * Writes "0" through "22" and expects 20 records at the destination while 20, 21 and 22 make
     * up the writer state.
     */
    @Test
    void testThatUnwrittenRecordsInBufferArePersistedWhenSnapshotIsTaken() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();
        for (int record = 0; record <= 22; record++) {
            writer.write(String.valueOf(record));
        }

        Assertions.assertThat(res.size()).isEqualTo(20);
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(
                writer.wrapRequests(20, 21, 22), getWriterState(writer));
    }

    /**
     * With a 12-byte batch limit and a 4-byte record limit, expects three written records to all
     * reach the destination.
     */
    @Test
    void sinkToAllowBatchSizesEqualToByteWiseLimit() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSizeInBytes(12L)
                .maxRecordSizeInBytes(4L)
                .build();
        for (String element : new String[] {"1", "2", "3"}) {
            writer.write(element);
        }
        Assertions.assertThat(res.size()).isEqualTo(3);
    }

    /** Writes "0" through "22", force-flushes, and expects all 23 records at the destination. */
    @Test
    void testPreparingCommitAtSnapshotTimeEnsuresBufferedRecordsArePersistedToDestination() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();
        int record = 0;
        while (record < 23) {
            writer.write(String.valueOf(record));
            record++;
        }
        writer.flush(true);
        Assertions.assertThat(res.size()).isEqualTo(23);
    }

    /** Writes a single record, force-flushes, and expects exactly one record at the destination. */
    @Test
    void testThatMailboxYieldDoesNotBlockWhileATimerIsRegisteredAndHasYetToElapse() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();
        writer.write(String.valueOf(0));
        writer.flush(true);
        Assertions.assertThat(res.size()).isEqualTo(1);
    }

    /**
     * With {@code maxBatchSize(3)}: after two writes the state holds both records and nothing has
     * been delivered; after the third write the state is empty and three records are delivered.
     */
    @Test
    void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterAutomaticFlush() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .build();

        writer.write("25");
        writer.write("55");
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(
                writer.wrapRequests(25, 55), getWriterState(writer));
        Assertions.assertThat(res.size()).isEqualTo(0);

        writer.write("75");
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(
                BufferedRequestState.emptyState(), getWriterState(writer));
        Assertions.assertThat(res.size()).isEqualTo(3);
    }

    /**
     * Writes five records to a writer built with {@code maxBatchSize(3)}, expects 95 and 955 to be
     * the writer state, then force-flushes and expects the state to be empty.
     *
     * @throws IOException if a write or the flush fails
     * @throws InterruptedException if a write or the flush is interrupted
     */
    public void writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .build();
        for (String element : new String[] {"25", "55", "75", "95", "955"}) {
            writer.write(element);
        }
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(
                writer.wrapRequests(95, 955), getWriterState(writer));

        writer.flush(true);
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(
                BufferedRequestState.emptyState(), getWriterState(writer));
    }

    /** Runs the five-record write-and-flush helper and expects five records at the destination. */
    @Test
    void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterManualFlush() throws IOException, InterruptedException {
        writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing();
        Assertions.assertThat(res.size()).isEqualTo(5);
    }

    /** Runs the five-record write-and-flush helper and expects counters of 5 records and 20 bytes. */
    @Test
    void metricsAreLoggedEachTimeSubmitRequestEntriesIsCalled() throws IOException, InterruptedException {
        writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing();

        Assertions.assertThat((long) sinkInitContext.getNumRecordsOutCounter().getCount())
                .isEqualTo(5L);
        Assertions.assertThat((long) sinkInitContext.getNumBytesOutCounter().getCount())
                .isEqualTo(20L);
    }

    /**
     * With failure simulation on and {@code maxBatchSize(3)}: five writes succeed, the write of
     * "135" is expected to throw a {@link RuntimeException} with the deliberate-failure message, and
     * three records are expected at the destination.
     */
    @Test
    void testRuntimeErrorsInSubmitRequestEntriesEndUpAsIOExceptionsWithNumOfFailedRequests() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .simulateFailures(true)
                .build();
        for (String element : new String[] {"25", "55", "75", "95", "35"}) {
            writer.write(element);
        }

        Assertions.assertThatThrownBy(() -> writer.write("135"))
                .isInstanceOf(RuntimeException.class)
                .hasMessage("Deliberate runtime exception occurred in SinkWriterImplementation.");
        Assertions.assertThat(res.size()).isEqualTo(3);
    }

    /**
     * Writes ten records one at a time with failure simulation on, checking the expected
     * destination contents and buffered state after every write, then force-flushes and expects all
     * ten records delivered in the asserted order with an empty writer state.
     */
    @Test
    void testRetryableErrorsDoNotViolateAtLeastOnceSemanticsDueToRequeueOfFailures() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .maxBatchSizeInBytes(10000000L)
                .simulateFailures(true)
                .build();
        // Fixed-size empty list standing in for "nothing delivered" / "nothing buffered".
        List<Integer> nothing = Arrays.asList(new Integer[0]);

        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "25", nothing, Arrays.asList(25));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "55", nothing, Arrays.asList(25, 55));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "965", Arrays.asList(25, 55), nothing);
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "75", Arrays.asList(25, 55, 965, 75), nothing);
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "95", Arrays.asList(25, 55, 965, 75), Arrays.asList(95));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "955", Arrays.asList(25, 55, 965, 75), Arrays.asList(95, 955));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "550", Arrays.asList(25, 55, 965, 75, 95), nothing);
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "45", Arrays.asList(25, 55, 965, 75, 95, 955, 550), Arrays.asList(45));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "35", Arrays.asList(25, 55, 965, 75, 95, 955, 550), Arrays.asList(45, 35));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                writer, "535", Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35), nothing);

        writer.flush(true);
        Assertions.assertThat(res)
                .isEqualTo(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35, 535));
        Assertions.assertThat((long) getWriterState(writer).getStateSize()).isEqualTo(0L);
    }

    /** Runs the shared retry scenario against a failure-simulating writer with {@code maxBatchSize(3)}. */
    @Test
    void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestIfPrepareCommitIsTriggered() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .simulateFailures(true)
                .build();
        testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(writer);
    }

    /**
     * Runs the shared retry scenario against a failure-simulating writer limited to one in-flight
     * request and eight buffered requests, with {@code maxBatchSize(3)}.
     */
    @Test
    void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestIfBufferFillsToFull() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .maxInFlightRequests(1)
                .maxBufferedRequests(8)
                .simulateFailures(true)
                .build();
        testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(writer);
    }

    /**
     * Writes twelve records to the given writer, force-flushes, and expects the destination to hold
     * them in exactly the order they were written.
     *
     * @param sink writer under test
     * @throws IOException if a write or the flush fails
     * @throws InterruptedException if a write or the flush is interrupted
     */
    private void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(AsyncSinkWriterImpl sink) throws IOException, InterruptedException {
        String[] elements = {
            "25", "55", "965", "75", "95", "955", "550", "645", "545", "535", "515", "505"
        };
        for (String element : elements) {
            sink.write(element);
        }
        sink.flush(true);

        List<Integer> expectedOrder =
                Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 535, 515, 505);
        Assertions.assertThat(res).isEqualTo(expectedOrder);
    }

    /**
     * Building a writer with {@code maxBufferedRequests(10)} is expected to fail with an
     * {@link IllegalArgumentException} carrying the buffer-versus-batch-size message.
     */
    @Test
    void testThatMaxBufferSizeOfSinkShouldBeStrictlyGreaterThanMaxSizeOfEachBatch() {
        Assertions.assertThatThrownBy(
                        () -> new AsyncSinkWriterImplBuilder()
                                .context(sinkInitContext)
                                .maxBufferedRequests(10)
                                .build())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("The maximum number of requests that may be buffered should be strictly greater than the maximum number of requests per batch.");
    }

    /**
     * Building a writer whose record size limit (10001) exceeds its batch size limit (10000) is
     * expected to fail with an {@link IllegalArgumentException}.
     */
    @Test
    void maxRecordSizeSetMustBeSmallerThanOrEqualToMaxBatchSize() {
        Assertions.assertThatThrownBy(
                        () -> new AsyncSinkWriterImplBuilder()
                                .context(sinkInitContext)
                                .maxBufferedRequests(11)
                                .maxBatchSizeInBytes(10000L)
                                .maxRecordSizeInBytes(10001L)
                                .build())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.");
    }

    /**
     * With a 3-byte record limit, writing "3" is expected to fail with an
     * {@link IllegalArgumentException} reporting an entry of size 4.
     */
    @Test
    void recordsWrittenToTheSinkMustBeSmallerOrEqualToMaxRecordSizeInBytes() {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(3)
                .maxBufferedRequests(11)
                .maxBatchSizeInBytes(10000L)
                .maxRecordSizeInBytes(3L)
                .build();

        Assertions.assertThatThrownBy(() -> writer.write("3"))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("The request entry sent to the buffer was of size [4], when the maxRecordSizeInBytes was set to [3].");
    }

    /**
     * Writes one element and then checks both the destination contents and the writer state.
     *
     * @param sink writer under test
     * @param x element to write
     * @param y destination contents expected after the write
     * @param z request entries expected in the writer state after the write
     * @throws IOException if the write fails
     * @throws InterruptedException if the write is interrupted
     */
    private void writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(AsyncSinkWriterImpl sink, String x, List<Integer> y, List<Integer> z) throws IOException, InterruptedException {
        sink.write(x);

        Assertions.assertThat(res).isEqualTo(y);
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(
                sink.wrapRequests(z), getWriterState(sink));
    }

    /**
     * With 30-byte batch and record limits, expects the number of delivered records after writing
     * record {@code i} to be {@code i} rounded down to a multiple of seven.
     */
    @Test
    void testFlushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSizeInBytes(30L)
                .maxRecordSizeInBytes(30L)
                .build();
        for (int record = 0; record < 100; record++) {
            writer.write(String.valueOf(record));
            // For non-negative values this equals (record / 7) * 7.
            int expectedDelivered = record - record % 7;
            Assertions.assertThat(res.size()).isEqualTo(expectedDelivered);
        }
    }

    /** Writes three records, calls {@code flush(false)}, and expects nothing at the destination. */
    @Test
    void prepareCommitDoesNotFlushElementsIfFlushIsSetToFalse() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .build();
        for (int record = 0; record < 3; record++) {
            writer.write(String.valueOf(record));
        }
        writer.flush(false);
        Assertions.assertThat(res.size()).isEqualTo(0);
    }

    /**
     * With {@code maxBatchSize(7)} and 32-byte batch/record limits, expects 7 delivered records
     * after writing "0".."6" and 14 after additionally writing "7".."13".
     */
    @Test
    void testThatWhenNumberOfItemAndSizeOfRecordThresholdsAreMetSimultaneouslyAFlushOccurs() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(7)
                .maxBatchSizeInBytes(32L)
                .maxRecordSizeInBytes(32L)
                .build();

        int record = 0;
        while (record < 7) {
            writer.write(String.valueOf(record));
            record++;
        }
        Assertions.assertThat(res.size()).isEqualTo(7);

        // Continue from 7 with the same counter.
        while (record < 14) {
            writer.write(String.valueOf(record));
            record++;
        }
        Assertions.assertThat(res.size()).isEqualTo(14);
    }

    /**
     * With failure simulation on and a 110-byte record limit: after writing 225, 1, 2, 3 two
     * records are expected delivered; after writing 4, 5, 6, 325 the destination is expected to be
     * {@code [1, 2, 225, 3, 4]}.
     */
    @Test
    void testThatIntermittentlyFailingEntriesAreEnqueuedOnToTheBufferWithCorrectSize() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxRecordSizeInBytes(110L)
                .simulateFailures(true)
                .build();

        for (int value : new int[] {225, 1, 2, 3}) {
            writer.write(String.valueOf(value));
        }
        Assertions.assertThat(res.size()).isEqualTo(2);

        for (int value : new int[] {4, 5, 6, 325}) {
            writer.write(String.valueOf(value));
        }
        Assertions.assertThat(res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4));
    }

    /**
     * With failure simulation on, a 210-byte batch limit and a 110-byte record limit: after writing
     * 228, 225, 1, 2, 3 two records are expected delivered; after writing 4, 5, 6, 328, 325 the
     * destination is expected to be {@code [1, 2, 228, 225, 3, 4]}.
     */
    @Test
    void testThatIntermittentlyFailingEntriesAreEnqueuedOnToTheBufferWithCorrectOrder() throws IOException, InterruptedException {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSizeInBytes(210L)
                .maxRecordSizeInBytes(110L)
                .simulateFailures(true)
                .build();

        for (int value : new int[] {228, 225, 1, 2, 3}) {
            writer.write(String.valueOf(value));
        }
        Assertions.assertThat(res.size()).isEqualTo(2);

        for (int value : new int[] {4, 5, 6, 328, 325}) {
            writer.write(String.valueOf(value));
        }
        Assertions.assertThat(res).isEqualTo(Arrays.asList(1, 2, 228, 225, 3, 4));
    }

    /**
     * With {@code maxTimeInBufferMS(100)} and {@code maxBatchSize(10)}: eight records written at
     * time 0 are expected to be undelivered at time 99 and delivered at time 100.
     */
    @Test
    void testThatABatchWithSizeSmallerThanMaxBatchSizeIsFlushedOnTimeoutExpiry() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(10)
                .maxInFlightRequests(20)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(100L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .build();
        TestProcessingTimeService clock = sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        for (int record = 0; record < 8; record++) {
            writer.write(String.valueOf(record));
        }

        clock.setCurrentTime(99L);
        Assertions.assertThat(res.size()).isEqualTo(0);

        clock.setCurrentTime(100L);
        Assertions.assertThat(res.size()).isEqualTo(8);
    }

    /**
     * Writes 98 records, advancing the test clock to {@code i} before record {@code i}; expects
     * 90 delivered at time 99 and all 98 delivered at time 100.
     */
    @Test
    void testThatTimeBasedBatchPicksUpAllRelevantItemsUpUntilExpiryOfTimer() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(10)
                .maxInFlightRequests(20)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(100L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .build();
        TestProcessingTimeService clock = sinkInitContext.getTestProcessingTimeService();

        int record = 0;
        while (record < 98) {
            clock.setCurrentTime((long) record);
            writer.write(String.valueOf(record));
            record++;
        }

        clock.setCurrentTime(99L);
        Assertions.assertThat(res.size()).isEqualTo(90);

        clock.setCurrentTime(100L);
        Assertions.assertThat(res.size()).isEqualTo(98);
    }

    /**
     * With failure simulation on: after writing 225, 0, 1, 2 expects {@code [0, 1]} delivered and
     * 2 in the state; after {@code flush(false)} expects the same delivery with 225 and 2 in the
     * state; after {@code flush(true)} expects {@code [0, 1, 225, 2]} delivered.
     */
    @Test
    void prepareCommitFlushesInflightElementsAndDoesNotFlushIfFlushIsSetToFalse() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(8)
                .maxBufferedRequests(10)
                .simulateFailures(true)
                .build();
        for (int value : new int[] {225, 0, 1, 2}) {
            writer.write(String.valueOf(value));
        }
        Assertions.assertThat(res).isEqualTo(Arrays.asList(0, 1));
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(
                writer.wrapRequests(2), getWriterState(writer));

        writer.flush(false);
        Assertions.assertThat(res).isEqualTo(Arrays.asList(0, 1));
        AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual(
                writer.wrapRequests(225, 2), getWriterState(writer));

        writer.flush(true);
        Assertions.assertThat(res).isEqualTo(Arrays.asList(0, 1, 225, 2));
    }

    /**
     * Snapshots a failure-simulating writer after writing 225, 1, 2, 3 and a {@code flush(false)},
     * restores a second writer from that snapshot, writes 4 and 5 to it, and expects the
     * destination to be {@code [1, 2, 225, 3, 4]}.
     */
    @Test
    void testThatIntermittentlyFailingEntriesAreEnqueuedOnToTheBufferAfterSnapshot() throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImplBuilder()
                .context(this.sinkInitContext)
                .maxRecordSizeInBytes(110L)
                .simulateFailures(true)
                .build();
        sink.write(String.valueOf(225));
        sink.write(String.valueOf(1));
        sink.write(String.valueOf(2));
        sink.write(String.valueOf(3));
        Assertions.assertThat(this.res.size()).isEqualTo(2);

        sink.flush(false);
        Assertions.assertThat(this.res.size()).isEqualTo(2);

        // Parameterized rather than a raw List so the state handed to buildWithState stays type-checked.
        List<BufferedRequestState<Integer>> states = sink.snapshotState(1L);
        AsyncSinkWriterImpl newSink = new AsyncSinkWriterImplBuilder()
                .context(this.sinkInitContext)
                .maxRecordSizeInBytes(110L)
                .buildWithState(states);
        newSink.write(String.valueOf(4));
        newSink.write(String.valueOf(5));
        Assertions.assertThat(this.res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4));
    }

    /**
     * Snapshots a failure-simulating writer (110-byte record limit) after writing 225, then expects
     * restoring from that snapshot with a 15-byte record limit to throw an
     * {@link IllegalArgumentException} reporting an entry of size 100.
     */
    @Test
    void testThatRecordOfSizeBiggerThanMaximumFailsSinkInitialization() throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImplBuilder()
                .context(this.sinkInitContext)
                .maxRecordSizeInBytes(110L)
                .simulateFailures(true)
                .build();
        sink.write(String.valueOf(225));
        sink.flush(false);

        // Parameterized rather than a raw List so the state handed to buildWithState stays type-checked.
        List<BufferedRequestState<Integer>> states = sink.snapshotState(1L);

        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
                .isThrownBy(
                        () -> new AsyncSinkWriterImplBuilder()
                                .context(this.sinkInitContext)
                                .maxRecordSizeInBytes(15L)
                                .buildWithState(states))
                .withMessageContaining("The request entry sent to the buffer was of size [100], when the maxRecordSizeInBytes was set to [15].");
    }

    /**
     * Restores a writer from three separate states holding entries 1-3, 4-5 and 6 (each of size 1)
     * and expects a subsequent snapshot to contain a single state with all six entries and a state
     * size of 6.
     */
    @Test
    void testRestoreFromMultipleStates() throws IOException {
        // Diamond-typed construction replaces the raw types and Object casts left by decompilation.
        List<BufferedRequestState<Integer>> states = Arrays.asList(
                new BufferedRequestState<>(
                        Arrays.asList(
                                new RequestEntryWrapper<>(1, 1L),
                                new RequestEntryWrapper<>(2, 1L),
                                new RequestEntryWrapper<>(3, 1L))),
                new BufferedRequestState<>(
                        Arrays.asList(
                                new RequestEntryWrapper<>(4, 1L),
                                new RequestEntryWrapper<>(5, 1L))),
                new BufferedRequestState<>(
                        Collections.singletonList(new RequestEntryWrapper<>(6, 1L))));
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImplBuilder()
                .context(this.sinkInitContext)
                .buildWithState(states);

        List<BufferedRequestState<Integer>> bufferedRequestStates = sink.snapshotState(1L);
        Assertions.assertThat(bufferedRequestStates).hasSize(1);

        BufferedRequestState<Integer> snapshotState = bufferedRequestStates.get(0);
        Assertions.assertThat(snapshotState.getBufferedRequestEntries()).hasSize(6);
        Assertions.assertThat((long) snapshotState.getStateSize()).isEqualTo(6L);
        Assertions.assertThat(
                        snapshotState.getBufferedRequestEntries().stream()
                                .map(RequestEntryWrapper::getRequestEntry)
                                .collect(Collectors.toList()))
                .containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
    }

    /**
     * Writes three records to a first writer, restores a second writer (with
     * {@code maxTimeInBufferMS(10)}) from its snapshot at time 10, writes a fourth record, and
     * expects four records delivered once the test clock reaches 30.
     */
    @Test
    void testWriterInitializedWithStateHasCallbackRegistered() throws Exception {
        AsyncSinkWriterImpl original = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(10)
                .maxInFlightRequests(20)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(100L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .build();
        Assertions.assertThat(res.size()).isEqualTo(0);

        TestProcessingTimeService clock = sinkInitContext.getTestProcessingTimeService();
        clock.setCurrentTime(0L);
        for (String element : new String[] {"1", "2", "3"}) {
            original.write(element);
        }
        clock.setCurrentTime(10L);

        AsyncSinkWriterImpl restored = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(10)
                .maxInFlightRequests(20)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(10L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .buildWithState(original.snapshotState(1L));
        restored.write("4");

        clock.setCurrentTime(30L);
        Assertions.assertThat(res.size()).isEqualTo(4);
    }

    /**
     * Steps the test clock through 0, 10, 20, 100, 199 and 200 while writing three records and
     * force-flushing once, asserting the delivered count (0, 1, 1, 2, 2, 3) after each step.
     */
    @Test
    void testThatOneAndOnlyOneCallbackIsEverRegistered() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(10)
                .maxInFlightRequests(20)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(100L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .build();
        TestProcessingTimeService clock = sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        writer.write("1");
        Assertions.assertThat(res.size()).isEqualTo(0);

        clock.setCurrentTime(10L);
        writer.flush(true);
        Assertions.assertThat(res.size()).isEqualTo(1);

        clock.setCurrentTime(20L);
        writer.write("2");
        Assertions.assertThat(res.size()).isEqualTo(1);

        clock.setCurrentTime(100L);
        Assertions.assertThat(res.size()).isEqualTo(2);

        writer.write("3");
        clock.setCurrentTime(199L);
        Assertions.assertThat(res.size()).isEqualTo(2);

        clock.setCurrentTime(200L);
        Assertions.assertThat(res.size()).isEqualTo(3);
    }

    /**
     * With failure simulation on and {@code maxTimeInBufferMS(100)}: after writing 1, 2, 225 at
     * time 0, two records are expected delivered at time 100; after writing 3 and 4, still two at
     * time 199 and five at time 200.
     */
    @Test
    void testThatIntermittentlyFailingEntriesShouldBeFlushedWithMainBatchInTimeBasedFlush() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(100L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .build();
        TestProcessingTimeService clock = sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        for (String element : new String[] {"1", "2", "225"}) {
            writer.write(element);
        }
        clock.setCurrentTime(100L);
        Assertions.assertThat(res.size()).isEqualTo(2);

        writer.write("3");
        writer.write("4");
        clock.setCurrentTime(199L);
        Assertions.assertThat(res.size()).isEqualTo(2);

        clock.setCurrentTime(200L);
        Assertions.assertThat(res.size()).isEqualTo(5);
    }

    /**
     * Writes one record at time 0, force-flushes at time 50 and expects one record delivered, then
     * advances the clock to 200. There is no assertion after the last step; the test passes as long
     * as nothing throws.
     */
    @Test
    void testThatFlushingAnEmptyBufferDoesNotResultInErrorOrFailure() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(10)
                .maxInFlightRequests(20)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(100L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .build();
        TestProcessingTimeService clock = sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        writer.write("1");

        clock.setCurrentTime(50L);
        writer.flush(true);
        Assertions.assertThat(res.size()).isEqualTo(1);

        clock.setCurrentTime(200L);
    }

    /**
     * With {@code maxTimeInBufferMS(100)}: a record written at time 0 is expected delivered at time
     * 100, and a second record written right after is expected delivered at time 200.
     */
    @Test
    void testThatOnExpiryOfAnOldTimeoutANewOneMayBeRegisteredImmediately() throws Exception {
        AsyncSinkWriterImpl writer = new AsyncSinkWriterImplBuilder()
                .context(sinkInitContext)
                .maxBatchSize(10)
                .maxInFlightRequests(20)
                .maxBatchSizeInBytes(10000L)
                .maxTimeInBufferMS(100L)
                .maxRecordSizeInBytes(10000L)
                .simulateFailures(true)
                .build();
        TestProcessingTimeService clock = sinkInitContext.getTestProcessingTimeService();

        clock.setCurrentTime(0L);
        writer.write("1");
        clock.setCurrentTime(100L);
        Assertions.assertThat(res.size()).isEqualTo(1);

        writer.write("2");
        clock.setCurrentTime(200L);
        Assertions.assertThat(res.size()).isEqualTo(2);
    }

    /**
     * Runs the interleaved-write scenario against a release-and-block writer constructed with
     * arguments {@code (anyThreadMailboxContext, 1, latches, true)} and expects the destination to
     * be {@code [1, 2, 3, 4]}.
     */
    @Test
    void testThatInterleavingThreadsMayBlockEachOtherButDoNotCauseRaceConditions() throws Exception {
        CountDownLatch releaseBlockedWrite = new CountDownLatch(1);
        CountDownLatch workerStarted = new CountDownLatch(1);
        AsyncSinkReleaseAndBlockWriterImpl writer = new AsyncSinkReleaseAndBlockWriterImpl(
                sinkInitContextAnyThreadMailbox, 1, releaseBlockedWrite, workerStarted, true);

        writeTwoElementsAndInterleaveTheNextTwoElements(writer, releaseBlockedWrite, workerStarted);

        Assertions.assertThat(res).isEqualTo(Arrays.asList(1, 2, 3, 4));
    }

    /**
     * Runs the interleaved-write scenario against a release-and-block writer constructed with
     * arguments {@code (anyThreadMailboxContext, 2, latches, false)} and expects the destination to
     * be {@code [4, 1, 2, 3]}.
     */
    @Test
    void testThatIfOneInterleavedThreadIsBlockedTheOtherThreadWillContinueAndCorrectlyWrite() throws Exception {
        CountDownLatch releaseBlockedWrite = new CountDownLatch(1);
        CountDownLatch workerStarted = new CountDownLatch(1);
        AsyncSinkReleaseAndBlockWriterImpl writer = new AsyncSinkReleaseAndBlockWriterImpl(
                sinkInitContextAnyThreadMailbox, 2, releaseBlockedWrite, workerStarted, false);

        writeTwoElementsAndInterleaveTheNextTwoElements(writer, releaseBlockedWrite, workerStarted);

        Assertions.assertThat(res).isEqualTo(Arrays.asList(4, 1, 2, 3));
    }

    /**
     * With a 15-byte batch limit, a 5-byte record limit and {@code maxBatchSize(5)}: after three
     * writes the custom request buffer is expected to report 12 bytes, three entries and a
     * non-empty buffered state.
     */
    @Test
    public void testTotalSizeInBytesReflectsBufferBeforeFlush() throws InterruptedException, IOException {
        // Typed components replace the raw declarations, so no casts are needed when handing them to the builder.
        SimpleBatchCreator<Integer> simpleBatchCreator = new SimpleBatchCreator<>(15L);
        DequeRequestBuffer<Integer> requestBuffer = new DequeRequestBuffer<>();
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImplBuilder()
                .context(this.sinkInitContext)
                .maxBatchSizeInBytes(15L)
                .maxRecordSizeInBytes(5L)
                .maxBatchSize(5)
                .buildWithCustomPluggableComponents(simpleBatchCreator, requestBuffer);
        sink.write("1");
        sink.write("2");
        sink.write("3");

        Assertions.assertThat((long) requestBuffer.totalSizeInBytes()).isEqualTo(12L);
        Assertions.assertThat((int) requestBuffer.size()).isEqualTo(3);
        Assertions.assertThat(requestBuffer.getBufferedState().iterator().hasNext()).isTrue();
    }

    /**
     * With an 11-byte batch limit, a 5-byte record limit and {@code maxBatchSize(5)}: after three
     * writes the custom request buffer is expected to hold a single 4-byte entry whose request
     * entry is 3.
     */
    @Test
    public void testTotalSizeInBytesReflectsBufferAfterFlush() throws InterruptedException, IOException {
        // Typed components replace the raw declarations, so no casts are needed when handing them to the builder.
        SimpleBatchCreator<Integer> simpleBatchCreator = new SimpleBatchCreator<>(11L);
        DequeRequestBuffer<Integer> requestBuffer = new DequeRequestBuffer<>();
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImplBuilder()
                .context(this.sinkInitContext)
                .maxBatchSizeInBytes(11L)
                .maxRecordSizeInBytes(5L)
                .maxBatchSize(5)
                .buildWithCustomPluggableComponents(simpleBatchCreator, requestBuffer);
        sink.write("1");
        sink.write("2");
        sink.write("3");

        Assertions.assertThat((long) requestBuffer.totalSizeInBytes()).isEqualTo(4L);
        Assertions.assertThat((int) requestBuffer.size()).isEqualTo(1);
        Assertions.assertThat(requestBuffer.getBufferedState().iterator().hasNext()).isTrue();

        RequestEntryWrapper<Integer> requestEntryWrapper =
                requestBuffer.getBufferedState().iterator().next();
        Assertions.assertThat(requestEntryWrapper.getRequestEntry()).isEqualTo(3);
        Assertions.assertThat((long) requestEntryWrapper.getSize()).isEqualTo(4L);
    }

    /**
     * Writes "1" and "2" on the calling thread, submits a write of "3" to a worker thread via
     * {@code writeAsNonMailboxThread}, waits on {@code delayedStartLatch}, writes "4" on the calling
     * thread, advances the test clock to 100, counts down {@code blockedWriteLatch} and asserts the
     * worker pool terminates within 500 ms.
     *
     * <p>NOTE(review): the processing-time service is taken from {@code sinkInitContext}, while the
     * callers in this class construct the sink with {@code sinkInitContextAnyThreadMailbox} —
     * confirm this is intended.
     *
     * @param sink writer under test
     * @param blockedWriteLatch latch counted down by this method after "4" has been written
     * @param delayedStartLatch latch this method awaits before writing "4"
     * @throws Exception if a write fails, a wait is interrupted, or the termination assertion fails
     */
    private void writeTwoElementsAndInterleaveTheNextTwoElements(AsyncSinkWriterImpl sink, CountDownLatch blockedWriteLatch, CountDownLatch delayedStartLatch) throws Exception {
        TestProcessingTimeService tpts = this.sinkInitContext.getTestProcessingTimeService();
        ExecutorService es = Executors.newFixedThreadPool(4);
        try {
            tpts.setCurrentTime(0L);
            sink.write("1");
            sink.write("2");
            es.submit(() -> {
                try {
                    sink.writeAsNonMailboxThread("3");
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                    // Keep the interrupt visible to the pool thread instead of silently clearing it.
                    Thread.currentThread().interrupt();
                }
            });
            delayedStartLatch.await();
            sink.write("4");
            tpts.setCurrentTime(100L);
            blockedWriteLatch.countDown();
            es.shutdown();
            Assertions.assertThat(es.awaitTermination(500L, TimeUnit.MILLISECONDS))
                    .as("Executor Service stuck at termination, not terminated after 500ms!")
                    .isTrue();
        }
        finally {
            // shutdownNow() interrupts a worker that is still blocked when a step above failed;
            // a plain shutdown() would leave that non-daemon thread waiting forever.
            // It is a no-op once the pool has already terminated.
            es.shutdownNow();
        }
    }

    /**
     * Checks that {@code flush(true)} does not return while a request is still in flight.
     *
     * <p>A writer thread pushes three elements into an {@link AsyncSinkReleaseAndBlockWriterImpl}
     * whose {@code submitRequestEntries} counts down {@code delayedStartLatch} and then parks on
     * {@code blockedWriteLatch} when it receives three entries. Once that has happened, a second
     * thread calls {@code flush(true)}. That thread must still be alive after 300 ms and must only
     * end after being interrupted. Finally the blocked request is released and the recorded
     * results are verified.
     *
     * <p>The flushing thread was previously created but never started, so the blocking behaviour
     * was not exercised at all; it is now started, observed, interrupted and joined.
     */
    @Test
    void ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod() throws Exception {
        CountDownLatch blockedWriteLatch = new CountDownLatch(1);
        CountDownLatch delayedStartLatch = new CountDownLatch(1);
        AsyncSinkReleaseAndBlockWriterImpl sink = new AsyncSinkReleaseAndBlockWriterImpl(this.sinkInitContextAnyThreadMailbox, 1, blockedWriteLatch, delayedStartLatch, false);
        Thread t = new Thread(() -> {
            try {
                sink.writeAsNonMailboxThread("1");
                sink.writeAsNonMailboxThread("2");
                sink.writeAsNonMailboxThread("3");
            }
            catch (IOException | InterruptedException e) {
                e.printStackTrace();
                Assertions.fail((String)"Auxiliary thread encountered an exception when writing to the sink", (Throwable)e);
            }
        });
        t.start();
        // Wait until the writer thread is parked inside submitRequestEntries.
        delayedStartLatch.await();
        Thread s = new Thread(() -> {
            try {
                sink.flush(true);
                Assertions.fail((String)"Sink did not block successfully and reached here when it shouldn't have.");
            }
            catch (InterruptedException ignored) {
                // Expected: the test interrupts this thread to end the blocked flush.
            }
        });
        s.start();
        try {
            Thread.sleep(300L);
            // If flush(true) had returned (or failed) the thread would already be dead.
            Assertions.assertThat(s.isAlive()).as("flush(true) must still be blocked while a request is in flight").isTrue();
            Assertions.assertThat((boolean)s.isInterrupted()).isFalse();
            s.interrupt();
            // Bounded wait so a flush that ignores the interrupt cannot hang the test.
            s.join(5000L);
            Assertions.assertThat(s.isAlive()).as("flushing thread must terminate after being interrupted").isFalse();
        }
        finally {
            // Always release the in-flight request so the writer thread can finish.
            blockedWriteLatch.countDown();
        }
        t.join();
        Assertions.assertThat(this.res).isEqualTo(Arrays.asList(1, 2, 3));
    }

    /**
     * Snapshots {@code sinkWriter} with checkpoint id 1, asserts that exactly one state object was
     * produced and returns it.
     *
     * @param sinkWriter the writer to snapshot
     * @return the single element of the snapshot list
     */
    private BufferedRequestState<Integer> getWriterState(AsyncSinkWriter<String, Integer> sinkWriter) {
        List<BufferedRequestState<Integer>> snapshot = sinkWriter.snapshotState(1L);
        int stateCount = snapshot.size();
        Assertions.assertThat(stateCount).isEqualTo(1);
        return snapshot.get(0);
    }

    /**
     * Test writer that signals and then blocks whenever it is asked to submit exactly three
     * entries, so tests can observe the sink while a request is in flight.
     *
     * <p>On such a submission it counts down {@code delayedStartLatch} and then waits on
     * {@code blockedThreadLatch}: either indefinitely, or for 500 ms when
     * {@code blockForLimitedTime} is set, in which case the latch is required NOT to have been
     * released within that time. Every submission then appends its entries to the enclosing
     * test's {@code res} list and completes the result handler.
     */
    private class AsyncSinkReleaseAndBlockWriterImpl
    extends AsyncSinkWriterImpl {
        private final CountDownLatch blockedThreadLatch;
        private final CountDownLatch delayedStartLatch;
        private final boolean blockForLimitedTime;

        /**
         * @param context writer init context handed to the parent writer
         * @param maxInFlightRequests in-flight request limit handed to the parent writer
         * @param blockedThreadLatch latch the submitting thread waits on
         * @param delayedStartLatch latch counted down just before the submitting thread waits
         * @param blockForLimitedTime whether to wait at most 500 ms instead of indefinitely
         */
        public AsyncSinkReleaseAndBlockWriterImpl(WriterInitContext context, int maxInFlightRequests, CountDownLatch blockedThreadLatch, CountDownLatch delayedStartLatch, boolean blockForLimitedTime) {
            super(
                    (ElementConverter<String, Integer>)(ElementConverter & Serializable)(elem, ctx) -> Integer.parseInt(elem),
                    context,
                    3,
                    maxInFlightRequests,
                    20,
                    100L,
                    100L,
                    100L,
                    false,
                    0,
                    Collections.emptyList());
            this.blockForLimitedTime = blockForLimitedTime;
            this.delayedStartLatch = delayedStartLatch;
            this.blockedThreadLatch = blockedThreadLatch;
        }

        @Override
        protected void submitRequestEntries(List<Integer> requestEntries, ResultHandler<Integer> resultHandler) {
            boolean isThreeEntryBatch = requestEntries.size() == 3;
            if (isThreeEntryBatch) {
                this.signalStartThenBlock();
            }
            AsyncSinkWriterTest.this.res.addAll(requestEntries);
            resultHandler.complete();
        }

        /** Counts down the start latch, then waits on the blocking latch as configured. */
        private void signalStartThenBlock() {
            try {
                this.delayedStartLatch.countDown();
                if (!this.blockForLimitedTime) {
                    this.blockedThreadLatch.await();
                    return;
                }
                // In limited mode the latch must still be closed when the timeout elapses.
                boolean releasedEarly = this.blockedThreadLatch.await(500L, TimeUnit.MILLISECONDS);
                Assertions.assertThat(releasedEarly)
                        .as("The countdown latch was released before the full amountof time was reached.")
                        .isFalse();
            } catch (InterruptedException e) {
                Assertions.fail("The unit test latch must not have been interrupted by another thread.");
            }
        }
    }

    /**
     * Fluent builder for {@link AsyncSinkWriterImpl} test writers.
     *
     * <p>Defaults: an element converter that parses the input string as an integer, no simulated
     * failures, no delay, batch size 10, one in-flight request, 100 buffered requests, 110 bytes
     * per batch, 1000 ms maximum time in buffer, and a maximum record size equal to the default
     * batch byte size. The context has no default and is passed through as set.
     */
    private class AsyncSinkWriterImplBuilder {
        private ElementConverter<String, Integer> elementConverter = (ElementConverter & Serializable)(elem, ctx) -> Integer.parseInt(elem);
        private boolean simulateFailures = false;
        private int delay = 0;
        private WriterInitContext context;
        private int maxBatchSize = 10;
        private int maxInFlightRequests = 1;
        private int maxBufferedRequests = 100;
        private long maxBatchSizeInBytes = 110L;
        private long maxTimeInBufferMS = 1000L;
        // Initialised from the default batch byte size above; later changes to
        // maxBatchSizeInBytes do not alter this value.
        private long maxRecordSizeInBytes = this.maxBatchSizeInBytes;

        private AsyncSinkWriterImplBuilder() {
        }

        /** Sets the element converter and returns this builder. */
        private AsyncSinkWriterImplBuilder elementConverter(ElementConverter<String, Integer> converter) {
            this.elementConverter = converter;
            return this;
        }

        /** Sets the writer init context and returns this builder. */
        private AsyncSinkWriterImplBuilder context(WriterInitContext initContext) {
            this.context = initContext;
            return this;
        }

        /** Sets the maximum batch size and returns this builder. */
        private AsyncSinkWriterImplBuilder maxBatchSize(int value) {
            this.maxBatchSize = value;
            return this;
        }

        /** Sets the maximum number of in-flight requests and returns this builder. */
        private AsyncSinkWriterImplBuilder maxInFlightRequests(int value) {
            this.maxInFlightRequests = value;
            return this;
        }

        /** Sets the maximum number of buffered requests and returns this builder. */
        private AsyncSinkWriterImplBuilder maxBufferedRequests(int value) {
            this.maxBufferedRequests = value;
            return this;
        }

        /** Sets the maximum batch size in bytes and returns this builder. */
        private AsyncSinkWriterImplBuilder maxBatchSizeInBytes(long value) {
            this.maxBatchSizeInBytes = value;
            return this;
        }

        /** Sets the maximum time in buffer (ms) and returns this builder. */
        private AsyncSinkWriterImplBuilder maxTimeInBufferMS(long value) {
            this.maxTimeInBufferMS = value;
            return this;
        }

        /** Sets the maximum record size in bytes and returns this builder. */
        private AsyncSinkWriterImplBuilder maxRecordSizeInBytes(long value) {
            this.maxRecordSizeInBytes = value;
            return this;
        }

        /** Sets the delay value handed to the writer and returns this builder. */
        private AsyncSinkWriterImplBuilder delay(int value) {
            this.delay = value;
            return this;
        }

        /** Enables or disables simulated failures and returns this builder. */
        private AsyncSinkWriterImplBuilder simulateFailures(boolean enabled) {
            this.simulateFailures = enabled;
            return this;
        }

        /** Builds a writer with an empty initial buffered state. */
        private AsyncSinkWriterImpl build() {
            return this.buildWithState(Collections.emptyList());
        }

        /** Builds a writer that is handed {@code bufferedState} as its initial state. */
        private AsyncSinkWriterImpl buildWithState(List<BufferedRequestState<Integer>> bufferedState) {
            return new AsyncSinkWriterImpl(
                    this.elementConverter,
                    this.context,
                    this.maxBatchSize,
                    this.maxInFlightRequests,
                    this.maxBufferedRequests,
                    this.maxBatchSizeInBytes,
                    this.maxTimeInBufferMS,
                    this.maxRecordSizeInBytes,
                    this.simulateFailures,
                    this.delay,
                    bufferedState);
        }

        /**
         * Builds a writer with an empty initial state and the given batch creator and request
         * buffer.
         */
        private AsyncSinkWriterImpl buildWithCustomPluggableComponents(BatchCreator<Integer> batchCreator, RequestBuffer<Integer> requestBuffer) {
            return new AsyncSinkWriterImpl(
                    this.elementConverter,
                    this.context,
                    this.maxBatchSize,
                    this.maxInFlightRequests,
                    this.maxBufferedRequests,
                    this.maxBatchSizeInBytes,
                    this.maxTimeInBufferMS,
                    this.maxRecordSizeInBytes,
                    this.simulateFailures,
                    this.delay,
                    Collections.emptyList(),
                    batchCreator,
                    requestBuffer);
        }
    }

    /**
     * {@link AsyncSinkWriter} used by the tests, turning string inputs into integer request
     * entries.
     *
     * <p>Behaviour of {@code submitRequestEntries}, as implemented below:
     * <ul>
     *   <li>sleeps for {@code delay} milliseconds first when {@code delay} is positive;</li>
     *   <li>throws a {@link RuntimeException} if any entry is in the range (100, 200];</li>
     *   <li>with {@code simulateFailures}, entries greater than 200 are handed back for retry the
     *       first time they are seen and accepted the next time they are submitted;</li>
     *   <li>all accepted entries are appended to the enclosing test's {@code res} list.</li>
     * </ul>
     */
    private class AsyncSinkWriterImpl
    extends AsyncSinkWriter<String, Integer> {
        // Entries that were rejected once and are awaiting their retry.
        private final Set<Integer> failedFirstAttempts = new HashSet<>();
        private final boolean simulateFailures;
        private final int delay;

        /** Creates a writer using the four-argument parent constructor. */
        private AsyncSinkWriterImpl(ElementConverter<String, Integer> elementConverter, WriterInitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, boolean simulateFailures, int delay, List<BufferedRequestState<Integer>> bufferedState) {
            super(
                    elementConverter,
                    context,
                    AsyncSinkWriterConfiguration.builder()
                            .setMaxBatchSize(maxBatchSize)
                            .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
                            .setMaxInFlightRequests(maxInFlightRequests)
                            .setMaxBufferedRequests(maxBufferedRequests)
                            .setMaxTimeInBufferMS(maxTimeInBufferMS)
                            .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
                            .setRateLimitingStrategy(
                                    (RateLimitingStrategy)CongestionControlRateLimitingStrategy.builder()
                                            .setInitialMaxInFlightMessages(maxBatchSize * maxInFlightRequests)
                                            .setMaxInFlightRequests(maxInFlightRequests)
                                            .setScalingStrategy((ScalingStrategy)AIMDScalingStrategy.builder((int)(maxBatchSize * maxInFlightRequests)).build())
                                            .build())
                            .build(),
                    bufferedState);
            this.delay = delay;
            this.simulateFailures = simulateFailures;
        }

        /**
         * Creates a writer using the parent constructor that additionally takes a batch creator
         * and a request buffer.
         */
        private AsyncSinkWriterImpl(ElementConverter<String, Integer> elementConverter, WriterInitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, boolean simulateFailures, int delay, List<BufferedRequestState<Integer>> bufferedState, BatchCreator<Integer> batchCreator, RequestBuffer<Integer> requestBuffer) {
            super(
                    elementConverter,
                    context,
                    AsyncSinkWriterConfiguration.builder()
                            .setMaxBatchSize(maxBatchSize)
                            .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
                            .setMaxInFlightRequests(maxInFlightRequests)
                            .setMaxBufferedRequests(maxBufferedRequests)
                            .setMaxTimeInBufferMS(maxTimeInBufferMS)
                            .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
                            .setRateLimitingStrategy(
                                    (RateLimitingStrategy)CongestionControlRateLimitingStrategy.builder()
                                            .setInitialMaxInFlightMessages(maxBatchSize * maxInFlightRequests)
                                            .setMaxInFlightRequests(maxInFlightRequests)
                                            .setScalingStrategy((ScalingStrategy)AIMDScalingStrategy.builder((int)(maxBatchSize * maxInFlightRequests)).build())
                                            .build())
                            .build(),
                    bufferedState,
                    batchCreator,
                    requestBuffer);
            this.delay = delay;
            this.simulateFailures = simulateFailures;
        }

        /**
         * Drains both test mailboxes via {@code tryYield} and then writes {@code val} with a
         * {@code null} context.
         */
        public void write(String val) throws IOException, InterruptedException {
            this.yieldMailbox(AsyncSinkWriterTest.this.sinkInitContext.getMailboxExecutor());
            this.yieldMailbox(AsyncSinkWriterTest.this.sinkInitContextAnyThreadMailbox.getMailboxExecutor());
            this.write(val, null);
        }

        /** Calls {@code tryYield} on {@code mailbox} repeatedly until it returns {@code false}. */
        public void yieldMailbox(MailboxExecutor mailbox) {
            while (mailbox.tryYield()) {
                // keep yielding until tryYield reports false
            }
        }

        /** Writes {@code val} with a {@code null} context without touching the mailboxes first. */
        public void writeAsNonMailboxThread(String val) throws IOException, InterruptedException {
            this.write(val, null);
        }

        protected void submitRequestEntries(List<Integer> requestEntries, ResultHandler<Integer> resultHandler) {
            this.maybeDelay();

            // Values in (100, 200] trigger a deliberate hard failure.
            for (Integer entry : requestEntries) {
                if (entry > 100 && entry <= 200) {
                    throw new RuntimeException("Deliberate runtime exception occurred in SinkWriterImplementation.");
                }
            }

            if (!this.simulateFailures) {
                AsyncSinkWriterTest.this.res.addAll(requestEntries);
                resultHandler.complete();
                return;
            }

            // Entries that failed before and are part of this submission succeed now.
            List<Integer> retriedNow = new ArrayList<>();
            for (Integer previouslyFailed : this.failedFirstAttempts) {
                if (requestEntries.contains(previouslyFailed)) {
                    retriedNow.add(previouslyFailed);
                }
            }
            this.failedFirstAttempts.removeAll(retriedNow);

            // Entries above 200 that are not being retried fail on this first attempt.
            List<Integer> failingNow = new ArrayList<>();
            for (Integer entry : requestEntries) {
                if (!retriedNow.contains(entry) && entry > 200) {
                    failingNow.add(entry);
                }
            }
            this.failedFirstAttempts.addAll(failingNow);

            // Note: the caller's list is modified in place, exactly as before.
            requestEntries.removeAll(failingNow);
            AsyncSinkWriterTest.this.res.addAll(requestEntries);
            resultHandler.retryForEntries(failingNow);
        }

        /** Sleeps for {@code delay} ms when positive; an interrupt fails the test. */
        private void maybeDelay() {
            if (this.delay > 0) {
                try {
                    Thread.sleep(this.delay);
                } catch (InterruptedException e) {
                    Assertions.fail("Thread sleeping for delay in submitRequestEntries was interrupted.");
                }
            }
        }

        /** Returns 100 for entries above 200 when failures are simulated, otherwise 4. */
        protected long getSizeInBytes(Integer requestEntry) {
            if (requestEntry > 200 && this.simulateFailures) {
                return 100L;
            }
            return 4L;
        }

        /** Varargs convenience overload of {@link #wrapRequests(List)}. */
        public BufferedRequestState<Integer> wrapRequests(Integer ... requests) {
            return this.wrapRequests(Arrays.asList(requests));
        }

        /**
         * Wraps each request together with its size (from {@code getSizeInBytes}) and returns the
         * wrappers as a {@link BufferedRequestState}, preserving the input order.
         */
        public BufferedRequestState<Integer> wrapRequests(List<Integer> requests) {
            List<RequestEntryWrapper<Integer>> wrapped = requests.stream()
                    .map(entry -> new RequestEntryWrapper<Integer>(entry, this.getSizeInBytes(entry)))
                    .collect(Collectors.toList());
            return new BufferedRequestState<>(wrapped);
        }
    }
}

