package org.apache.flink.connector.testframe.testsuites;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.commons.math3.util.Precision;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.connector.testframe.environment.ClusterControllable;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
import org.apache.flink.connector.testframe.utils.MetricQuerier;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.opentest4j.TestAbortedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
@ExtendWith({ConnectorTestingExtension.class, TestLoggerExtension.class, TestCaseInvocationContextProvider.class})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.class */
public abstract class SourceTestSuiteBase<T> {
    /** Logger used by the test cases and helper methods of this suite. */
    private static final Logger LOG = LoggerFactory.getLogger(SourceTestSuiteBase.class);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase$CollectIteratorBuilder.class */
    /**
     * Holds everything required to create a {@link CollectResultIterator} except the
     * {@link JobClient}, which is supplied later through {@link #build(JobClient)}.
     *
     * @param <T> type of the records produced by the iterator
     */
    public static class CollectIteratorBuilder<T> {
        private final String operatorUid;
        private final TypeSerializer<T> serializer;
        private final String accumulatorName;
        private final CheckpointConfig checkpointConfig;

        /**
         * Stores the arguments that are later forwarded to the {@link CollectResultIterator}
         * constructor.
         *
         * @param str uid of the collect sink operator
         * @param typeSerializer serializer for the collected records
         * @param str2 accumulator name
         * @param checkpointConfig checkpoint configuration handed to the iterator
         */
        protected CollectIteratorBuilder(String str, TypeSerializer<T> typeSerializer, String str2, CheckpointConfig checkpointConfig) {
            operatorUid = str;
            serializer = typeSerializer;
            accumulatorName = str2;
            this.checkpointConfig = checkpointConfig;
        }

        /**
         * Creates the iterator and attaches it to the given job.
         *
         * @param jobClient client of the job the iterator is attached to
         * @return a new iterator with its job client already set
         */
        protected CollectResultIterator<T> build(JobClient jobClient) {
            // The last constructor argument is the default value of RpcOptions.ASK_TIMEOUT_DURATION in milliseconds.
            final long askTimeoutMillis = RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis();
            final CollectResultIterator<T> resultIterator =
                    new CollectResultIterator<>(operatorUid, serializer, accumulatorName, checkpointConfig, askTimeoutMillis);
            resultIterator.setJobClient(jobClient);
            return resultIterator;
        }
    }

