package org.apache.flink.connector.base.source.reader.fetcher;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.test.util.TestUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.class */
class SplitFetcherManagerTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest$AwaitingReader.class */
    public static final class AwaitingReader<E, SplitT extends SourceSplit> implements SplitReader<E, SplitT> {
        private final Queue<RecordsWithSplitIds<E>> fetches;
        private final IOException testError;
        private final OneShotLatch inBlocking = new OneShotLatch();
        private final OneShotLatch throwError = new OneShotLatch();
        private volatile boolean isClosed = false;

        @SafeVarargs
        AwaitingReader(IOException iOException, RecordsWithSplitIds<E>... recordsWithSplitIdsArr) {
            this.testError = iOException;
            this.fetches = new ArrayDeque(Arrays.asList(recordsWithSplitIdsArr));
        }

        public RecordsWithSplitIds<E> fetch() throws IOException {
            if (!this.fetches.isEmpty()) {
                return this.fetches.poll();
            }
            this.inBlocking.trigger();
            try {
                this.throwError.await();
                throw this.testError;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted");
            }
        }

        public void handleSplitsChanges(SplitsChange<SplitT> splitsChange) {
        }

        public void wakeUp() {
        }

        public void close() throws Exception {
            this.isClosed = true;
        }

        public void awaitAllRecordsReturned() throws InterruptedException {
            this.inBlocking.await();
        }

        public void triggerThrowException() {
            this.throwError.trigger();
        }
    }

    SplitFetcherManagerTest() {
    }

    @Test
    void testExceptionPropagationFirstFetch() throws Exception {
        testExceptionPropagation(new RecordsWithSplitIds[0]);
    }

    @Test
    void testExceptionPropagationSuccessiveFetch() throws Exception {
        testExceptionPropagation(new TestingRecordsWithSplitIds("testSplit", 1, 2, 3, 4), new TestingRecordsWithSplitIds("testSplit", 5, 6, 7, 8));
    }

    @Test
    void testCloseFetcherWithException() throws Exception {
        TestingSplitReader testingSplitReader = new TestingSplitReader(new RecordsWithSplitIds[0]);
        testingSplitReader.setCloseWithException();
        SplitFetcherManager createFetcher = createFetcher("test-split", testingSplitReader, new Configuration());
        createFetcher.close(1000L);
        Objects.requireNonNull(createFetcher);
        Assertions.assertThatThrownBy(createFetcher::checkErrors).hasRootCauseMessage("Artificial exception on closing the split reader.");
    }

    @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
    @Test
    void testCloseCleansUpPreviouslyClosedFetcher() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
        SplitFetcherManager createFetcher = createFetcher("testSplit", new AwaitingReader(new IOException("Should not happen"), new RecordsBySplits(Collections.emptyMap(), Collections.singleton("testSplit"))), configuration);
        createFetcher.getQueue().getAvailabilityFuture().get();
        TestUtils.waitUntil(() -> {
            createFetcher.maybeShutdownFinishedFetchers();
            return Boolean.valueOf(createFetcher.fetchers.isEmpty());
        }, "The idle fetcher should have been removed.");
        createFetcher.close(Long.MAX_VALUE);
    }

    @Test
    void testIdleShutdownSplitFetcherWaitsUntilRecordProcessed() throws Exception {
        AwaitingReader awaitingReader = new AwaitingReader(new IOException("Should not happen"), new RecordsBySplits(Collections.emptyMap(), Collections.singleton("testSplit")));
        SplitFetcherManager createFetcher = createFetcher("testSplit", awaitingReader, new Configuration());
        try {
            FutureCompletingBlockingQueue queue = createFetcher.getQueue();
            queue.getAvailabilityFuture().get();
            TestUtils.waitUntil(() -> {
                createFetcher.maybeShutdownFinishedFetchers();
                return Boolean.valueOf(createFetcher.getNumAliveFetchers() == 0);
            }, Duration.ofSeconds(1L), "The fetcher should have already been removed from the alive fetchers.");
            TestUtils.waitUntil(() -> {
                return Boolean.valueOf(queue.size() == 2);
            }, Duration.ofSeconds(1L), "The element queue should have 2 batches when the fetcher is closed.");
            ((RecordsWithSplitIds) queue.poll()).recycle();
            ((AbstractBooleanAssert) Assertions.assertThat(awaitingReader.isClosed).as("The reader should have not been closed.", new Object[0])).isFalse();
            ((RecordsWithSplitIds) queue.poll()).recycle();
            TestUtils.waitUntil(() -> {
                return Boolean.valueOf(awaitingReader.isClosed);
            }, Duration.ofSeconds(1L), "The reader should hava been closed.");
            createFetcher.close(30000L);
        } catch (Throwable th) {
            createFetcher.close(30000L);
            throw th;
        }
    }

    @SafeVarargs
    private final void testExceptionPropagation(RecordsWithSplitIds<Integer>... recordsWithSplitIdsArr) throws Exception {
        IOException iOException = new IOException("test");
        AwaitingReader awaitingReader = new AwaitingReader(iOException, recordsWithSplitIdsArr);
        Configuration configuration = new Configuration();
        configuration.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 10);
        SplitFetcherManager createFetcher = createFetcher("testSplit", awaitingReader, configuration);
        awaitingReader.awaitAllRecordsReturned();
        drainQueue(createFetcher.getQueue());
        Assertions.assertThat(createFetcher.getQueue().getAvailabilityFuture().isDone()).isFalse();
        awaitingReader.triggerThrowException();
        createFetcher.getQueue().getAvailabilityFuture().get();
        try {
            try {
                createFetcher.checkErrors();
                Assertions.fail("expected exception");
                createFetcher.close(20000L);
            } catch (Exception e) {
                Assertions.assertThat(e.getCause().getCause()).isSameAs(iOException);
                createFetcher.close(20000L);
            }
        } catch (Throwable th) {
            createFetcher.close(20000L);
            throw th;
        }
    }

    private static <E> SplitFetcherManager<E, TestingSourceSplit> createFetcher(String str, SplitReader<E, TestingSourceSplit> splitReader, Configuration configuration) {
        SingleThreadFetcherManager singleThreadFetcherManager = new SingleThreadFetcherManager(() -> {
            return splitReader;
        }, configuration);
        singleThreadFetcherManager.addSplits(Collections.singletonList(new TestingSourceSplit(str)));
        return singleThreadFetcherManager;
    }

    private static void drainQueue(FutureCompletingBlockingQueue<?> futureCompletingBlockingQueue) {
        do {
        } while (futureCompletingBlockingQueue.poll() != null);
    }
}
