package org.apache.flink.connector.testutils.source.reader;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.class */
public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends TestLogger {
    protected final int numSplits = getNumSplits();
    protected final int totalNumRecords = this.numSplits * NUM_RECORDS_PER_SPLIT;
    protected static final int NUM_RECORDS_PER_SPLIT = 10;

    /* loaded from: input_file:org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase$ValidatingSourceOutput.class */
    public class ValidatingSourceOutput implements ReaderOutput<Integer> {
        private Set<Integer> consumedValues = new HashSet();
        private int max = Integer.MIN_VALUE;
        private int min = Integer.MAX_VALUE;
        private int count = 0;

        public ValidatingSourceOutput() {
        }

        public void collect(Integer num) {
            this.max = Math.max(num.intValue(), this.max);
            this.min = Math.min(num.intValue(), this.min);
            this.count++;
            this.consumedValues.add(num);
        }

        public void collect(Integer num, long j) {
            collect(num);
        }

        public void validate() {
            Assertions.assertThat(this.consumedValues).as("Should be %d distinct elements in total", new Object[]{Integer.valueOf(SourceReaderTestBase.this.totalNumRecords)}).hasSize(SourceReaderTestBase.this.totalNumRecords);
            Assertions.assertThat(this.count).as("Should be %d elements in total", new Object[]{Integer.valueOf(SourceReaderTestBase.this.totalNumRecords)}).isEqualTo(SourceReaderTestBase.this.totalNumRecords);
            Assertions.assertThat(this.min).as("The min value should be 0", new Object[]{Integer.valueOf(SourceReaderTestBase.this.totalNumRecords)}).isZero();
            Assertions.assertThat(this.max).as("The max value should be %d", new Object[]{Integer.valueOf(SourceReaderTestBase.this.totalNumRecords - 1)}).isEqualTo(SourceReaderTestBase.this.totalNumRecords - 1);
        }

        public int count() {
            return this.count;
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markIdle() {
        }

        public void markActive() {
        }

        public SourceOutput<Integer> createOutputForSplit(String str) {
            return this;
        }

        public void releaseOutputForSplit(String str) {
        }
    }

    protected int getNumSplits() {
        return NUM_RECORDS_PER_SPLIT;
    }

    @AfterEach
    public void ensureNoDangling() {
        Iterator<Thread> it = Thread.getAllStackTraces().keySet().iterator();
        while (it.hasNext()) {
            if (it.next().getName().equals("SourceFetcher")) {
                System.out.println("Dangling thread.");
            }
        }
    }

    @Test
    void testRead() throws Exception {
        SourceReader<Integer, SplitT> createReader = createReader();
        try {
            createReader.addSplits(getSplits(this.numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
            ValidatingSourceOutput validatingSourceOutput = new ValidatingSourceOutput();
            while (validatingSourceOutput.count < this.totalNumRecords) {
                createReader.pollNext(validatingSourceOutput);
            }
            validatingSourceOutput.validate();
            if (createReader != null) {
                createReader.close();
            }
        } catch (Throwable th) {
            if (createReader != null) {
                try {
                    createReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testAddSplitToExistingFetcher() throws Exception {
        Thread.sleep(10L);
        SourceReaderTestBase<SplitT>.ValidatingSourceOutput validatingSourceOutput = new ValidatingSourceOutput();
        SourceReader<Integer, SplitT> consumeRecords = consumeRecords(Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)), validatingSourceOutput, 5);
        try {
            ArrayList arrayList = new ArrayList();
            for (int i = 1; i < this.numSplits; i++) {
                arrayList.add(getSplit(i, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
            }
            consumeRecords.addSplits(arrayList);
            while (validatingSourceOutput.count() < NUM_RECORDS_PER_SPLIT * this.numSplits) {
                consumeRecords.pollNext(validatingSourceOutput);
            }
            validatingSourceOutput.validate();
            if (consumeRecords != null) {
                consumeRecords.close();
            }
        } catch (Throwable th) {
            if (consumeRecords != null) {
                try {
                    consumeRecords.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Timeout(30)
    @Test
    void testPollingFromEmptyQueue() throws Exception {
        SourceReaderTestBase<SplitT>.ValidatingSourceOutput validatingSourceOutput = new ValidatingSourceOutput();
        SourceReader<Integer, SplitT> consumeRecords = consumeRecords(Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)), validatingSourceOutput, NUM_RECORDS_PER_SPLIT);
        try {
            Assertions.assertThat(consumeRecords.pollNext(validatingSourceOutput)).as("The status should be %s", new Object[]{InputStatus.NOTHING_AVAILABLE}).isEqualTo(InputStatus.NOTHING_AVAILABLE);
            if (consumeRecords != null) {
                consumeRecords.close();
            }
        } catch (Throwable th) {
            if (consumeRecords != null) {
                try {
                    consumeRecords.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Timeout(30)
    @Test
    void testAvailableOnEmptyQueue() throws Exception {
        SourceReader<Integer, SplitT> createReader = createReader();
        try {
            CompletableFuture isAvailable = createReader.isAvailable();
            ((AbstractBooleanAssert) Assertions.assertThat(isAvailable.isDone()).as("There should be no records ready for poll.", new Object[0])).isFalse();
            createReader.addSplits(Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)));
            isAvailable.get();
            if (createReader != null) {
                createReader.close();
            }
        } catch (Throwable th) {
            if (createReader != null) {
                try {
                    createReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Timeout(30)
    @Test
    void testSnapshot() throws Exception {
        SourceReader consumeRecords = consumeRecords(getSplits(this.numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED), new ValidatingSourceOutput(), this.totalNumRecords);
        try {
            List snapshotState = consumeRecords.snapshotState(1L);
            Assertions.assertThat(snapshotState).as("The snapshot should only have 10 splits. ", new Object[0]).hasSize(this.numSplits);
            for (int i = 0; i < this.numSplits; i++) {
                Assertions.assertThat(getNextRecordIndex((SourceSplit) snapshotState.get(i))).as("The first four splits should have been fully consumed.", new Object[0]).isEqualTo(10L);
            }
            if (consumeRecords != null) {
                consumeRecords.close();
            }
        } catch (Throwable th) {
            if (consumeRecords != null) {
                try {
                    consumeRecords.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected abstract SourceReader<Integer, SplitT> createReader() throws Exception;

    protected abstract List<SplitT> getSplits(int i, int i2, Boundedness boundedness);

    protected abstract SplitT getSplit(int i, int i2, Boundedness boundedness);

    protected abstract long getNextRecordIndex(SplitT splitt);

    private SourceReader<Integer, SplitT> consumeRecords(List<SplitT> list, SourceReaderTestBase<SplitT>.ValidatingSourceOutput validatingSourceOutput, int i) throws Exception {
        SourceReader<Integer, SplitT> createReader = createReader();
        createReader.addSplits(list);
        while (validatingSourceOutput.count() < i) {
            createReader.pollNext(validatingSourceOutput);
        }
        return createReader;
    }
}
