/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testutils.source.reader;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public abstract class SourceReaderTestBase<SplitT extends SourceSplit>
extends TestLogger {
    protected final int numSplits = this.getNumSplits();
    protected final int totalNumRecords = this.numSplits * 10;
    protected static final int NUM_RECORDS_PER_SPLIT = 10;

    protected int getNumSplits() {
        return 10;
    }

    @AfterEach
    public void ensureNoDangling() {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (!t.getName().equals("SourceFetcher")) continue;
            System.out.println("Dangling thread.");
        }
    }

    @Test
    void testRead() throws Exception {
        try (SourceReader<Integer, SplitT> reader = this.createReader();){
            reader.addSplits(this.getSplits(this.numSplits, 10, Boundedness.BOUNDED));
            ValidatingSourceOutput output = new ValidatingSourceOutput();
            while (output.count < this.totalNumRecords) {
                reader.pollNext((ReaderOutput)output);
            }
            output.validate();
        }
    }

    @Test
    void testAddSplitToExistingFetcher() throws Exception {
        Thread.sleep(10L);
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        List<SplitT> splits = Collections.singletonList(this.getSplit(0, 10, Boundedness.BOUNDED));
        try (SourceReader<Integer, SplitT> reader = this.consumeRecords(splits, output, 5);){
            ArrayList<SplitT> newSplits = new ArrayList<SplitT>();
            for (int i = 1; i < this.numSplits; ++i) {
                newSplits.add(this.getSplit(i, 10, Boundedness.BOUNDED));
            }
            reader.addSplits(newSplits);
            while (output.count() < 10 * this.numSplits) {
                reader.pollNext((ReaderOutput)output);
            }
            output.validate();
        }
    }

    @Test
    @Timeout(value=30L)
    void testPollingFromEmptyQueue() throws Exception {
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        List<SplitT> splits = Collections.singletonList(this.getSplit(0, 10, Boundedness.BOUNDED));
        try (SourceReader<Integer, SplitT> reader = this.consumeRecords(splits, output, 10);){
            ((AbstractComparableAssert)Assertions.assertThat((Comparable)reader.pollNext((ReaderOutput)output)).as("The status should be %s", new Object[]{InputStatus.NOTHING_AVAILABLE})).isEqualTo((Object)InputStatus.NOTHING_AVAILABLE);
        }
    }

    @Test
    @Timeout(value=30L)
    void testAvailableOnEmptyQueue() throws Exception {
        try (SourceReader<Integer, SplitT> reader = this.createReader();){
            CompletableFuture future = reader.isAvailable();
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)future.isDone()).as("There should be no records ready for poll.", new Object[0])).isFalse();
            reader.addSplits(Collections.singletonList(this.getSplit(0, 10, Boundedness.BOUNDED)));
            future.get();
        }
    }

    @Test
    @Timeout(value=30L)
    void testSnapshot() throws Exception {
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        List<SplitT> splits = this.getSplits(this.numSplits, 10, Boundedness.CONTINUOUS_UNBOUNDED);
        try (SourceReader<Integer, SplitT> reader = this.consumeRecords(splits, output, this.totalNumRecords);){
            List state = reader.snapshotState(1L);
            ((ListAssert)Assertions.assertThat((List)state).as("The snapshot should only have 10 splits. ", new Object[0])).hasSize(this.numSplits);
            for (int i = 0; i < this.numSplits; ++i) {
                ((AbstractLongAssert)Assertions.assertThat((long)this.getNextRecordIndex((SourceSplit)state.get(i))).as("The first four splits should have been fully consumed.", new Object[0])).isEqualTo(10L);
            }
        }
    }

    protected abstract SourceReader<Integer, SplitT> createReader() throws Exception;

    protected abstract List<SplitT> getSplits(int var1, int var2, Boundedness var3);

    protected abstract SplitT getSplit(int var1, int var2, Boundedness var3);

    protected abstract long getNextRecordIndex(SplitT var1);

    private SourceReader<Integer, SplitT> consumeRecords(List<SplitT> splits, ValidatingSourceOutput output, int n) throws Exception {
        SourceReader<Integer, SplitT> reader = this.createReader();
        reader.addSplits(splits);
        while (output.count() < n) {
            reader.pollNext((ReaderOutput)output);
        }
        return reader;
    }

    public class ValidatingSourceOutput
    implements ReaderOutput<Integer> {
        private Set<Integer> consumedValues = new HashSet<Integer>();
        private int max = Integer.MIN_VALUE;
        private int min = Integer.MAX_VALUE;
        private int count = 0;

        public void collect(Integer element) {
            this.max = Math.max(element, this.max);
            this.min = Math.min(element, this.min);
            ++this.count;
            this.consumedValues.add(element);
        }

        public void collect(Integer element, long timestamp) {
            this.collect(element);
        }

        public void validate() {
            ((AbstractCollectionAssert)Assertions.assertThat(this.consumedValues).as("Should be %d distinct elements in total", new Object[]{SourceReaderTestBase.this.totalNumRecords})).hasSize(SourceReaderTestBase.this.totalNumRecords);
            ((AbstractIntegerAssert)Assertions.assertThat((int)this.count).as("Should be %d elements in total", new Object[]{SourceReaderTestBase.this.totalNumRecords})).isEqualTo(SourceReaderTestBase.this.totalNumRecords);
            ((AbstractIntegerAssert)Assertions.assertThat((int)this.min).as("The min value should be 0", new Object[]{SourceReaderTestBase.this.totalNumRecords})).isZero();
            ((AbstractIntegerAssert)Assertions.assertThat((int)this.max).as("The max value should be %d", new Object[]{SourceReaderTestBase.this.totalNumRecords - 1})).isEqualTo(SourceReaderTestBase.this.totalNumRecords - 1);
        }

        public int count() {
            return this.count;
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markIdle() {
        }

        public void markActive() {
        }

        public SourceOutput<Integer> createOutputForSplit(String splitId) {
            return this;
        }

        public void releaseOutputForSplit(String splitId) {
        }
    }
}

