package org.apache.flink.connector.base.source.reader.fetcher;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.class */
public class SplitFetcherPauseResumeSplitReaderTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest$MockSplitReaderUnsupportedPause.class */
    public static class MockSplitReaderUnsupportedPause extends MockSplitReader {

        /* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest$MockSplitReaderUnsupportedPause$Builder.class */
        public static class Builder extends MockSplitReader.Builder {
            public Builder(MockSplitReader.Builder builder) {
                super(builder);
            }

            @Override // org.apache.flink.connector.base.source.reader.mocks.MockSplitReader.Builder
            public MockSplitReader build() {
                return new MockSplitReaderUnsupportedPause(this.numRecordsPerSplitPerFetch, this.separatedFinishedRecord, this.blockingFetch);
            }
        }

        public MockSplitReaderUnsupportedPause(int i, boolean z, boolean z2) {
            super(i, z, z2);
        }

        @Override // org.apache.flink.connector.base.source.reader.mocks.MockSplitReader
        public void pauseOrResumeSplits(Collection<MockSourceSplit> collection, Collection<MockSourceSplit> collection2) {
            throw new UnsupportedOperationException();
        }

        public static Builder cloneBuilder(MockSplitReader.Builder builder) {
            return new Builder(builder);
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest$MockSteppingSplitFetcherManager.class */
    private static class MockSteppingSplitFetcherManager<E, SplitT extends SourceSplit> extends SingleThreadFetcherManager<E, SplitT> {
        public MockSteppingSplitFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue, Supplier<SplitReader<E, SplitT>> supplier, Configuration configuration) {
            super(futureCompletingBlockingQueue, supplier, configuration);
        }

        public void addSplits(List<SplitT> list) {
            createSplitFetcher().addSplits(list);
        }

        public void runEachOnce() {
            Iterator<E> it = this.fetchers.values().iterator();
            while (it.hasNext()) {
                ((SplitFetcher) it.next()).runOnce();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest$SteppingSourceReaderTestHarness.class */
    public static class SteppingSourceReaderTestHarness {
        private final MockSteppingSplitFetcherManager<int[], MockSourceSplit> fetcherManager;
        private final MockSourceReader sourceReader;

        public SteppingSourceReaderTestHarness(Supplier<SplitReader<int[], MockSourceSplit>> supplier, Configuration configuration) {
            FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue(10);
            this.fetcherManager = new MockSteppingSplitFetcherManager<>(futureCompletingBlockingQueue, supplier, configuration);
            this.sourceReader = new MockSourceReader((FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>) futureCompletingBlockingQueue, (SingleThreadFetcherManager<int[], MockSourceSplit>) this.fetcherManager, configuration, (SourceReaderContext) new TestingReaderContext());
        }

        private static List<MockSourceSplit> createPrefilledSplits(int i, int i2) {
            ArrayList arrayList = new ArrayList(i);
            for (int i3 = 0; i3 < i; i3++) {
                MockSourceSplit mockSourceSplit = new MockSourceSplit(i3, 0, i2);
                for (int i4 = 0; i4 < i2; i4++) {
                    mockSourceSplit.addRecord((i4 * i) + i3);
                }
                arrayList.add(mockSourceSplit);
            }
            return arrayList;
        }

        public void addPrefilledSplitsSingleReader(int i, int i2) {
            this.sourceReader.addSplits(createPrefilledSplits(i, i2));
            this.sourceReader.notifyNoMoreSplits();
        }

        public void addPrefilledSplitsIndividualReader(int i, int i2) {
            Iterator<MockSourceSplit> it = createPrefilledSplits(i, i2).iterator();
            while (it.hasNext()) {
                this.sourceReader.addSplits(Collections.singletonList(it.next()));
            }
            this.sourceReader.notifyNoMoreSplits();
        }

        public static MockSplitReader.Builder createSplitReaderBuilder() {
            return MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(1).setBlockingFetch(false).setSeparatedFinishedRecord(true);
        }

        public int runUntilRecordsEmitted(TestingReaderOutput testingReaderOutput, int i, int i2) throws Exception {
            AtomicReference atomicReference = new AtomicReference();
            AtomicInteger atomicInteger = new AtomicInteger();
            Supplier supplier = () -> {
                try {
                    this.fetcherManager.runEachOnce();
                    atomicInteger.getAndIncrement();
                    InputStatus pollNext = this.sourceReader.pollNext(testingReaderOutput);
                    while (pollNext == InputStatus.MORE_AVAILABLE) {
                        pollNext = this.sourceReader.pollNext(testingReaderOutput);
                    }
                    if (pollNext == InputStatus.END_OF_INPUT) {
                        return true;
                    }
                    if (i2 < 0) {
                        return false;
                    }
                    return Boolean.valueOf(testingReaderOutput.getEmittedRecords().size() >= i2);
                } catch (Exception e) {
                    atomicReference.set(e);
                    return true;
                }
            };
            Duration ofSeconds = Duration.ofSeconds(i);
            Object[] objArr = new Object[2];
            objArr[0] = Integer.valueOf(testingReaderOutput.getEmittedRecords().size());
            objArr[1] = i2 < 0 ? "but not all" : "out of " + i2;
            CommonTestUtils.waitUtil(supplier, ofSeconds, String.format("%d %s records fetched within timeout", objArr));
            if (atomicReference.get() != null) {
                throw ((Exception) atomicReference.get());
            }
            return atomicInteger.get();
        }

        public int runUntilAllRecordsEmitted(TestingReaderOutput testingReaderOutput, int i) throws Exception {
            return runUntilRecordsEmitted(testingReaderOutput, i, -1);
        }

        public void pauseOrResumeSplits(Collection<String> collection, Collection<String> collection2) {
            this.sourceReader.pauseOrResumeSplits(collection, collection2);
        }
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest(name = "Individual reader per split: {0}")
    public void testPauseResumeSplitReaders(boolean z) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        MockSplitReader.Builder createSplitReaderBuilder = SteppingSourceReaderTestHarness.createSplitReaderBuilder();
        SteppingSourceReaderTestHarness steppingSourceReaderTestHarness = new SteppingSourceReaderTestHarness(() -> {
            atomicInteger.getAndIncrement();
            return createSplitReaderBuilder.build();
        }, new Configuration());
        if (z) {
            steppingSourceReaderTestHarness.addPrefilledSplitsIndividualReader(2, 5);
            Assertions.assertThat(atomicInteger.get()).isEqualTo(2);
        } else {
            steppingSourceReaderTestHarness.addPrefilledSplitsSingleReader(2, 5);
            Assertions.assertThat(atomicInteger.get()).isEqualTo(1);
        }
        TestingReaderOutput testingReaderOutput = new TestingReaderOutput();
        steppingSourceReaderTestHarness.runUntilRecordsEmitted(testingReaderOutput, 10, 2);
        Assertions.assertThat(new HashSet(testingReaderOutput.getEmittedRecords())).containsExactlyInAnyOrder(new Integer[]{0, 1});
        steppingSourceReaderTestHarness.pauseOrResumeSplits(Collections.singleton("0"), Collections.emptyList());
        steppingSourceReaderTestHarness.runUntilRecordsEmitted(testingReaderOutput, 10, 5);
        Assertions.assertThat(new HashSet(testingReaderOutput.getEmittedRecords())).containsExactlyInAnyOrder(new Integer[]{0, 1, 3, 5, 7});
        steppingSourceReaderTestHarness.pauseOrResumeSplits(Collections.emptyList(), Collections.singleton("0"));
        steppingSourceReaderTestHarness.runUntilAllRecordsEmitted(testingReaderOutput, 10);
        Assertions.assertThat(new HashSet(testingReaderOutput.getEmittedRecords())).containsExactlyInAnyOrder(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "Allow unaligned source splits: {0}")
    public void testPauseResumeUnsupported(boolean z) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        Configuration configuration = new Configuration();
        configuration.setBoolean("pipeline.watermark-alignment.allow-unaligned-source-splits", z);
        MockSplitReader.Builder createSplitReaderBuilder = SteppingSourceReaderTestHarness.createSplitReaderBuilder();
        SteppingSourceReaderTestHarness steppingSourceReaderTestHarness = new SteppingSourceReaderTestHarness(() -> {
            return atomicInteger.getAndIncrement() == 0 ? MockSplitReaderUnsupportedPause.cloneBuilder(createSplitReaderBuilder).build() : createSplitReaderBuilder.build();
        }, configuration);
        steppingSourceReaderTestHarness.addPrefilledSplitsIndividualReader(2, 5);
        Assertions.assertThat(atomicInteger.get()).isEqualTo(2);
        TestingReaderOutput testingReaderOutput = new TestingReaderOutput();
        steppingSourceReaderTestHarness.runUntilRecordsEmitted(testingReaderOutput, 10, 2);
        Assertions.assertThat(new HashSet(testingReaderOutput.getEmittedRecords())).containsExactlyInAnyOrder(new Integer[]{0, 1});
        steppingSourceReaderTestHarness.pauseOrResumeSplits(Collections.singleton("1"), Collections.emptyList());
        steppingSourceReaderTestHarness.runUntilRecordsEmitted(testingReaderOutput, 10, 5);
        Assertions.assertThat(new HashSet(testingReaderOutput.getEmittedRecords())).containsExactlyInAnyOrder(new Integer[]{0, 1, 2, 4, 6});
        steppingSourceReaderTestHarness.pauseOrResumeSplits(Collections.singleton("0"), Collections.singleton("1"));
        if (!z) {
            Assertions.assertThatThrownBy(() -> {
                steppingSourceReaderTestHarness.runUntilAllRecordsEmitted(testingReaderOutput, 10);
            }).isInstanceOf(RuntimeException.class).hasCauseInstanceOf(UnsupportedOperationException.class);
        } else {
            steppingSourceReaderTestHarness.runUntilAllRecordsEmitted(testingReaderOutput, 10);
            Assertions.assertThat(new HashSet(testingReaderOutput.getEmittedRecords())).containsExactlyInAnyOrder(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
        }
    }
}
