package io.trino.split;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.trino.split.MockSplitSource;
import io.trino.split.SplitSource;
import java.util.Objects;
import java.util.concurrent.Future;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testng.Assert;

/* loaded from: input_file:io/trino/split/TestBufferingSplitSource.class */
public class TestBufferingSplitSource {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/split/TestBufferingSplitSource$NextBatchResult.class */
    /**
     * Fluent assertion wrapper around a single {@link SplitSource.SplitBatch}.
     *
     * <p>Each assertion returns {@code this} so that checks can be chained on one result.
     */
    public static class NextBatchResult {
        private final SplitSource.SplitBatch splitBatch;

        /**
         * @param splitBatch the batch to run assertions against; must not be null
         * @throws NullPointerException if {@code splitBatch} is null
         */
        public NextBatchResult(SplitSource.SplitBatch splitBatch) {
            Objects.requireNonNull(splitBatch, "splitBatch is null");
            this.splitBatch = splitBatch;
        }

        /**
         * Asserts that the wrapped batch holds exactly {@code expectedSize} splits.
         *
         * @return this result, for chaining
         */
        public NextBatchResult assertSize(int expectedSize) {
            int actualSize = splitBatch.getSplits().size();
            Assert.assertEquals(actualSize, expectedSize);
            return this;
        }

        /**
         * Asserts that the wrapped batch's last-batch flag equals {@code expectedLastBatch}.
         *
         * @return this result, for chaining
         */
        public NextBatchResult assertNoMoreSplits(boolean expectedLastBatch) {
            boolean actualLastBatch = splitBatch.isLastBatch();
            Assert.assertEquals(actualLastBatch, expectedLastBatch);
            return this;
        }
    }

    /**
     * Underlying source hands out one split per call (25 available, then finishes) while the
     * buffering source is constructed with 10.
     *
     * <p>Requests for 20, 6 and 20 splits are expected to yield 10, 6 and 9 splits respectively,
     * with only the last batch flagged as final, and the underlying source is expected to have
     * been polled exactly 25 times.
     */
    @Test
    public void testSlowSource() {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .increaseAvailableSplits(25)
                .atSplitCompletion(MockSplitSource.Action.FINISH);
        // try-with-resources closes the source exactly once, on both success and failure paths.
        try (BufferingSplitSource source = new BufferingSplitSource(mockSource, 10)) {
            requireFutureValue(getNextBatch(source, 20)).assertSize(10).assertNoMoreSplits(false);
            requireFutureValue(getNextBatch(source, 6)).assertSize(6).assertNoMoreSplits(false);
            requireFutureValue(getNextBatch(source, 20)).assertSize(9).assertNoMoreSplits(true);
            Assert.assertTrue(source.isFinished());
            Assert.assertEquals(mockSource.getNextBatchInvocationCount(), 25);
        }
    }

    /**
     * Underlying source hands out 11 splits per call (22 available, then finishes) while the
     * buffering source is constructed with 10.
     *
     * <p>Two requests for 200 splits are each expected to yield 11 splits, the second flagged as
     * final, with the underlying source polled exactly twice.
     */
    @Test
    public void testFastSource() {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(11)
                .increaseAvailableSplits(22)
                .atSplitCompletion(MockSplitSource.Action.FINISH);
        // try-with-resources closes the source exactly once, on both success and failure paths.
        try (BufferingSplitSource source = new BufferingSplitSource(mockSource, 10)) {
            requireFutureValue(getNextBatch(source, 200)).assertSize(11).assertNoMoreSplits(false);
            requireFutureValue(getNextBatch(source, 200)).assertSize(11).assertNoMoreSplits(true);
            Assert.assertTrue(source.isFinished());
            Assert.assertEquals(mockSource.getNextBatchInvocationCount(), 2);
        }
    }

    /**
     * Underlying source has no splits and is configured to finish.
     *
     * <p>A single request is expected to complete with an empty, final batch after exactly one
     * poll of the underlying source, leaving the buffering source finished.
     */
    @Test
    public void testEmptySource() {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .atSplitCompletion(MockSplitSource.Action.FINISH);
        // try-with-resources closes the source exactly once, on both success and failure paths.
        try (BufferingSplitSource source = new BufferingSplitSource(mockSource, 100)) {
            requireFutureValue(getNextBatch(source, 200)).assertSize(0).assertNoMoreSplits(true);
            Assert.assertTrue(source.isFinished());
            Assert.assertEquals(mockSource.getNextBatchInvocationCount(), 1);
        }
    }

