/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.hybrid;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.hybrid.HybridSourceReader;
import org.apache.flink.connector.base.source.hybrid.HybridSourceSplit;
import org.apache.flink.connector.base.source.hybrid.SourceReaderFinishedEvent;
import org.apache.flink.connector.base.source.hybrid.SwitchSourceEvent;
import org.apache.flink.connector.base.source.hybrid.SwitchedSources;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.mock.Whitebox;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link HybridSourceReader}: switching between underlying readers, hand-over of the
 * availability future on a switch, restoring splits from a snapshot, and delegation of checkpoint
 * notifications to the current underlying reader.
 */
class HybridSourceReaderTest {

    /** Upper bound for draining a split, so that a stuck reader fails the test instead of hanging. */
    private static final long DRAIN_TIMEOUT_NANOS = 60L * 1_000_000_000L;

    /** Pause between two polls while waiting for the underlying reader to emit. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    /**
     * Walks the hybrid reader through a complete life cycle: start without a reader, switch to a
     * first reader, read one record, report the reader as finished, switch to the final reader and
     * reach end of input.
     */
    @Test
    void testReader() throws Exception {
        TestingReaderContext readerContext = new TestingReaderContext();
        TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>();
        MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);

        // Two underlying readers so that a switch can be exercised.
        SourceReader<Integer, MockSourceSplit> mockSplitReader1 = source.createReader(readerContext);
        SourceReader<Integer, MockSourceSplit> mockSplitReader2 = source.createReader(readerContext);

        HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);

        // Nothing is sent before start(); start() announces itself with source index -1.
        Assertions.assertThat(readerContext.getSentEvents()).isEmpty();
        reader.start();
        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
        Assertions.assertThat(currentReader(reader)).isNull();
        Assertions.assertThat(reader.pollNext(readerOutput)).isEqualTo(InputStatus.NOTHING_AVAILABLE);

        // Switch to the first (non-final) source.
        reader.handleSourceEvents(new SwitchSourceEvent(0, sourceCreating(mockSplitReader1), false));

        MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 1);
        mockSplit.addRecord(0);

        SwitchedSources switchedSources = new SwitchedSources();
        switchedSources.put(0, source);
        HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources);
        reader.addSplits(Collections.singletonList(hybridSplit));

        // Drain the split.
        pollUntilRecordEmitted(reader, readerOutput);
        Assertions.assertThat(readerOutput.getEmittedRecords()).contains(0);
        reader.pollNext(readerOutput);
        Assertions.assertThat(reader.pollNext(readerOutput))
                .as("before notifyNoMoreSplits")
                .isEqualTo(InputStatus.NOTHING_AVAILABLE);

        reader.notifyNoMoreSplits();
        reader.pollNext(readerOutput);
        assertAndClearSourceReaderFinishedEvent(readerContext, 0);

        Assertions.assertThat(currentReader(reader))
                .as("reader before switch source event")
                .isEqualTo(mockSplitReader1);

        // Switch to the second, final source.
        reader.handleSourceEvents(new SwitchSourceEvent(1, sourceCreating(mockSplitReader2), true));
        Assertions.assertThat(currentReader(reader))
                .as("reader after switch source event")
                .isEqualTo(mockSplitReader2);

        reader.notifyNoMoreSplits();
        Assertions.assertThat(reader.pollNext(readerOutput))
                .as("reader 1 after notifyNoMoreSplits")
                .isEqualTo(InputStatus.END_OF_INPUT);

        reader.close();
    }

    /**
     * Checks how the future returned by {@link HybridSourceReader#isAvailable()} behaves across
     * reader switches, using underlying readers whose availability future is driven by the test.
     */
    @Test
    void testAvailabilityFutureSwitchover() throws Exception {
        TestingReaderContext readerContext = new TestingReaderContext();
        TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>();
        MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);

        // Two underlying readers so that a switch can be exercised.
        MutableFutureSourceReader mockSplitReader1 =
                MutableFutureSourceReader.createReader(readerContext);
        MutableFutureSourceReader mockSplitReader2 =
                MutableFutureSourceReader.createReader(readerContext);

        HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);

        Assertions.assertThat(readerContext.getSentEvents()).isEmpty();
        reader.start();
        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
        Assertions.assertThat(currentReader(reader)).isNull();
        Assertions.assertThat(reader.pollNext(readerOutput)).isEqualTo(InputStatus.NOTHING_AVAILABLE);

        CompletableFuture<Void> hybridSourceFutureBeforeFirstReader = reader.isAvailable();
        Assertions.assertThat(hybridSourceFutureBeforeFirstReader).isNotDone();

        reader.handleSourceEvents(new SwitchSourceEvent(0, sourceCreating(mockSplitReader1), false));

        // The description must precede the assertion, otherwise AssertJ ignores it.
        Assertions.assertThat(hybridSourceFutureBeforeFirstReader)
                .as("the previous underlying future should be completed after switch event")
                .isDone();

        MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 1);
        mockSplit.addRecord(0);

        SwitchedSources switchedSources = new SwitchedSources();
        switchedSources.put(0, source);
        HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources);
        reader.addSplits(Collections.singletonList(hybridSplit));

        // Completing the underlying reader's future must be visible through the hybrid reader.
        CompletableFuture<Void> futureBeforeDraining = reader.isAvailable();
        mockSplitReader1.completeFuture();
        Assertions.assertThat(futureBeforeDraining)
                .as("underlying future is complete and hybrid source should poll")
                .isDone();

        // Drain the split.
        pollUntilRecordEmitted(reader, readerOutput);

        // Give the underlying reader a fresh, incomplete future.
        mockSplitReader1.resetFuture();
        CompletableFuture<Void> futureAfterDraining = reader.isAvailable();
        Assertions.assertThat(futureBeforeDraining)
                .as("Future should have been refreshed since the previous future is complete")
                .isNotEqualTo(futureAfterDraining);
        Assertions.assertThat(futureAfterDraining).as("Future should not be complete").isNotDone();

        Assertions.assertThat(readerOutput.getEmittedRecords()).contains(0);
        reader.pollNext(readerOutput);
        Assertions.assertThat(reader.pollNext(readerOutput))
                .as("before notifyNoMoreSplits")
                .isEqualTo(InputStatus.NOTHING_AVAILABLE);

        reader.notifyNoMoreSplits();
        reader.pollNext(readerOutput);
        assertAndClearSourceReaderFinishedEvent(readerContext, 0);
        Assertions.assertThat(futureAfterDraining)
                .as("still no more records and runtime should not poll")
                .isNotDone();

        Assertions.assertThat(currentReader(reader))
                .as("reader before switch source event")
                .isEqualTo(mockSplitReader1);

        // Switch to the second, final source.
        reader.handleSourceEvents(new SwitchSourceEvent(1, sourceCreating(mockSplitReader2), true));
        Assertions.assertThat(futureAfterDraining)
                .as("switching should signal completion to poll the new reader")
                .isDone();

        CompletableFuture<Void> futureReader2 = reader.isAvailable();
        Assertions.assertThat(futureBeforeDraining).isNotSameAs(futureReader2);
        Assertions.assertThat(currentReader(reader))
                .as("reader after switch source event")
                .isEqualTo(mockSplitReader2);

        reader.notifyNoMoreSplits();
        Assertions.assertThat(reader.pollNext(readerOutput))
                .as("reader 1 after notifyNoMoreSplits")
                .isEqualTo(InputStatus.END_OF_INPUT);
        Assertions.assertThat(futureReader2)
                .as("future should not have been refreshed")
                .isSameAs(reader.isAvailable());

        reader.close();
    }

    /**
     * Snapshots a reader that holds one split, feeds the snapshot into a new reader and checks that
     * the new reader reports the same split once it has been switched to the source.
     */
    @Test
    void testReaderRecovery() throws Exception {
        TestingReaderContext readerContext = new TestingReaderContext();
        MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);

        HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);

        reader.start();
        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
        reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));

        MockSourceSplit mockSplit = new MockSourceSplit(0, 0, Integer.MAX_VALUE);

        SwitchedSources switchedSources = new SwitchedSources();
        switchedSources.put(0, source);
        HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources);
        reader.addSplits(Collections.singletonList(hybridSplit));

        List<HybridSourceSplit> snapshot = reader.snapshotState(0L);
        Assertions.assertThat(snapshot).contains(hybridSplit);

        // The original reader is no longer needed; close it so its underlying reader is released
        // rather than leaked when the variable is reassigned below.
        reader.close();

        // Recovery: a fresh reader receives the snapshot before it is started.
        readerContext.clearSentEvents();
        reader = new HybridSourceReader<>(readerContext);
        reader.addSplits(snapshot);
        // No underlying reader exists until a switch event arrives.
        Assertions.assertThat(currentReader(reader)).isNull();
        reader.start();
        Assertions.assertThat(currentReader(reader)).isNull();

        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
        reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
        Assertions.assertThat(currentReader(reader)).isNotNull();
        Assertions.assertThat(reader.snapshotState(1L)).contains(hybridSplit);

        reader.close();
    }

    /**
     * Verifies that checkpoint-complete and checkpoint-aborted notifications are forwarded to the
     * current underlying reader, which is wrapped in a Mockito spy for that purpose.
     */
    @Test
    void testDefaultMethodDelegation() throws Exception {
        TestingReaderContext readerContext = new TestingReaderContext();
        MockBaseSource source =
                new MockBaseSource(1, 1, Boundedness.BOUNDED) {
                    @Override
                    public SourceReader<Integer, MockSourceSplit> createReader(
                            SourceReaderContext readerContext) {
                        return Mockito.spy(super.createReader(readerContext));
                    }
                };

        HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);

        reader.start();
        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
        reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
        SourceReader<Integer, MockSourceSplit> underlyingReader = currentReader(reader);

        reader.notifyCheckpointComplete(1L);
        Mockito.verify(underlyingReader).notifyCheckpointComplete(1L);

        reader.notifyCheckpointAborted(1L);
        Mockito.verify(underlyingReader).notifyCheckpointAborted(1L);

        reader.close();
    }

    /**
     * Polls {@code reader} until at least one record has been emitted and the last poll no longer
     * returned {@link InputStatus#MORE_AVAILABLE}.
     *
     * @param reader the hybrid reader to poll
     * @param readerOutput the output collecting emitted records
     * @throws AssertionError if the condition is not reached within {@link #DRAIN_TIMEOUT_NANOS}
     * @throws Exception if polling fails or the thread is interrupted while sleeping
     */
    private static void pollUntilRecordEmitted(
            HybridSourceReader<Integer> reader, TestingReaderOutput<Integer> readerOutput)
            throws Exception {
        long deadlineNanos = System.nanoTime() + DRAIN_TIMEOUT_NANOS;
        InputStatus status = reader.pollNext(readerOutput);
        while (readerOutput.getEmittedRecords().isEmpty() || status == InputStatus.MORE_AVAILABLE) {
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadlineNanos > 0) {
                throw new AssertionError(
                        "Timed out waiting for the reader to emit a record; last status: " + status);
            }
            status = reader.pollNext(readerOutput);
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Creates a {@link MockSource} whose {@code createReader} always hands out the given reader
     * instance, so the test controls exactly which reader the hybrid reader switches to.
     *
     * @param underlyingReader the reader to return from {@code createReader}
     * @return a source bound to {@code underlyingReader}
     */
    private static MockSource sourceCreating(
            final SourceReader<Integer, MockSourceSplit> underlyingReader) {
        return new MockSource(null, 0) {
            @Override
            public SourceReader<Integer, MockSourceSplit> createReader(
                    SourceReaderContext readerContext) {
                return underlyingReader;
            }
        };
    }

    /**
     * Reads the private {@code currentReader} field of the hybrid reader via reflection.
     *
     * @param reader the hybrid reader to inspect
     * @return the current underlying reader, or {@code null} if none has been set
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static SourceReader<Integer, MockSourceSplit> currentReader(HybridSourceReader<?> reader) {
        return (SourceReader) Whitebox.getInternalState(reader, "currentReader");
    }

    /**
     * Asserts that exactly one event was sent through {@code context}, that it is a {@link
     * SourceReaderFinishedEvent} carrying {@code sourceIndex}, and then clears the recorded events.
     *
     * @param context the reader context that records sent events
     * @param sourceIndex the source index expected in the event
     */
    private static void assertAndClearSourceReaderFinishedEvent(
            TestingReaderContext context, int sourceIndex) {
        Assertions.assertThat(context.getSentEvents()).hasSize(1);
        Assertions.assertThat(
                        ((SourceReaderFinishedEvent) context.getSentEvents().get(0)).sourceIndex())
                .isEqualTo(sourceIndex);
        context.clearSentEvents();
    }

    /**
     * A {@link MockSourceReader} whose availability future is owned by the test: it stays
     * incomplete until {@link #completeFuture()} is called and can be swapped for a fresh one with
     * {@link #resetFuture()}.
     */
    private static class MutableFutureSourceReader extends MockSourceReader {

        private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

        public MutableFutureSourceReader(
                Supplier<SplitReader<int[], MockSourceSplit>> splitFetcherSupplier,
                Configuration config,
                SourceReaderContext context) {
            super(splitFetcherSupplier, config, context);
        }

        /**
         * Builds a reader with an element queue capacity of 2, a close timeout of 30000 and a
         * blocking split reader that fetches 2 records per split per fetch.
         *
         * @param readerContext the context passed on to the reader
         * @return a new reader with an incomplete availability future
         */
        public static MutableFutureSourceReader createReader(SourceReaderContext readerContext) {
            Configuration config = new Configuration();
            config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 2);
            config.set(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
            MockSplitReader.Builder builder =
                    MockSplitReader.newBuilder()
                            .setNumRecordsPerSplitPerFetch(2)
                            .setBlockingFetch(true);
            return new MutableFutureSourceReader(builder::build, config, readerContext);
        }

        /** Returns the test-controlled future instead of the one computed by the superclass. */
        @Override
        public CompletableFuture<Void> isAvailable() {
            return this.availabilityFuture;
        }

        /** Completes the current availability future. */
        public void completeFuture() {
            this.availabilityFuture.complete(null);
        }

        /** Replaces the availability future with a new incomplete one if the current one is done. */
        public void resetFuture() {
            if (this.availabilityFuture.isDone()) {
                this.availabilityFuture = new CompletableFuture<>();
            }
        }
    }
}

