package org.apache.flink.connector.base.source.reader;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that run {@link MockBaseSource}-based sources end to end, including variants
 * whose split enumerator fails once on creation or on restore.
 */
class CoordinatedSourceITCase extends AbstractTestBase {

    /**
     * A {@link MockBaseSource} whose {@code createEnumerator} throws exactly once and afterwards
     * delegates to the parent implementation.
     *
     * <p>The "already failed" marker is a static field, so it is shared by all instances in the
     * JVM. Call {@link #reset()} before a test that relies on the first call failing.
     */
    private static class OnceFailingToCreateEnumeratorSource extends MockBaseSource {
        private static final long serialVersionUID = 1L;

        /** {@code true} once the single injected failure has been thrown. */
        private static boolean hasFailed;

        /** Forwards all arguments unchanged to the {@link MockBaseSource} constructor. */
        OnceFailingToCreateEnumeratorSource(
                int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
            super(numSplits, numRecordsPerSplit, boundedness);
        }

        /**
         * Throws a {@link FlinkRuntimeException} on the first call after a {@link #reset()}; every
         * later call returns whatever the parent {@code createEnumerator} returns.
         */
        @Override
        public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(
                SplitEnumeratorContext<MockSourceSplit> enumContext) {
            if (!hasFailed) {
                // Flip the marker before throwing so the next attempt succeeds.
                hasFailed = true;
                throw new FlinkRuntimeException("Test Failure");
            }
            return super.createEnumerator(enumContext);
        }

        /** Re-arms the one-time failure. */
        static void reset() {
            hasFailed = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase$OnceFailingToRestoreEnumeratorSource.class */
    private static class OnceFailingToRestoreEnumeratorSource extends MockBaseSource {
        private static final long serialVersionUID = 1;
        private static boolean hasFailed;

        /* loaded from: input_file:org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase$OnceFailingToRestoreEnumeratorSource$NonAssigningEnumerator.class */
        private static class NonAssigningEnumerator extends MockSplitEnumerator {
            private final SplitEnumeratorContext<?> context;

            NonAssigningEnumerator(List<MockSourceSplit> list, SplitEnumeratorContext<MockSourceSplit> splitEnumeratorContext) {
                super(list, splitEnumeratorContext);
                this.context = splitEnumeratorContext;
            }

            @Override // org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator
            public void addReader(int i) {
            }

            @Override // org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator
            public void handleSplitRequest(int i, @Nullable String str) {
            }

            public void notifyCheckpointComplete(long j) throws Exception {
                this.context.callAsync(() -> {
                    return null;
                }, (obj, th) -> {
                    throw new FlinkRuntimeException("Artificial trigger for Global Failover");
                });
            }
        }

        OnceFailingToRestoreEnumeratorSource(int i, int i2, Boundedness boundedness) {
            super(i, i2, boundedness);
        }

        @Override // org.apache.flink.connector.base.source.reader.mocks.MockBaseSource
        public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(SplitEnumeratorContext<MockSourceSplit> splitEnumeratorContext) {
            SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator = super.createEnumerator(splitEnumeratorContext);
            if (hasFailed) {
                return createEnumerator;
            }
            try {
                return new NonAssigningEnumerator((List) createEnumerator.snapshotState(serialVersionUID), splitEnumeratorContext);
            } catch (Exception e) {
                throw new FlinkRuntimeException(e.getMessage(), e);
            }
        }

        @Override // org.apache.flink.connector.base.source.reader.mocks.MockBaseSource
        public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> restoreEnumerator(SplitEnumeratorContext<MockSourceSplit> splitEnumeratorContext, List<MockSourceSplit> list) throws IOException {
            if (hasFailed) {
                return super.restoreEnumerator(splitEnumeratorContext, list);
            }
            hasFailed = true;
            throw new FlinkRuntimeException("Test Failure");
        }

        static void reset() {
            hasFailed = false;
        }

        @Override // org.apache.flink.connector.base.source.reader.mocks.MockBaseSource
        public /* bridge */ /* synthetic */ SplitEnumerator restoreEnumerator(SplitEnumeratorContext splitEnumeratorContext, Object obj) throws Exception {
            return restoreEnumerator((SplitEnumeratorContext<MockSourceSplit>) splitEnumeratorContext, (List<MockSourceSplit>) obj);
        }
    }

    /** Package-private no-arg constructor; only invokes the {@link AbstractTestBase} constructor. */
    CoordinatedSourceITCase() {
        super();
    }

    /** Runs one bounded {@code MockBaseSource(2, 10)} and expects 20 records in the sink. */
    @Test
    void testEnumeratorReaderCommunication() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
        final DataStream<Integer> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
        executeAndVerify(env, stream, 20);
    }