    /**
     * Exercises requests that cannot complete immediately because the underlying source has not
     * yet made enough splits available.
     *
     * <p>Four independent scenarios are run back to back, each against a fresh mock and a fresh
     * buffering source. Every buffering source is managed by try-with-resources so it is closed
     * exactly once, whether the scenario passes or an assertion fails.
     */
    @Test
    public void testBlocked() {
        // Scenario 1: nothing available up front; the request stays pending until 10 splits
        // have been released, and a later pending request is resolved (empty) by FINISH.
        MockSplitSource emptyMock = new MockSplitSource().setBatchSize(1);
        try (BufferingSplitSource source = new BufferingSplitSource(emptyMock, 10)) {
            ListenableFuture<NextBatchResult> firstBatch = getNextBatch(source, 10);
            Assert.assertFalse(firstBatch.isDone());
            emptyMock.increaseAvailableSplits(9);
            Assert.assertFalse(firstBatch.isDone());
            emptyMock.increaseAvailableSplits(1);
            requireFutureValue(firstBatch).assertSize(10).assertNoMoreSplits(false);

            ListenableFuture<NextBatchResult> secondBatch = getNextBatch(source, 10);
            Assert.assertFalse(secondBatch.isDone());
            emptyMock.atSplitCompletion(MockSplitSource.Action.FINISH);
            requireFutureValue(secondBatch).assertSize(0).assertNoMoreSplits(true);
            Assert.assertTrue(source.isFinished());
        }

        // Scenario 2: 1 split available up front; the second request holds 5 splits while
        // pending and is released as a final batch of 5 once the source finishes.
        MockSplitSource oneSplitMock = new MockSplitSource().setBatchSize(1);
        try (BufferingSplitSource source = new BufferingSplitSource(oneSplitMock, 10)) {
            oneSplitMock.increaseAvailableSplits(1);
            ListenableFuture<NextBatchResult> firstBatch = getNextBatch(source, 10);
            Assert.assertFalse(firstBatch.isDone());
            oneSplitMock.increaseAvailableSplits(9);
            requireFutureValue(firstBatch).assertSize(10).assertNoMoreSplits(false);

            ListenableFuture<NextBatchResult> secondBatch = getNextBatch(source, 10);
            oneSplitMock.increaseAvailableSplits(5);
            Assert.assertFalse(secondBatch.isDone());
            oneSplitMock.atSplitCompletion(MockSplitSource.Action.FINISH);
            requireFutureValue(secondBatch).assertSize(5).assertNoMoreSplits(true);
            Assert.assertTrue(source.isFinished());
        }

        // Scenario 3: 9 splits available up front; the second request is pending with 5 splits
        // when the source fails, so the future fails and the source is not reported finished.
        MockSplitSource nineSplitMock = new MockSplitSource().setBatchSize(1);
        try (BufferingSplitSource source = new BufferingSplitSource(nineSplitMock, 10)) {
            nineSplitMock.increaseAvailableSplits(9);
            ListenableFuture<NextBatchResult> firstBatch = getNextBatch(source, 10);
            Assert.assertFalse(firstBatch.isDone());
            nineSplitMock.increaseAvailableSplits(1);
            requireFutureValue(firstBatch).assertSize(10).assertNoMoreSplits(false);

            ListenableFuture<NextBatchResult> secondBatch = getNextBatch(source, 10);
            nineSplitMock.increaseAvailableSplits(5);
            Assert.assertFalse(secondBatch.isDone());
            nineSplitMock.atSplitCompletion(MockSplitSource.Action.FAIL);
            assertFutureFailsWithMockFailure(secondBatch);
            Assert.assertFalse(source.isFinished());
        }

        // Scenario 4: underlying batch size 8 with 8 splits available; a request for 20 stays
        // pending after the first 8 and completes with 16 once another 8 are released.
        MockSplitSource batchOfEightMock = new MockSplitSource().setBatchSize(8);
        try (BufferingSplitSource source = new BufferingSplitSource(batchOfEightMock, 10)) {
            batchOfEightMock.increaseAvailableSplits(8);
            ListenableFuture<NextBatchResult> firstBatch = getNextBatch(source, 20);
            Assert.assertFalse(firstBatch.isDone());
            batchOfEightMock.increaseAvailableSplits(8);
            requireFutureValue(firstBatch).assertSize(16).assertNoMoreSplits(false);
        }
    }

