/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader.fetcher;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class SplitFetcherTest {
    /** Package-private no-argument constructor; performs no setup of its own. */
    SplitFetcherTest() {
        super();
    }

    /** A fetcher that has just been created, with no splits added, must report itself as idle. */
    @Test
    void testNewFetcherIsIdle() {
        // The reader is built from an empty array of fetch results.
        TestingSplitReader emptyReader = new TestingSplitReader(new RecordsWithSplitIds[0]);
        SplitFetcher freshFetcher = createFetcher(emptyReader);

        Assertions.assertThat(freshFetcher.isIdle()).isTrue();
    }

    /**
     * Once a split has been handed to the fetcher it must not report idle, neither right after
     * {@code addSplits} nor after any of the {@code runOnce()} calls needed until the split shows
     * up in {@code assignedSplits()}.
     */
    @Test
    void testFetcherNotIdleAfterSplitAdded() {
        TestingSplitReader emptyReader = new TestingSplitReader(new RecordsWithSplitIds[0]);
        SplitFetcher splitFetcher = createFetcher(emptyReader);
        TestingSourceSplit addedSplit = new TestingSourceSplit("test-split");

        splitFetcher.addSplits(Collections.singletonList(addedSplit));
        Assertions.assertThat(splitFetcher.isIdle()).isFalse();

        // Keep stepping the fetcher until the split is listed as assigned; it has to stay
        // non-idle after every single step.
        while (splitFetcher.assignedSplits().isEmpty()) {
            splitFetcher.runOnce();
            Assertions.assertThat(splitFetcher.isIdle()).isFalse();
        }
    }

    /**
     * After one {@code runOnce()} over a reader whose only fetch result comes from
     * {@code finishedSplitFetch}, the fetcher must have no assigned splits left and must be idle.
     */
    @Test
    void testIdleAfterFinishedSplitsEnqueued() {
        TestingSplitReader finishingReader =
                new TestingSplitReader(new RecordsWithSplitIds[] {finishedSplitFetch("test-split")});
        SplitFetcher splitFetcher = createFetcherWithSplit("test-split", finishingReader);

        splitFetcher.runOnce();

        Assertions.assertThat(splitFetcher.assignedSplits()).isEmpty();
        Assertions.assertThat(splitFetcher.isIdle()).isTrue();
    }

    /**
     * When the fetcher becomes idle, the availability future obtained from the queue afterwards
     * must already be completed.
     */
    @Test
    void testNotifiesWhenGoingIdle() {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        TestingSplitReader finishingReader =
                new TestingSplitReader(new RecordsWithSplitIds[] {finishedSplitFetch("test-split")});
        SplitFetcher splitFetcher =
                createFetcherWithSplit("test-split", elementsQueue, finishingReader);

        splitFetcher.runOnce();

        Assertions.assertThat(splitFetcher.assignedSplits()).isEmpty();
        Assertions.assertThat(splitFetcher.isIdle()).isTrue();
        Assertions.assertThat(elementsQueue.getAvailabilityFuture().isDone()).isTrue();
    }

    /**
     * When the fetcher becomes idle, an availability future that was obtained from the queue
     * <em>before</em> the fetcher ran must be completed as well.
     */
    @Test
    void testNotifiesOlderFutureWhenGoingIdle() {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        TestingSplitReader finishingReader =
                new TestingSplitReader(new RecordsWithSplitIds[] {finishedSplitFetch("test-split")});
        SplitFetcher splitFetcher =
                createFetcherWithSplit("test-split", elementsQueue, finishingReader);

        // Grab the future first, then let the fetcher run.
        CompletableFuture<?> futureBeforeRun = elementsQueue.getAvailabilityFuture();
        splitFetcher.runOnce();

        Assertions.assertThat(splitFetcher.assignedSplits()).isEmpty();
        Assertions.assertThat(splitFetcher.isIdle()).isTrue();
        Assertions.assertThat(futureBeforeRun.isDone()).isTrue();
    }

    /**
     * Same scenario as the non-concurrent idle notification test, but with a separate thread
     * taking one element from the queue while the fetcher runs. The test passes if either the
     * queue's availability future is done afterwards, or the draining thread saw the fetcher as
     * idle when it finished.
     */
    @Test
    void testNotifiesWhenGoingIdleConcurrent() throws Exception {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        TestingSplitReader finishingReader =
                new TestingSplitReader(new RecordsWithSplitIds[] {finishedSplitFetch("test-split")});
        SplitFetcher splitFetcher =
                createFetcherWithSplit("test-split", elementsQueue, finishingReader);

        QueueDrainerThread drainer = new QueueDrainerThread(elementsQueue, splitFetcher, 1);
        drainer.start();

        splitFetcher.runOnce();
        // Wait for the drainer to finish (sync() is provided by CheckedThread).
        drainer.sync();

        Assertions.assertThat(
                        elementsQueue.getAvailabilityFuture().isDone()
                                || drainer.wasIdleWhenFinished())
                .isTrue();
    }

    /**
     * With a separate thread taking one element from the queue, an availability future obtained
     * before the fetcher runs must be completed once {@code runOnce()} has returned.
     */
    @Test
    void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        TestingSplitReader finishingReader =
                new TestingSplitReader(new RecordsWithSplitIds[] {finishedSplitFetch("test-split")});
        SplitFetcher splitFetcher =
                createFetcherWithSplit("test-split", elementsQueue, finishingReader);

        QueueDrainerThread drainer = new QueueDrainerThread(elementsQueue, splitFetcher, 1);
        drainer.start();

        // The future is requested after the drainer was started but before the fetcher runs.
        CompletableFuture<?> futureBeforeRun = elementsQueue.getAvailabilityFuture();
        splitFetcher.runOnce();
        Assertions.assertThat(futureBeforeRun.isDone()).isTrue();

        drainer.sync();
    }

    /**
     * Runs a fetcher over several mock splits on its own thread while a second thread keeps
     * calling {@code wakeUp(false)} on it, and verifies that every record is still delivered
     * exactly once and that at least one wake-up was issued.
     *
     * <p>Split {@code i} is filled with the values {@code i * numRecordsPerSplit} up to
     * {@code (i + 1) * numRecordsPerSplit - 1}, so all splits together hold the distinct values
     * {@code 0 .. numTotalRecords - 1}.
     *
     * <p>NOTE(review): this source was reconstructed by a decompiler, which flagged this method
     * with "Removed try catching itself - possible behaviour change". Compare with the original
     * sources if they are available.
     *
     * @throws InterruptedException if the test thread is interrupted while taking from the queue
     *     or while joining the helper threads
     */
    @Test
    void testWakeup() throws InterruptedException {
        final int numSplits = 3;
        final int numRecordsPerSplit = 10_000;
        final int wakeupRecordsInterval = 10;
        final int numTotalRecords = numSplits * numRecordsPerSplit;

        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
                new FutureCompletingBlockingQueue<>(1);
        final SplitFetcher<int[], MockSourceSplit> fetcher =
                new SplitFetcher<>(
                        0,
                        elementQueue,
                        MockSplitReader.newBuilder()
                                .setNumRecordsPerSplitPerFetch(2)
                                .setBlockingFetch(true)
                                .build(),
                        ExceptionUtils::rethrow,
                        () -> {},
                        ignore -> {},
                        false);

        // Prepare the splits and hand them to the fetcher.
        ArrayList<MockSourceSplit> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            MockSourceSplit split = new MockSourceSplit(i, 0, numRecordsPerSplit);
            int base = i * numRecordsPerSplit;
            for (int j = base; j < base + numRecordsPerSplit; j++) {
                split.addRecord(j);
            }
            splits.add(split);
        }
        fetcher.addSplits(splits);

        // This thread drives the fetcher.
        Thread fetcherThread = new Thread(fetcher, "FetcherThread");

        // Written by the test thread, read by the wake-up thread, hence synchronized.
        final SortedSet<Integer> recordsRead = Collections.synchronizedSortedSet(new TreeSet<>());
        final AtomicInteger wakeupTimes = new AtomicInteger(0);
        final AtomicBoolean stop = new AtomicBoolean(false);

        // This thread wakes the fetcher up each time at least wakeupRecordsInterval further
        // records have been read since the previous wake-up. It spins without sleeping.
        Thread wakeUpCaller =
                new Thread("Wakeup Caller") {
                    @Override
                    public void run() {
                        int lastWakeup = 0;
                        while (recordsRead.size() < numTotalRecords && !stop.get()) {
                            int numRecordsRead = recordsRead.size();
                            if (numRecordsRead >= lastWakeup + wakeupRecordsInterval) {
                                fetcher.wakeUp(false);
                                wakeupTimes.incrementAndGet();
                                lastWakeup = numRecordsRead;
                            }
                        }
                    }
                };

        try {
            fetcherThread.start();
            wakeUpCaller.start();

            while (recordsRead.size() < numTotalRecords) {
                RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
                while (nextBatch.nextSplit() != null) {
                    int[] arr;
                    while ((arr = nextBatch.nextRecordFromSplit()) != null) {
                        // add() returning false would mean the same value was delivered twice.
                        Assertions.assertThat(recordsRead.add(arr[0])).isTrue();
                    }
                }
            }

            Assertions.assertThat(recordsRead).hasSize(numTotalRecords);
            Assertions.assertThat(recordsRead.first()).isEqualTo(0);
            Assertions.assertThat(recordsRead.last()).isEqualTo(numTotalRecords - 1);
            Assertions.assertThat(wakeupTimes.get()).isGreaterThan(0);
        } finally {
            // Always stop both helper threads, even when an assertion above failed.
            stop.set(true);
            fetcher.shutdown();
            fetcherThread.join();
            wakeUpCaller.join();
        }
    }

    /** Running a fetcher that has already been shut down must leave its split reader closed. */
    @Test
    void testClose() {
        TestingSplitReader reader = new TestingSplitReader(new RecordsWithSplitIds[0]);
        SplitFetcher splitFetcher = createFetcher(reader);

        // Shut down first, then execute run() directly on the test thread.
        splitFetcher.shutdown();
        splitFetcher.run();

        Assertions.assertThat(reader.isClosed()).isTrue();
    }

    /**
     * After the fetcher was paused and then shut down from another thread, {@code runOnce()}
     * must return {@code false}.
     */
    @Test
    void testCloseAfterPause() throws InterruptedException {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        TestingSplitReader finishingReader =
                new TestingSplitReader(new RecordsWithSplitIds[] {finishedSplitFetch("test-split")});
        SplitFetcher fetcher = createFetcherWithSplit("test-split", elementsQueue, finishingReader);

        fetcher.pause();

        // Issue the shutdown from a different thread and wait for that thread to complete.
        Thread shutdownThread = new Thread(() -> fetcher.shutdown());
        shutdownThread.start();
        shutdownThread.join();

        Assertions.assertThat(fetcher.runOnce()).isFalse();
    }

    /**
     * After {@code shutdown(true)}, a fetcher running on its own thread must not close the split
     * reader while that thread is still in state {@code WAITING}. The reader must be closed by the
     * time {@code run()} returns, which this test triggers by polling and recycling a queue
     * element.
     */
    @Test
    void testShutdownWaitingForRecordsProcessing() throws Exception {
        final TestingSplitReader reader = new TestingSplitReader(new RecordsWithSplitIds[0]);
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        final SplitFetcher fetcher = createFetcher(reader, elementsQueue);

        fetcher.shutdown(true);

        CheckedThread runner =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        fetcher.run();
                        // Checked on the runner thread; a failure surfaces through sync() below.
                        Assertions.assertThat(reader.isClosed()).isTrue();
                    }
                };
        runner.start();

        // Give the runner thread up to one second to reach the WAITING state.
        TestUtils.waitUntil(
                () -> runner.getState() == Thread.State.WAITING,
                Duration.ofSeconds(1L),
                "The fetcher thread should be waiting for the shutdown latch");
        Assertions.assertThat(reader.isClosed())
                .as("The split reader should have not been closed.")
                .isFalse();

        // Once the queue signals availability, take the element out and recycle it.
        elementsQueue
                .getAvailabilityFuture()
                .thenRun(() -> ((RecordsWithSplitIds) elementsQueue.poll()).recycle());

        runner.sync();
    }

    /**
     * Creates a {@link RecordsBySplits} from an empty record map and a singleton set that
     * contains only {@code splitId}.
     *
     * <p>Judging by this helper's name, the set is presumably the set of finished split ids;
     * confirm against the {@code RecordsBySplits} constructor.
     *
     * @param splitId the split id placed into the singleton set
     * @param <E> element type of the returned (record-less) fetch result
     * @return the newly created fetch result
     */
    private static <E> RecordsBySplits<E> finishedSplitFetch(String splitId) {
        // Diamond instead of the raw type, so the element type is inferred and checked.
        return new RecordsBySplits<>(Collections.emptyMap(), Collections.singleton(splitId));
    }

    /**
     * Creates a fetcher for {@code reader} that is backed by a newly created element queue.
     *
     * @param reader the split reader the fetcher reads from
     * @param <E> element type produced by the reader
     * @return a new fetcher with no splits added
     */
    private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(SplitReader<E, TestingSourceSplit> reader) {
        // Diamond instead of the raw queue type, so the element type is inferred from the reader.
        return createFetcher(reader, new FutureCompletingBlockingQueue<>());
    }

    /**
     * Creates a fetcher with id {@code 0} for the given reader and element queue.
     *
     * <p>The remaining constructor arguments are fixed for all tests: {@code ExceptionUtils::rethrow},
     * two no-op callbacks and a trailing {@code false} flag.
     *
     * @param reader the split reader the fetcher reads from
     * @param queue the queue passed to the fetcher
     * @param <E> element type produced by the reader
     * @return a new fetcher with no splits added
     */
    private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(SplitReader<E, TestingSourceSplit> reader, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue) {
        // Diamond instead of the raw type, so reader and queue are checked against each other.
        return new SplitFetcher<>(
                0, queue, reader, ExceptionUtils::rethrow, () -> {}, ignore -> {}, false);
    }

    /**
     * Creates a fetcher backed by a newly created element queue and with one split, identified by
     * {@code splitId}, already assigned.
     *
     * @param splitId id of the split to assign
     * @param reader the split reader the fetcher reads from
     * @param <E> element type produced by the reader
     * @return a fetcher whose {@code assignedSplits()} is non-empty
     */
    private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(String splitId, SplitReader<E, TestingSourceSplit> reader) {
        // Diamond instead of the raw queue type, so the element type is inferred from the reader.
        return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(), reader);
    }

    /**
     * Creates a fetcher on the given queue and reader, adds a single split with id
     * {@code splitId}, and steps the fetcher until that split is reported as assigned.
     *
     * @param splitId id of the split to assign
     * @param queue the queue passed to the fetcher
     * @param reader the split reader the fetcher reads from
     * @param <E> element type produced by the reader
     * @return a fetcher whose {@code assignedSplits()} is non-empty
     */
    private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(String splitId, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue, SplitReader<E, TestingSourceSplit> reader) {
        final SplitFetcher<E, TestingSourceSplit> result = createFetcher(reader, queue);
        final TestingSourceSplit split = new TestingSourceSplit(splitId);
        result.addSplits(Collections.singletonList(split));

        // Step the fetcher as often as needed until the split appears in assignedSplits().
        while (result.assignedSplits().isEmpty()) {
            result.runOnce();
        }
        return result;
    }

    private static final class QueueDrainerThread
    extends CheckedThread {
        private final FutureCompletingBlockingQueue<?> queue;
        private final SplitFetcher<?, ?> fetcher;
        private final int numFetchesToTake;
        private volatile boolean wasIdleWhenFinished;

        QueueDrainerThread(FutureCompletingBlockingQueue<?> queue, SplitFetcher<?, ?> fetcher, int numFetchesToTake) {
            super("Queue Drainer");
            this.setPriority(10);
            this.queue = queue;
            this.fetcher = fetcher;
            this.numFetchesToTake = numFetchesToTake;
        }

        public void go() throws Exception {
            for (int remaining = this.numFetchesToTake; remaining > 0; --remaining) {
                this.queue.take();
            }
            this.wasIdleWhenFinished = this.fetcher.isIdle();
        }

        public boolean wasIdleWhenFinished() {
            return this.wasIdleWhenFinished;
        }
    }
}