    /**
     * Unions two bounded mock sources and expects 40 records in the sink.
     *
     * <p>The second source is built with an extra argument of 20 (presumably the first value it
     * emits; confirm against {@code MockBaseSource}).
     */
    @Test
    void testMultipleSources() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStream<Integer> first =
                env.fromSource(
                        new MockBaseSource(2, 10, Boundedness.BOUNDED),
                        WatermarkStrategy.noWatermarks(),
                        "TestingSource1");
        final DataStream<Integer> second =
                env.fromSource(
                        new MockBaseSource(2, 10, 20, Boundedness.BOUNDED),
                        WatermarkStrategy.noWatermarks(),
                        "TestingSource2");
        executeAndVerify(env, first.union(second), 40);
    }

    /**
     * Runs a source whose enumerator creation fails once, with a zero-delay fixed-delay restart
     * strategy, and expects all 20 records to arrive anyway.
     */
    @Test
    void testEnumeratorCreationFails() throws Exception {
        // The failure marker is static, so re-arm it before building the job.
        OnceFailingToCreateEnumeratorSource.reset();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);

        final MockBaseSource source =
                new OnceFailingToCreateEnumeratorSource(2, 10, Boundedness.BOUNDED);
        final DataStream<Integer> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
        executeAndVerify(env, stream, 20);
    }

    /**
     * Runs a source whose enumerator restore fails once, with checkpointing enabled and a
     * zero-delay fixed-delay restart strategy, and expects all 20 records to arrive anyway.
     */
    @Test
    void testEnumeratorRestoreFails() throws Exception {
        // The failure marker is static, so re-arm it before building the job.
        OnceFailingToRestoreEnumeratorSource.reset();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);
        env.enableCheckpointing(10L);

        final MockBaseSource source =
                new OnceFailingToRestoreEnumeratorSource(2, 10, Boundedness.BOUNDED);
        final DataStream<Integer> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
        executeAndVerify(env, stream, 20);
    }

    /**
     * Attaches a collecting sink to {@code dataStream}, executes the job, and verifies the
     * collected values.
     *
     * <p>The sink stores every record in a {@link ListAccumulator} registered under the name
     * {@code "result"}. After the job finishes, the accumulated list is sorted and checked for
     * three things: it holds {@code expectedCount} elements, its smallest element is 0, and its
     * largest element is {@code expectedCount - 1}.
     *
     * @param streamExecutionEnvironment environment the stream belongs to; used to run the job
     * @param dataStream stream whose records are collected
     * @param expectedCount number of records the job is expected to produce
     * @throws Exception if job execution fails
     */
    private void executeAndVerify(
            StreamExecutionEnvironment streamExecutionEnvironment,
            DataStream<Integer> dataStream,
            int expectedCount)
            throws Exception {
        dataStream.addSink(
                new RichSinkFunction<Integer>() {
                    @Override
                    public void open(OpenContext openContext) throws Exception {
                        getRuntimeContext()
                                .addAccumulator("result", new ListAccumulator<Integer>());
                    }

                    @Override
                    public void invoke(Integer value, SinkFunction.Context context)
                            throws Exception {
                        getRuntimeContext().getAccumulator("result").add(value);
                    }
                });

        List<Integer> result =
                streamExecutionEnvironment.execute().getAccumulatorResult("result");
        // Sort first so the smallest and largest values sit at the two ends of the list.
        Collections.sort(result);

        Assertions.assertThat(result).hasSize(expectedCount);
        Assertions.assertThat(result.get(0)).isZero();
        Assertions.assertThat(result.get(result.size() - 1)).isEqualTo(expectedCount - 1);
    }
}