    /**
     * Writes one split of test data, reads it back through a bounded source running with
     * parallelism 1, checks the collected records and finally waits for the job to reach
     * {@link JobStatus#FINISHED}.
     *
     * @param testEnvironment environment used to create the execution environment
     * @param dataStreamSourceExternalContext external context providing the source and test data
     * @param checkpointingMode semantic used when comparing the collected records
     * @throws Exception if job submission, result checking or waiting for the job fails
     */
    @DisplayName("Test source with single split")
    @TestTemplate
    public void testSourceSingleSplit(TestEnvironment testEnvironment, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final TestingSourceSettings sourceSettings =
                TestingSourceSettings.builder().setBoundedness(Boundedness.BOUNDED).setCheckpointingMode(checkpointingMode).build();
        final TestEnvironmentSettings envSettings =
                TestEnvironmentSettings.builder().setConnectorJarPaths(dataStreamSourceExternalContext.getConnectorJarPaths()).build();
        final Source<T, ?, ?> source = tryCreateSource(dataStreamSourceExternalContext, sourceSettings);

        final List<T> testRecords = generateAndWriteTestData(0, dataStreamSourceExternalContext, sourceSettings);

        final StreamExecutionEnvironment execEnv = testEnvironment.createExecutionEnvironment(envSettings);
        final DataStream<T> stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(1);
        final CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
        final JobClient jobClient = submitJob(execEnv, "Source Single Split Test");

        // try-with-resources closes the iterator exactly once, whether or not the check succeeds.
        try (CollectResultIterator<T> resultIterator = iteratorBuilder.build(jobClient)) {
            LOG.info("Checking test results");
            checkResultWithSemantic(resultIterator, Collections.singletonList(testRecords), checkpointingMode, null);
        }

        // The source is bounded, so the job is expected to finish on its own.
        CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));
    }

    /**
     * Writes four splits of test data, reads them back through a bounded source running with
     * parallelism 4 and checks the collected records.
     *
     * @param testEnvironment environment used to create the execution environment
     * @param dataStreamSourceExternalContext external context providing the source and test data
     * @param checkpointingMode semantic used when comparing the collected records
     * @throws Exception if job submission or result checking fails
     */
    @DisplayName("Test source with multiple splits")
    @TestTemplate
    public void testMultipleSplits(TestEnvironment testEnvironment, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final int splitNumber = 4;

        final TestingSourceSettings sourceSettings =
                TestingSourceSettings.builder().setBoundedness(Boundedness.BOUNDED).setCheckpointingMode(checkpointingMode).build();
        final TestEnvironmentSettings envSettings =
                TestEnvironmentSettings.builder().setConnectorJarPaths(dataStreamSourceExternalContext.getConnectorJarPaths()).build();
        final Source<T, ?, ?> source = tryCreateSource(dataStreamSourceExternalContext, sourceSettings);

        // One list of records per split.
        final List<List<T>> testRecordCollections = new ArrayList<>();
        for (int splitIndex = 0; splitIndex < splitNumber; splitIndex++) {
            testRecordCollections.add(generateAndWriteTestData(splitIndex, dataStreamSourceExternalContext, sourceSettings));
        }

        final StreamExecutionEnvironment execEnv = testEnvironment.createExecutionEnvironment(envSettings);
        final DataStream<T> stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(splitNumber);
        final CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
        final JobClient jobClient = submitJob(execEnv, "Source Multiple Split Test");

        try (CollectResultIterator<T> resultIterator = iteratorBuilder.build(jobClient)) {
            LOG.info("Checking test results");
            checkResultWithSemantic(resultIterator, testRecordCollections, checkpointingMode, null);
        }
    }

    /**
     * Runs the savepoint restart scenario with four splits and the same parallelism (4) before
     * and after the restart.
     *
     * @param testEnvironment environment used to create the execution environments
     * @param dataStreamSourceExternalContext external context providing the source and test data
     * @param checkpointingMode semantic used when comparing the collected records
     * @throws Exception if the restart scenario fails
     */
    @DisplayName("Test source restarting from a savepoint")
    @TestTemplate
    public void testSavepoint(TestEnvironment testEnvironment, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final int splitNumber = 4;
        final int parallelism = 4;
        restartFromSavepoint(testEnvironment, dataStreamSourceExternalContext, checkpointingMode, splitNumber, parallelism, parallelism);
    }

    /**
     * Runs the savepoint restart scenario with four splits, starting with parallelism 2 and
     * restarting with parallelism 4.
     *
     * @param testEnvironment environment used to create the execution environments
     * @param dataStreamSourceExternalContext external context providing the source and test data
     * @param checkpointingMode semantic used when comparing the collected records
     * @throws Exception if the restart scenario fails
     */
    @DisplayName("Test source restarting with a higher parallelism")
    @TestTemplate
    public void testScaleUp(TestEnvironment testEnvironment, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final int splitNumber = 4;
        final int parallelismBeforeRestart = 2;
        final int parallelismAfterRestart = 4;
        restartFromSavepoint(testEnvironment, dataStreamSourceExternalContext, checkpointingMode, splitNumber, parallelismBeforeRestart, parallelismAfterRestart);
    }

    /**
     * Runs the savepoint restart scenario with four splits, starting with parallelism 4 and
     * restarting with parallelism 2.
     *
     * @param testEnvironment environment used to create the execution environments
     * @param dataStreamSourceExternalContext external context providing the source and test data
     * @param checkpointingMode semantic used when comparing the collected records
     * @throws Exception if the restart scenario fails
     */
    @DisplayName("Test source restarting with a lower parallelism")
    @TestTemplate
    public void testScaleDown(TestEnvironment testEnvironment, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final int splitNumber = 4;
        final int parallelismBeforeRestart = 4;
        final int parallelismAfterRestart = 2;
        restartFromSavepoint(testEnvironment, dataStreamSourceExternalContext, checkpointingMode, splitNumber, parallelismBeforeRestart, parallelismAfterRestart);
    }

    /**
     * Runs an unbounded source job, stops it with a savepoint, writes more data and restarts the
     * job from that savepoint, checking the collected records after each phase.
     *
     * <p>The same {@link CollectResultIterator} is used for both runs; it is re-pointed at the
     * restarted job via {@code setJobClient}. The iterator is closed on every exit path. A job
     * that is still expected to be running is killed when a failure occurs, and errors raised by
     * that cleanup are attached to the original failure as suppressed exceptions instead of
     * replacing it.
     *
     * @param testEnvironment environment used to create the execution environments
     * @param dataStreamSourceExternalContext external context providing the source and test data
     * @param checkpointingMode semantic used when comparing the collected records
     * @param splitNumber number of split writers to create and write test data to
     * @param beforeParallelism source parallelism of the first run
     * @param afterParallelism source parallelism of the run restored from the savepoint
     * @throws Exception if any step of the scenario fails
     */
    private void restartFromSavepoint(TestEnvironment testEnvironment, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, CheckpointingMode checkpointingMode, int splitNumber, int beforeParallelism, int afterParallelism) throws Exception {
        final TestingSourceSettings sourceSettings =
                TestingSourceSettings.builder().setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED).setCheckpointingMode(checkpointingMode).build();
        final TestEnvironmentSettings envSettings =
                TestEnvironmentSettings.builder().setConnectorJarPaths(dataStreamSourceExternalContext.getConnectorJarPaths()).build();

        // Step 1: create one writer per split and write the first batch of records.
        final List<ExternalSystemSplitDataWriter<T>> writers = new ArrayList<>();
        final List<List<T>> testRecordCollections = new ArrayList<>();
        for (int splitIndex = 0; splitIndex < splitNumber; splitIndex++) {
            final ExternalSystemSplitDataWriter<T> writer = dataStreamSourceExternalContext.createSourceSplitDataWriter(sourceSettings);
            writers.add(writer);
            testRecordCollections.add(generateTestDataForWriter(dataStreamSourceExternalContext, sourceSettings, splitIndex, writer));
        }

        // Step 2: build and submit the first job.
        final StreamExecutionEnvironment execEnv = testEnvironment.createExecutionEnvironment(envSettings);
        execEnv.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
        execEnv.enableCheckpointing(50L);
        RestartStrategyUtils.configureNoRestartStrategy(execEnv);
        final DataStream<T> stream =
                execEnv.fromSource(tryCreateSource(dataStreamSourceExternalContext, sourceSettings), WatermarkStrategy.noWatermarks(), "Tested Source")
                        .setParallelism(beforeParallelism);
        final CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
        final JobClient jobClient = execEnv.executeAsync("Restart Test");

        // Tracks whether the first job has reached FINISHED, i.e. whether it still needs killing on failure.
        boolean firstJobFinished = false;
        try (CollectResultIterator<T> iterator = iteratorBuilder.build(jobClient)) {
            // Step 3: check the first batch and stop the job with a savepoint.
            checkResultWithSemantic(iterator, testRecordCollections, checkpointingMode, getTestDataSize(testRecordCollections));
            final String savepointPath =
                    jobClient.stopWithSavepoint(false, testEnvironment.getCheckpointUri(), SavepointFormatType.CANONICAL).get(30L, TimeUnit.SECONDS);
            CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));
            firstJobFinished = true;

            // Step 4: write a second batch of records through the same writers.
            final List<List<T>> newTestRecordCollections = new ArrayList<>();
            for (int splitIndex = 0; splitIndex < splitNumber; splitIndex++) {
                newTestRecordCollections.add(
                        generateTestDataForWriter(dataStreamSourceExternalContext, sourceSettings, splitIndex, writers.get(splitIndex)));
            }

            // Step 5: build and submit the job restored from the savepoint.
            final TestEnvironmentSettings restartEnvSettings =
                    TestEnvironmentSettings.builder()
                            .setConnectorJarPaths(dataStreamSourceExternalContext.getConnectorJarPaths())
                            .setSavepointRestorePath(savepointPath)
                            .build();
            final StreamExecutionEnvironment restartEnv = testEnvironment.createExecutionEnvironment(restartEnvSettings);
            restartEnv.enableCheckpointing(500L);
            restartEnv.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
            // The returned builder is not needed: the iterator of the first run is reused below.
            addCollectSink(
                    restartEnv.fromSource(tryCreateSource(dataStreamSourceExternalContext, sourceSettings), WatermarkStrategy.noWatermarks(), "Tested Source")
                            .setParallelism(afterParallelism));
            final JobClient restartJobClient = restartEnv.executeAsync("Restart Test");

            // Step 6: check the second batch against the restarted job.
            try {
                CommonTestUtils.waitForJobStatus(restartJobClient, Collections.singletonList(JobStatus.RUNNING));
                iterator.setJobClient(restartJobClient);
                checkResultWithSemantic(iterator, newTestRecordCollections, checkpointingMode, getTestDataSize(newTestRecordCollections));
            } catch (Throwable restartFailure) {
                killJobAfterFailure(restartJobClient, restartFailure);
                throw restartFailure;
            }
            killJob(restartJobClient);
        } catch (Throwable failure) {
            // Only the first job may still be alive here; the restarted job is handled above.
            if (!firstJobFinished) {
                killJobAfterFailure(jobClient, failure);
            }
            throw failure;
        }
    }

    /**
     * Kills the given job as part of failure cleanup. An exception thrown while killing the job is
     * added to {@code failure} as a suppressed exception so that it does not hide the original error.
     *
     * @param jobClient client of the job to kill
     * @param failure the failure that triggered the cleanup
     */
    private void killJobAfterFailure(JobClient jobClient, Throwable failure) {
        try {
            killJob(jobClient);
        } catch (Exception cleanupFailure) {
            if (cleanupFailure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            failure.addSuppressed(cleanupFailure);
        }
    }

    /**
     * Writes four splits of test data, runs an unbounded source job that discards its output and
     * polls until the aggregated {@code numRecordsIn} metric of the source matches the number of
     * written records.
     *
     * <p>A single {@link RestClient} is created for the whole test and closed afterwards, rather
     * than creating a new, never-closed client on every poll.
     *
     * @param testEnvironment environment used to create the execution environment and to obtain the REST endpoint
     * @param dataStreamSourceExternalContext external context providing the source and test data
     * @param checkpointingMode checkpointing mode passed to the source settings
     * @throws Exception if job submission, polling or cleanup fails
     */
    @DisplayName("Test source metrics")
    @TestTemplate
    public void testSourceMetrics(TestEnvironment testEnvironment, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final int splitNumber = 4;

        final TestingSourceSettings sourceSettings =
                TestingSourceSettings.builder().setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED).setCheckpointingMode(checkpointingMode).build();
        final TestEnvironmentSettings envSettings =
                TestEnvironmentSettings.builder().setConnectorJarPaths(dataStreamSourceExternalContext.getConnectorJarPaths()).build();

        final List<List<T>> testRecordCollections = new ArrayList<>();
        for (int splitIndex = 0; splitIndex < splitNumber; splitIndex++) {
            testRecordCollections.add(generateAndWriteTestData(splitIndex, dataStreamSourceExternalContext, sourceSettings));
        }
        // The expected count does not change between polls, so compute it once.
        final long expectedRecordCount = getTestDataSize(testRecordCollections);

        // The source name doubles as the operator name used to look up the metric.
        final String sourceName = "metricTestSource" + testRecordCollections.hashCode();
        final StreamExecutionEnvironment execEnv = testEnvironment.createExecutionEnvironment(envSettings);
        execEnv.fromSource(tryCreateSource(dataStreamSourceExternalContext, sourceSettings), WatermarkStrategy.noWatermarks(), sourceName)
                .setParallelism(splitNumber)
                .sinkTo(new DiscardingSink<>());
        final JobClient jobClient = execEnv.executeAsync("Metrics Test");

        final MetricQuerier metricQuerier = new MetricQuerier(new Configuration());
        final ExecutorService executorService = Executors.newCachedThreadPool();
        try (RestClient restClient = new RestClient(new Configuration(), executorService)) {
            CommonTestUtils.waitForAllTaskRunning(
                    () -> MetricQuerier.getJobDetails(restClient, testEnvironment.getRestEndpoint(), jobClient.getJobID()));

            CommonTestUtils.waitUntilCondition(() -> {
                try {
                    return checkSourceMetrics(metricQuerier, testEnvironment, jobClient.getJobID(), sourceName, expectedRecordCount);
                } catch (Exception e) {
                    // Best effort: a failed query counts as "not matching yet" so the condition is retried.
                    LOG.debug("Querying source metrics failed, retrying", e);
                    return false;
                }
            });
        } finally {
            executorService.shutdown();
            killJob(jobClient);
        }
    }

    /**
     * Writes four splits of test data and reads them back through a bounded source running with
     * parallelism 5, i.e. one more than the number of splits, then checks the collected records
     * and waits for the job to reach {@link JobStatus#FINISHED}.
     *
     * @param testEnvironment environment used to create the execution environment
     * @param dataStreamSourceExternalContext external context providing the source and test data
     * @param checkpointingMode semantic used when comparing the collected records
     * @throws Exception if job submission, result checking or waiting for the job fails
     */
    @DisplayName("Test source with at least one idle parallelism")
    @TestTemplate
    public void testIdleReader(TestEnvironment testEnvironment, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final int splitNumber = 4;

        final TestingSourceSettings sourceSettings =
                TestingSourceSettings.builder().setBoundedness(Boundedness.BOUNDED).setCheckpointingMode(checkpointingMode).build();
        final TestEnvironmentSettings envSettings =
                TestEnvironmentSettings.builder().setConnectorJarPaths(dataStreamSourceExternalContext.getConnectorJarPaths()).build();
        final Source<T, ?, ?> source = tryCreateSource(dataStreamSourceExternalContext, sourceSettings);

        final List<List<T>> testRecordCollections = new ArrayList<>();
        for (int splitIndex = 0; splitIndex < splitNumber; splitIndex++) {
            testRecordCollections.add(generateAndWriteTestData(splitIndex, dataStreamSourceExternalContext, sourceSettings));
        }

        final StreamExecutionEnvironment execEnv = testEnvironment.createExecutionEnvironment(envSettings);
        // One more parallel instance than there are splits.
        final DataStream<T> stream =
                execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(splitNumber + 1);
        final CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
        final JobClient jobClient = submitJob(execEnv, "Idle Reader Test");

        // try-with-resources closes the iterator exactly once, whether or not the check succeeds.
        try (CollectResultIterator<T> resultIterator = iteratorBuilder.build(jobClient)) {
            LOG.info("Checking test results");
            checkResultWithSemantic(resultIterator, testRecordCollections, checkpointingMode, null);
        }

        // The source is bounded, so the job is expected to finish on its own.
        CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));
    }

    /**
     * Checks that an unbounded source job keeps delivering records after a TaskManager failover:
     * records written before the failover are checked, the failover is triggered, the test waits
     * for the job to be {@link JobStatus#RUNNING} again, and a second batch written afterwards is
     * checked as well. The job is then terminated.
     *
     * <p>The result iterator is closed on every exit path, including failed checks.
     * NOTE(review): whether closing the iterator also cancels a still-running job depends on
     * {@link CollectResultIterator}; confirm if job cleanup on failure is required here.
     *
     * @param testEnvironment environment used to create the execution environment
     * @param dataStreamSourceExternalContext external context providing the source and test data
     * @param clusterControllable controller used to trigger the TaskManager failover
     * @param checkpointingMode semantic used when comparing the collected records
     * @throws Exception if any step of the scenario fails
     */
    @DisplayName("Test TaskManager failure")
    @TestTemplate
    public void testTaskManagerFailure(TestEnvironment testEnvironment, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, ClusterControllable clusterControllable, CheckpointingMode checkpointingMode) throws Exception {
        final int splitIndex = 0;

        final TestingSourceSettings sourceSettings =
                TestingSourceSettings.builder().setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED).setCheckpointingMode(checkpointingMode).build();
        final TestEnvironmentSettings envSettings =
                TestEnvironmentSettings.builder().setConnectorJarPaths(dataStreamSourceExternalContext.getConnectorJarPaths()).build();
        final Source<T, ?, ?> source = tryCreateSource(dataStreamSourceExternalContext, sourceSettings);

        // Write the records that must be readable before the failover.
        final List<T> recordsBeforeFailure =
                dataStreamSourceExternalContext.generateTestData(sourceSettings, splitIndex, ThreadLocalRandom.current().nextLong());
        final ExternalSystemSplitDataWriter<T> writer = dataStreamSourceExternalContext.createSourceSplitDataWriter(sourceSettings);
        LOG.info("Writing {} records for split {} to external system", recordsBeforeFailure.size(), splitIndex);
        writer.writeRecords(recordsBeforeFailure);

        final StreamExecutionEnvironment execEnv = testEnvironment.createExecutionEnvironment(envSettings);
        execEnv.enableCheckpointing(50L);
        final DataStream<T> stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(1);
        final CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
        final JobClient jobClient = submitJob(execEnv, "TaskManager Failover Test");

        try (CollectResultIterator<T> iterator = iteratorBuilder.build(jobClient)) {
            LOG.info("Checking records before killing TaskManagers");
            checkResultWithSemantic(iterator, Collections.singletonList(recordsBeforeFailure), checkpointingMode, recordsBeforeFailure.size());

            LOG.info("Trigger TaskManager failover");
            clusterControllable.triggerTaskManagerFailover(jobClient, () -> {
            });

            LOG.info("Waiting for job recovering from failure");
            CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.RUNNING));

            // Write a second batch through the same writer once the job is running again.
            final List<T> recordsAfterFailure =
                    dataStreamSourceExternalContext.generateTestData(sourceSettings, splitIndex, ThreadLocalRandom.current().nextLong());
            LOG.info("Writing {} records for split {} to external system", recordsAfterFailure.size(), splitIndex);
            writer.writeRecords(recordsAfterFailure);

            LOG.info("Checking records after job failover");
            checkResultWithSemantic(iterator, Collections.singletonList(recordsAfterFailure), checkpointingMode, recordsAfterFailure.size());

            // Terminate the job before the iterator is closed at the end of this block.
            CommonTestUtils.terminateJob(jobClient);
            CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.CANCELED));
        }
    }

    /**
     * Generates test records for one split using a random seed and writes them to the external
     * system through a newly created split data writer.
     *
     * @param i index of the split the records are generated for
     * @param dataStreamSourceExternalContext external context that generates the data and creates the writer
     * @param testingSourceSettings settings passed to data generation and writer creation
     * @return the records that were written
     */
    protected List<T> generateAndWriteTestData(int i, DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, TestingSourceSettings testingSourceSettings) {
        final long seed = ThreadLocalRandom.current().nextLong();
        final List<T> records = dataStreamSourceExternalContext.generateTestData(testingSourceSettings, i, seed);
        LOG.info("Writing {} records for split {} to external system", records.size(), i);

        final ExternalSystemSplitDataWriter<T> writer = dataStreamSourceExternalContext.createSourceSplitDataWriter(testingSourceSettings);
        writer.writeRecords(records);
        return records;
    }

    /**
     * Creates the source under test, aborting the test case when the external context cannot
     * provide a source for the given settings.
     *
     * @param dataStreamSourceExternalContext external context that creates the source
     * @param testingSourceSettings settings the source has to satisfy
     * @return the created source
     * @throws TestAbortedException if the context throws {@link UnsupportedOperationException}
     */
    protected Source<T, ?, ?> tryCreateSource(DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, TestingSourceSettings testingSourceSettings) {
        final Source<T, ?, ?> source;
        try {
            source = dataStreamSourceExternalContext.createSource(testingSourceSettings);
        } catch (UnsupportedOperationException unsupported) {
            // An aborted test is reported differently from a failed one; the cause is kept.
            throw new TestAbortedException("Cannot create source satisfying given options", unsupported);
        }
        return source;
    }

    /**
     * Logs the submission and executes the job asynchronously.
     *
     * @param streamExecutionEnvironment environment holding the job to execute
     * @param str name of the job
     * @return client of the submitted job
     * @throws Exception if asynchronous execution fails
     */
    protected JobClient submitJob(StreamExecutionEnvironment streamExecutionEnvironment, String str) throws Exception {
        LOG.info("Submitting Flink job {} to test environment", str);
        final JobClient jobClient = streamExecutionEnvironment.executeAsync(str);
        return jobClient;
    }

    /**
     * Attaches a collect sink to the given stream and returns a builder for the iterator that
     * reads the collected records.
     *
     * <p>The accumulator name gets a random UUID suffix, while the sink uid is the fixed string
     * {@code "dataStreamCollect"}.
     *
     * @param dataStream stream whose records are collected
     * @return builder configured with the sink uid, serializer, accumulator name and checkpoint config
     */
    protected CollectIteratorBuilder<T> addCollectSink(DataStream<T> dataStream) {
        final String operatorUid = "dataStreamCollect";
        final TypeSerializer<T> serializer =
                dataStream.getType().createSerializer(dataStream.getExecutionConfig().getSerializerConfig());
        final String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();

        final CollectSinkOperatorFactory<T> sinkFactory = new CollectSinkOperatorFactory<>(serializer, accumulatorName);
        final CollectStreamSink<T> collectSink = new CollectStreamSink<>(dataStream, sinkFactory);
        collectSink.name("Data stream collect sink");
        collectSink.uid(operatorUid);
        dataStream.getExecutionEnvironment().addOperator(collectSink.getTransformation());

        return new CollectIteratorBuilder<>(operatorUid, serializer, accumulatorName, dataStream.getExecutionEnvironment().getCheckpointConfig());
    }

    /**
     * Generates test records for one split using a random seed and writes them through the given
     * writer.
     *
     * @param dataStreamSourceExternalContext external context that generates the data
     * @param testingSourceSettings settings passed to data generation
     * @param i index of the split the records are generated for
     * @param externalSystemSplitDataWriter writer the records are written with
     * @return the records that were written
     */
    protected List<T> generateTestDataForWriter(DataStreamSourceExternalContext<T> dataStreamSourceExternalContext, TestingSourceSettings testingSourceSettings, int i, ExternalSystemSplitDataWriter<T> externalSystemSplitDataWriter) {
        final long seed = ThreadLocalRandom.current().nextLong();
        final List<T> records = dataStreamSourceExternalContext.generateTestData(testingSourceSettings, i, seed);
        LOG.debug("Writing {} records to external system", records.size());
        externalSystemSplitDataWriter.writeRecords(records);
        return records;
    }

    /**
     * Returns the total number of records across all given record lists.
     *
     * @param list record lists, one per split
     * @return sum of the sizes of all inner lists
     */
    protected int getTestDataSize(List<List<T>> list) {
        return list.stream().mapToInt(List::size).sum();
    }

    /**
     * Asserts that the records read from the iterator match the expected records under the given
     * semantic.
     *
     * <p>Without a limit the assertion runs directly on the calling thread. With a limit it runs
     * asynchronously via {@link CompletableFuture#runAsync(Runnable)} and the resulting future is
     * asserted to eventually succeed.
     *
     * @param closeableIterator iterator over the collected records
     * @param list expected records, one list per split
     * @param checkpointingMode semantic used for the comparison
     * @param num maximum number of records to read, or {@code null} for no limit
     */
    protected void checkResultWithSemantic(CloseableIterator<T> closeableIterator, List<List<T>> list, CheckpointingMode checkpointingMode, Integer num) {
        if (num == null) {
            CollectIteratorAssertions.assertThat(closeableIterator).matchesRecordsFromSource(list, checkpointingMode);
            return;
        }

        final Runnable limitedCheck = () ->
                CollectIteratorAssertions.assertThat(closeableIterator)
                        .withNumRecordsLimit(num.intValue())
                        .matchesRecordsFromSource(list, checkpointingMode);
        FlinkAssertions.assertThatFuture(CompletableFuture.runAsync(limitedCheck)).eventuallySucceeds();
    }

    /**
     * Queries the aggregated {@code numRecordsIn} metric of the named source over the REST API
     * and compares it with the expected record count using {@link Precision#equals(double, double)}.
     *
     * @param metricQuerier querier used to fetch the aggregated metric
     * @param testEnvironment environment providing the REST endpoint
     * @param jobID id of the job whose metric is queried
     * @param sourceName name of the source operator
     * @param expectedRecordCount number of records the metric is expected to report
     * @return {@code true} if the metric equals the expected count
     * @throws Exception if querying the metric fails
     */
    private boolean checkSourceMetrics(MetricQuerier metricQuerier, TestEnvironment testEnvironment, JobID jobID, String sourceName, long expectedRecordCount) throws Exception {
        final double reportedRecordCount =
                metricQuerier.getAggregatedMetricsByRestAPI(testEnvironment.getRestEndpoint(), jobID, sourceName, "numRecordsIn", null).doubleValue();
        return Precision.equals(expectedRecordCount, reportedRecordCount);
    }

    /**
     * Terminates the job and waits until it reports {@link JobStatus#CANCELED}.
     *
     * @param jobClient client of the job to terminate
     * @throws Exception if terminating the job or waiting for the status fails
     */
    private void killJob(JobClient jobClient) throws Exception {
        CommonTestUtils.terminateJob(jobClient);
        final List<JobStatus> expectedStatus = Collections.singletonList(JobStatus.CANCELED);
        CommonTestUtils.waitForJobStatus(jobClient, expectedStatus);
    }
}
