package org.apache.flink.connector.base.source.hybrid;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.mock.Whitebox;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.class */
public class HybridSourceReaderTest {

    /* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest$MutableFutureSourceReader.class */
    private static class MutableFutureSourceReader extends MockSourceReader {
        private CompletableFuture<Void> availabilityFuture;

        public MutableFutureSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> futureCompletingBlockingQueue, Supplier<SplitReader<int[], MockSourceSplit>> supplier, Configuration configuration, SourceReaderContext sourceReaderContext) {
            super(futureCompletingBlockingQueue, supplier, configuration, sourceReaderContext);
            this.availabilityFuture = new CompletableFuture<>();
        }

        public static MutableFutureSourceReader createReader(SourceReaderContext sourceReaderContext) {
            FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue();
            Configuration configuration = new Configuration();
            configuration.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 2);
            configuration.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
            MockSplitReader.Builder blockingFetch = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(2).setBlockingFetch(true);
            blockingFetch.getClass();
            return new MutableFutureSourceReader(futureCompletingBlockingQueue, blockingFetch::build, configuration, sourceReaderContext);
        }

        public CompletableFuture<Void> isAvailable() {
            return this.availabilityFuture;
        }

        public void completeFuture() {
            this.availabilityFuture.complete(null);
        }

        public void resetFuture() {
            if (this.availabilityFuture.isDone()) {
                this.availabilityFuture = new CompletableFuture<>();
            }
        }
    }

    /**
     * Drives a {@link HybridSourceReader} through two underlying readers.
     *
     * <p>Checks that: a finished event with index -1 is sent on start; no underlying reader exists
     * before the first {@link SwitchSourceEvent}; the record of the first split is emitted; a
     * finished event with index 0 is sent once the first reader has drained and been told there
     * are no more splits; the underlying reader is replaced by the second switch event; and
     * {@link InputStatus#END_OF_INPUT} is reported after the second reader is told there are no
     * more splits.
     */
    @Test
    public void testReader() throws Exception {
        // Must be the concrete testing type: getSentEvents() and the helper below need it.
        TestingReaderContext readerContext = new TestingReaderContext();
        TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>();
        MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);

        // Pre-created readers so that identity can be asserted after each switch.
        final SourceReader<Integer, MockSourceSplit> firstReader = source.createReader(readerContext);
        final SourceReader<Integer, MockSourceSplit> secondReader = source.createReader(readerContext);

        HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);

        Assertions.assertThat(readerContext.getSentEvents()).isEmpty();
        reader.start();
        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
        Assertions.assertThat(currentReader(reader)).isNull();
        Assertions.assertThat(reader.pollNext(readerOutput)).isEqualTo(InputStatus.NOTHING_AVAILABLE);

        // Switch to source 0, whose reader is the pre-created firstReader.
        MockSource firstSource = new MockSource(null, 0) {
            @Override
            public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
                return firstReader;
            }
        };
        reader.handleSourceEvents(new SwitchSourceEvent(0, firstSource, false));

        MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 1);
        mockSplit.addRecord(0);
        SwitchedSources switchedSources = new SwitchedSources();
        switchedSources.put(0, source);
        reader.addSplits(Collections.singletonList(HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources)));

        // Drain: keep polling until at least one record was emitted and the reader no longer
        // reports MORE_AVAILABLE.
        InputStatus status = reader.pollNext(readerOutput);
        while (readerOutput.getEmittedRecords().isEmpty() || status == InputStatus.MORE_AVAILABLE) {
            status = reader.pollNext(readerOutput);
            Thread.sleep(10L);
        }

        Assertions.assertThat(readerOutput.getEmittedRecords()).contains(0);
        reader.pollNext(readerOutput);
        Assertions.assertThat(reader.pollNext(readerOutput)).as("before notifyNoMoreSplits").isEqualTo(InputStatus.NOTHING_AVAILABLE);

        reader.notifyNoMoreSplits();
        reader.pollNext(readerOutput);
        assertAndClearSourceReaderFinishedEvent(readerContext, 0);
        Assertions.assertThat(currentReader(reader)).as("reader before switch source event").isEqualTo(firstReader);

        // Switch to source 1; the trailing 'true' presumably marks it as the final source
        // (END_OF_INPUT is expected below) -- confirm against SwitchSourceEvent.
        MockSource secondSource = new MockSource(null, 0) {
            @Override
            public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
                return secondReader;
            }
        };
        reader.handleSourceEvents(new SwitchSourceEvent(1, secondSource, true));
        Assertions.assertThat(currentReader(reader)).as("reader after switch source event").isEqualTo(secondReader);

        reader.notifyNoMoreSplits();
        Assertions.assertThat(reader.pollNext(readerOutput)).as("reader 1 after notifyNoMoreSplits").isEqualTo(InputStatus.END_OF_INPUT);

        reader.close();
    }

    /**
     * Verifies how the future returned by {@link HybridSourceReader#isAvailable()} behaves across
     * source switches, using {@link MutableFutureSourceReader} to control the underlying readers'
     * availability futures.
     *
     * <p>Asserted behavior: the future obtained before any underlying reader exists is completed
     * by the first switch event; completing the underlying reader's future completes the hybrid
     * future; after the underlying future is reset a new, incomplete hybrid future is handed out;
     * that future is completed by the second switch event; and the future obtained after the
     * second switch stays the same instance once END_OF_INPUT is reached.
     *
     * <p>Every assertion description is set with {@code as(...)} before the assertion call,
     * because AssertJ ignores a description that is set after the assertion has already run.
     */
    @Test
    public void testAvailabilityFutureSwitchover() throws Exception {
        TestingReaderContext readerContext = new TestingReaderContext();
        TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>();
        MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);

        final MutableFutureSourceReader firstReader = MutableFutureSourceReader.createReader(readerContext);
        final MutableFutureSourceReader secondReader = MutableFutureSourceReader.createReader(readerContext);
        HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);

        Assertions.assertThat(readerContext.getSentEvents()).isEmpty();
        reader.start();
        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
        Assertions.assertThat(currentReader(reader)).isNull();
        Assertions.assertThat(reader.pollNext(readerOutput)).isEqualTo(InputStatus.NOTHING_AVAILABLE);

        // Future handed out while there is no underlying reader yet.
        CompletableFuture<Void> futureBeforeFirstReader = reader.isAvailable();
        Assertions.assertThat(futureBeforeFirstReader).isNotDone();

        MockSource firstSource = new MockSource(null, 0) {
            @Override
            public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
                return firstReader;
            }
        };
        reader.handleSourceEvents(new SwitchSourceEvent(0, firstSource, false));
        Assertions.assertThat(futureBeforeFirstReader).as("the previous underlying future should be completed after switch event").isDone();

        MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 1);
        mockSplit.addRecord(0);
        SwitchedSources switchedSources = new SwitchedSources();
        switchedSources.put(0, source);
        reader.addSplits(Collections.singletonList(HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources)));

        // Completing the underlying reader's future must be visible through the hybrid reader.
        CompletableFuture<Void> futureBeforeDraining = reader.isAvailable();
        firstReader.completeFuture();
        Assertions.assertThat(futureBeforeDraining).as("underlying future is complete and hybrid source should poll").isDone();

        // Drain: keep polling until at least one record was emitted and the reader no longer
        // reports MORE_AVAILABLE.
        InputStatus status = reader.pollNext(readerOutput);
        while (readerOutput.getEmittedRecords().isEmpty() || status == InputStatus.MORE_AVAILABLE) {
            status = reader.pollNext(readerOutput);
            Thread.sleep(10L);
        }

        // Give the underlying reader a fresh incomplete future; the hybrid reader must follow.
        firstReader.resetFuture();
        CompletableFuture<Void> futureAfterDraining = reader.isAvailable();
        Assertions.assertThat(futureBeforeDraining).as("Future should have been refreshed since the previous future is complete").isNotEqualTo(futureAfterDraining);
        Assertions.assertThat(futureAfterDraining).as("Future should not be complete").isNotDone();

        Assertions.assertThat(readerOutput.getEmittedRecords()).contains(0);
        reader.pollNext(readerOutput);
        Assertions.assertThat(reader.pollNext(readerOutput)).as("before notifyNoMoreSplits").isEqualTo(InputStatus.NOTHING_AVAILABLE);

        reader.notifyNoMoreSplits();
        reader.pollNext(readerOutput);
        assertAndClearSourceReaderFinishedEvent(readerContext, 0);
        Assertions.assertThat(futureAfterDraining).as("still no more records and runtime should not poll").isNotDone();
        Assertions.assertThat(currentReader(reader)).as("reader before switch source event").isEqualTo(firstReader);

        // Switch to source 1; the trailing 'true' presumably marks it as the final source
        // (END_OF_INPUT is expected below) -- confirm against SwitchSourceEvent.
        MockSource secondSource = new MockSource(null, 0) {
            @Override
            public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
                return secondReader;
            }
        };
        reader.handleSourceEvents(new SwitchSourceEvent(1, secondSource, true));
        Assertions.assertThat(futureAfterDraining).as("switching should signal completion to poll the new reader").isDone();

        CompletableFuture<Void> futureAfterSwitch = reader.isAvailable();
        Assertions.assertThat(futureBeforeDraining).isNotSameAs(futureAfterSwitch);
        Assertions.assertThat(currentReader(reader)).as("reader after switch source event").isEqualTo(secondReader);

        reader.notifyNoMoreSplits();
        Assertions.assertThat(reader.pollNext(readerOutput)).as("reader 1 after notifyNoMoreSplits").isEqualTo(InputStatus.END_OF_INPUT);
        Assertions.assertThat(futureAfterSwitch).as("future should not have been refreshed").isSameAs(reader.isAvailable());

        reader.close();
    }

    /**
     * Verifies that splits snapshotted by one {@link HybridSourceReader} can be handed to a newly
     * created reader before it is started, and that they show up again in the new reader's
     * snapshot once it has been switched to the matching source.
     *
     * <p>Also asserts that the restored reader has no underlying reader until it receives a
     * {@link SwitchSourceEvent}, both before and after {@code start()}.
     */
    @Test
    public void testReaderRecovery() throws Exception {
        TestingReaderContext readerContext = new TestingReaderContext();
        MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);

        // NOTE(review): this first reader is never closed; confirm whether that is intentional
        // (e.g. to mimic a crash before recovery) or an oversight.
        HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);
        reader.start();
        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
        reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));

        MockSourceSplit mockSplit = new MockSourceSplit(0, 0, Integer.MAX_VALUE);
        SwitchedSources switchedSources = new SwitchedSources();
        switchedSources.put(0, source);
        HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources);
        reader.addSplits(Collections.singletonList(hybridSplit));

        List<HybridSourceSplit> snapshot = reader.snapshotState(0L);
        Assertions.assertThat(snapshot).contains(hybridSplit);

        // Recovery: a fresh reader receives the snapshotted splits before start().
        readerContext.clearSentEvents();
        HybridSourceReader<Integer> recoveredReader = new HybridSourceReader<>(readerContext);
        recoveredReader.addSplits(snapshot);
        Assertions.assertThat(currentReader(recoveredReader)).isNull();

        recoveredReader.start();
        Assertions.assertThat(currentReader(recoveredReader)).isNull();
        assertAndClearSourceReaderFinishedEvent(readerContext, -1);

        recoveredReader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
        Assertions.assertThat(currentReader(recoveredReader)).isNotNull();
        Assertions.assertThat(recoveredReader.snapshotState(1L)).contains(hybridSplit);

        recoveredReader.close();
    }

    /**
     * Verifies that {@code notifyCheckpointComplete} and {@code notifyCheckpointAborted} calls on
     * the {@link HybridSourceReader} are forwarded to the current underlying reader.
     *
     * <p>The underlying reader is wrapped in a Mockito spy so the forwarded calls can be verified.
     */
    @Test
    public void testDefaultMethodDelegation() throws Exception {
        TestingReaderContext readerContext = new TestingReaderContext();
        // Source whose readers are Mockito spies around the regular mock reader.
        MockBaseSource spyingSource = new MockBaseSource(1, 1, Boundedness.BOUNDED) {
            @Override
            public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
                return Mockito.spy(super.createReader(sourceReaderContext));
            }
        };

        HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);
        reader.start();
        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
        reader.handleSourceEvents(new SwitchSourceEvent(0, spyingSource, false));
        SourceReader<Integer, MockSourceSplit> underlyingReader = currentReader(reader);

        reader.notifyCheckpointComplete(1L);
        Mockito.verify(underlyingReader).notifyCheckpointComplete(1L);

        reader.notifyCheckpointAborted(1L);
        Mockito.verify(underlyingReader).notifyCheckpointAborted(1L);

        reader.close();
    }

    /**
     * Reads the private {@code currentReader} field of the given hybrid reader via
     * {@link Whitebox}.
     *
     * @param hybridSourceReader reader whose internal state is inspected
     * @return the value of the {@code currentReader} field; the tests in this class assert that it
     *     is {@code null} before the first switch event
     */
    @SuppressWarnings("unchecked") // the field's element types cannot be checked at runtime
    private static SourceReader<Integer, MockSourceSplit> currentReader(HybridSourceReader<?> hybridSourceReader) {
        return (SourceReader<Integer, MockSourceSplit>) Whitebox.getInternalState(hybridSourceReader, "currentReader");
    }

    /**
     * Asserts that the context has recorded exactly one sent event, that this event is a
     * {@link SourceReaderFinishedEvent} carrying the given source index, and then clears the
     * recorded events.
     *
     * @param testingReaderContext context whose sent events are checked and cleared
     * @param i expected source index of the finished event
     */
    private static void assertAndClearSourceReaderFinishedEvent(TestingReaderContext testingReaderContext, int i) {
        Assertions.assertThat(testingReaderContext.getSentEvents()).hasSize(1);
        // The cast fails with a ClassCastException if another event type was sent.
        SourceReaderFinishedEvent finishedEvent = (SourceReaderFinishedEvent) testingReaderContext.getSentEvents().get(0);
        int reportedSourceIndex = finishedEvent.sourceIndex();
        Assertions.assertThat(reportedSourceIndex).isEqualTo(i);
        testingReaderContext.clearSentEvents();
    }
}