    /**
     * Verifies that the buffering source becomes finished only after a later, empty batch.
     *
     * <p>The first request returns the single available split without the last-batch flag and
     * the source is not finished. After the mock is switched to FINISH, a second request returns
     * an empty final batch, the source reports finished, and the mock has been polled twice.
     */
    @Test
    public void testFinishedSetWithoutIndicationFromSplitBatch() {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .increaseAvailableSplits(1);
        // try-with-resources closes the source exactly once, on both success and failure paths.
        try (BufferingSplitSource source = new BufferingSplitSource(mockSource, 100)) {
            requireFutureValue(getNextBatch(source, 1)).assertSize(1).assertNoMoreSplits(false);
            Assert.assertFalse(source.isFinished());

            mockSource.atSplitCompletion(MockSplitSource.Action.FINISH);
            requireFutureValue(getNextBatch(source, 1)).assertSize(0).assertNoMoreSplits(true);
            Assert.assertTrue(source.isFinished());
            Assert.assertEquals(mockSource.getNextBatchInvocationCount(), 2);
        }
    }

    /**
     * Underlying source has no splits and is configured to fail.
     *
     * <p>The first request is expected to fail with the mock failure after exactly one poll of
     * the underlying source.
     */
    @Test
    public void testFailImmediate() {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .atSplitCompletion(MockSplitSource.Action.FAIL);
        // try-with-resources closes the source exactly once, on both success and failure paths.
        try (BufferingSplitSource source = new BufferingSplitSource(mockSource, 100)) {
            assertFutureFailsWithMockFailure(getNextBatch(source, 200));
            Assert.assertEquals(mockSource.getNextBatchInvocationCount(), 1);
        }
    }

    /**
     * Underlying source has one split available and is configured to fail afterwards.
     *
     * <p>A request for 2 splits is expected to fail with the mock failure, with the underlying
     * source polled exactly twice.
     */
    @Test
    public void testFail() {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .increaseAvailableSplits(1)
                .atSplitCompletion(MockSplitSource.Action.FAIL);
        // try-with-resources closes the source exactly once, on both success and failure paths.
        try (BufferingSplitSource source = new BufferingSplitSource(mockSource, 100)) {
            assertFutureFailsWithMockFailure(getNextBatch(source, 2));
            Assert.assertEquals(mockSource.getNextBatchInvocationCount(), 2);
        }
    }

    /**
     * Asserts that {@code future} is already done and that retrieving its value throws an
     * exception whose message contains {@code "Mock failure"}.
     */
    private static void assertFutureFailsWithMockFailure(ListenableFuture<?> future) {
        boolean completed = future.isDone();
        Assert.assertTrue(completed);
        Assertions.assertThatThrownBy(future::get)
                .hasMessageContaining("Mock failure");
    }

    /**
     * Returns the value of {@code future} as obtained through
     * {@link MoreFutures#tryGetFutureValue}, or throws {@link AssertionError} when that call
     * yields no value.
     */
    private static <T> T requireFutureValue(Future<T> future) {
        return MoreFutures.tryGetFutureValue(future)
                .orElseThrow(() -> new AssertionError());
    }

    /**
     * Requests the next batch from {@code splitSource}, passing {@code maxSize} through to
     * {@link SplitSource#getNextBatch}, and wraps the resulting batch in a
     * {@link NextBatchResult} so assertions can be chained on it.
     *
     * <p>The wrapping runs on a direct executor, i.e. on whichever thread completes the
     * underlying future.
     */
    private static ListenableFuture<NextBatchResult> getNextBatch(SplitSource splitSource, int maxSize) {
        java.util.concurrent.Executor sameThread = MoreExecutors.directExecutor();
        return Futures.transform(
                splitSource.getNextBatch(maxSize),
                batch -> new NextBatchResult(batch),
                sameThread);
    }
}
