package org.apache.flink.connector.testframe.testsuites;

import java.lang.Comparable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.math3.util.Precision;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV1ExternalContext;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
import org.apache.flink.connector.testframe.source.FromElementsSource;
import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
import org.apache.flink.connector.testframe.utils.MetricQuerier;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.opentest4j.TestAbortedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for sink test suites.
 *
 * <p>It provides test templates for a basic bounded sink job, for stopping with a savepoint and
 * restarting with the same, a higher or a lower sink parallelism, and for the sink's
 * {@code numRecordsSend} metric. Records read back from the external system are sorted by their
 * natural order before they are compared with the generated test data, which is why the record
 * type has to be {@link Comparable}.
 *
 * @param <T> type of the records written by the sink under test
 */
@Experimental
@ExtendWith({ConnectorTestingExtension.class, TestLoggerExtension.class, TestCaseInvocationContextProvider.class})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.class */
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
    // Class-wide logger; used when a metric comparison does not match.
    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);

    /**
     * Writes generated test data through the sink under test with a bounded job and verifies what
     * ends up in the external system.
     *
     * <p>The job uses a collection source and a sink that both run with parallelism 1. Once the
     * job reaches {@link JobStatus#FINISHED}, the external system is read back and compared with
     * the generated records according to the given checkpointing mode.
     *
     * @param testEnvironment environment used to create the execution environment
     * @param dataStreamSinkExternalContext external context providing the sink, the test data and
     *     the reader of the external system
     * @param checkpointingMode delivery semantic to test with
     * @throws Exception if the job or the verification fails
     */
    @DisplayName("Test data stream sink")
    @TestTemplate
    public void testBasicSink(TestEnvironment testEnvironment, DataStreamSinkExternalContext<T> dataStreamSinkExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final TestingSinkSettings sinkSettings = getTestingSinkSettings(checkpointingMode);
        final List<T> testRecords = generateTestData(sinkSettings, dataStreamSinkExternalContext);

        // Build the execution environment with the connector jars attached.
        final TestEnvironmentSettings envSettings =
                TestEnvironmentSettings.builder()
                        .setConnectorJarPaths(dataStreamSinkExternalContext.getConnectorJarPaths())
                        .build();
        final StreamExecutionEnvironment execEnv = testEnvironment.createExecutionEnvironment(envSettings);
        execEnv.enableCheckpointing(50L);

        // Source -> sink pipeline, both with parallelism 1.
        final DataStream<T> input =
                execEnv.fromCollection(testRecords)
                        .name("sourceInSinkTest")
                        .setParallelism(1)
                        .returns(dataStreamSinkExternalContext.getProducedType());
        tryCreateSink(input, dataStreamSinkExternalContext, sinkSettings)
                .setParallelism(1)
                .name("sinkInSinkTest");

        final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
        CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));

        checkResultWithSemantic(
                dataStreamSinkExternalContext.createSinkDataReader(sinkSettings),
                testRecords,
                checkpointingMode);
    }

    /**
     * Stops the sink job with a savepoint and restarts it from that savepoint, keeping the sink
     * parallelism at 2 before and after the restart.
     *
     * @param testEnvironment environment used to create the execution environments
     * @param dataStreamSinkExternalContext external context of the sink under test
     * @param checkpointingMode delivery semantic to test with
     * @throws Exception if one of the jobs or the verification fails
     */
    @DisplayName("Test sink restarting from a savepoint")
    @TestTemplate
    public void testStartFromSavepoint(TestEnvironment testEnvironment, DataStreamSinkExternalContext<T> dataStreamSinkExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final int parallelismBeforeRestart = 2;
        final int parallelismAfterRestart = 2;
        restartFromSavepoint(
                testEnvironment,
                dataStreamSinkExternalContext,
                checkpointingMode,
                parallelismBeforeRestart,
                parallelismAfterRestart);
    }

    /**
     * Stops the sink job with a savepoint and restarts it from that savepoint with the sink
     * parallelism raised from 2 to 4.
     *
     * @param testEnvironment environment used to create the execution environments
     * @param dataStreamSinkExternalContext external context of the sink under test
     * @param checkpointingMode delivery semantic to test with
     * @throws Exception if one of the jobs or the verification fails
     */
    @DisplayName("Test sink restarting with a higher parallelism")
    @TestTemplate
    public void testScaleUp(TestEnvironment testEnvironment, DataStreamSinkExternalContext<T> dataStreamSinkExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final int parallelismBeforeRestart = 2;
        final int parallelismAfterRestart = 4;
        restartFromSavepoint(
                testEnvironment,
                dataStreamSinkExternalContext,
                checkpointingMode,
                parallelismBeforeRestart,
                parallelismAfterRestart);
    }

    /**
     * Stops the sink job with a savepoint and restarts it from that savepoint with the sink
     * parallelism lowered from 4 to 2.
     *
     * @param testEnvironment environment used to create the execution environments
     * @param dataStreamSinkExternalContext external context of the sink under test
     * @param checkpointingMode delivery semantic to test with
     * @throws Exception if one of the jobs or the verification fails
     */
    @DisplayName("Test sink restarting with a lower parallelism")
    @TestTemplate
    public void testScaleDown(TestEnvironment testEnvironment, DataStreamSinkExternalContext<T> dataStreamSinkExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final int parallelismBeforeRestart = 4;
        final int parallelismAfterRestart = 2;
        restartFromSavepoint(
                testEnvironment,
                dataStreamSinkExternalContext,
                checkpointingMode,
                parallelismBeforeRestart,
                parallelismAfterRestart);
    }

    /**
     * Runs a sink job, stops it with a savepoint after half of the test data has been emitted and
     * restarts it from that savepoint, verifying the external system after each phase.
     *
     * <ol>
     *   <li>A first job emits the first half of the generated records. A collect sink attached to
     *       the source is used to detect when that many records have been produced.
     *   <li>The job is stopped with a canonical savepoint and must reach
     *       {@link JobStatus#FINISHED}; the external system must then match the first half.
     *   <li>A second job is started from the savepoint with the new sink parallelism and emits all
     *       records; the external system must then match the complete test data.
     * </ol>
     *
     * <p>Cleanup is scoped per job: a failure in the first phase terminates the first job only,
     * the restarted job is always terminated once it has been submitted, and the executor and the
     * collect iterator are released on every path.
     *
     * @param testEnvironment environment used to create the execution environments
     * @param dataStreamSinkExternalContext external context of the sink under test
     * @param checkpointingMode delivery semantic to test with
     * @param i sink parallelism of the job before the restart
     * @param i2 sink parallelism of the job restarted from the savepoint
     * @throws Exception if one of the jobs or the verification fails
     */
    private void restartFromSavepoint(TestEnvironment testEnvironment, DataStreamSinkExternalContext<T> dataStreamSinkExternalContext, CheckpointingMode checkpointingMode, int i, int i2) throws Exception {
        // Step 1: prepare the first execution environment; no restarts, failures must surface.
        final TestingSinkSettings sinkSettings = getTestingSinkSettings(checkpointingMode);
        final StreamExecutionEnvironment execEnv =
                testEnvironment.createExecutionEnvironment(
                        TestEnvironmentSettings.builder()
                                .setConnectorJarPaths(dataStreamSinkExternalContext.getConnectorJarPaths())
                                .build());
        execEnv.setRestartStrategy(RestartStrategies.noRestart());

        // Step 2: generate the test data; only the first half is emitted before the savepoint.
        final List<T> testRecords = generateTestData(sinkSettings, dataStreamSinkExternalContext);
        final int numBeforeSavepoint = testRecords.size() / 2;

        // Step 3: build and submit the first job.
        final DataStreamSource<T> source =
                execEnv.fromSource(
                                new FromElementsSource<>(
                                        Boundedness.CONTINUOUS_UNBOUNDED, testRecords, numBeforeSavepoint),
                                WatermarkStrategy.noWatermarks(),
                                "beforeRestartSource")
                        .setParallelism(1);
        tryCreateSink(
                        source.returns(dataStreamSinkExternalContext.getProducedType()),
                        dataStreamSinkExternalContext,
                        sinkSettings)
                .name("Sink restart test")
                .setParallelism(i);

        // The collect sink lets the test observe how many records the source has produced.
        final CollectResultIterator<T> iterator = addCollectSink(source);
        final JobClient jobClient = execEnv.executeAsync("Restart Test");
        iterator.setJobClient(jobClient);

        final ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            // Step 4: wait for the expected records and stop the first job with a savepoint.
            String savepointPath;
            try {
                // NOTE(review): a new RestClient is created on every poll and is not closed in
                // this method.
                CommonTestUtils.waitForAllTaskRunning(() -> {
                    return MetricQuerier.getJobDetails(
                            new RestClient(new Configuration(), executorService),
                            testEnvironment.getRestEndpoint(),
                            jobClient.getJobID());
                });
                waitExpectedSizeData(iterator, numBeforeSavepoint);
                savepointPath =
                        jobClient
                                .stopWithSavepoint(
                                        true,
                                        testEnvironment.getCheckpointUri(),
                                        SavepointFormatType.CANONICAL)
                                .get(30L, TimeUnit.SECONDS);
                CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));
            } catch (Exception e) {
                // Only here can the first job still be alive. Keep the original failure as the
                // primary exception even if terminating the job fails as well.
                try {
                    killJob(jobClient);
                } catch (Exception cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
                throw e;
            }

            // The first job has finished: the external system must hold the first half.
            checkResultWithSemantic(
                    dataStreamSinkExternalContext.createSinkDataReader(sinkSettings),
                    testRecords.subList(0, numBeforeSavepoint),
                    checkpointingMode);

            // Step 5: restart from the savepoint with the new sink parallelism.
            final StreamExecutionEnvironment restartEnv =
                    testEnvironment.createExecutionEnvironment(
                            TestEnvironmentSettings.builder()
                                    .setConnectorJarPaths(dataStreamSinkExternalContext.getConnectorJarPaths())
                                    .setSavepointRestorePath(savepointPath)
                                    .build());
            restartEnv.enableCheckpointing(50L);
            final DataStreamSource<T> restartSource =
                    restartEnv.fromSource(
                                    new FromElementsSource<>(
                                            Boundedness.CONTINUOUS_UNBOUNDED, testRecords, testRecords.size()),
                                    WatermarkStrategy.noWatermarks(),
                                    "restartSource")
                            .setParallelism(1);
            tryCreateSink(
                            restartSource.returns(dataStreamSinkExternalContext.getProducedType()),
                            dataStreamSinkExternalContext,
                            sinkSettings)
                    .setParallelism(i2);
            // The collect sink is added to the restarted job as well; its iterator is not used.
            addCollectSink(restartSource);
            final JobClient restartJobClient = restartEnv.executeAsync("Restart Test");
            try {
                // Step 6: the external system must now hold the complete test data.
                checkResultWithSemantic(
                        dataStreamSinkExternalContext.createSinkDataReader(sinkSettings),
                        testRecords,
                        checkpointingMode);
            } finally {
                // The restarted source is unbounded, so the job has to be terminated explicitly.
                killJob(restartJobClient);
            }
        } finally {
            executorService.shutdown();
            iterator.close();
        }
    }

    /**
     * Verifies that the sink reports the expected {@code numRecordsSend} metric.
     *
     * <p>A job with an unbounded source emitting all generated records and a sink with
     * parallelism 1 is started. The test waits until all tasks are running and then polls the
     * aggregated metric until it equals the number of generated records. The job is terminated
     * and the executor is shut down exactly once, whether the check succeeds or fails.
     *
     * @param testEnvironment environment used to create the execution environment
     * @param dataStreamSinkExternalContext external context of the sink under test
     * @param checkpointingMode delivery semantic to test with
     * @throws Exception if the job fails or the metric does not reach the expected value
     */
    @DisplayName("Test sink metrics")
    @TestTemplate
    public void testMetrics(TestEnvironment testEnvironment, DataStreamSinkExternalContext<T> dataStreamSinkExternalContext, CheckpointingMode checkpointingMode) throws Exception {
        final TestingSinkSettings sinkSettings = getTestingSinkSettings(checkpointingMode);
        final List<T> testRecords = generateTestData(sinkSettings, dataStreamSinkExternalContext);
        // The sink name is used to look the operator's metrics up later on.
        final String sinkName = "metricTestSink" + testRecords.hashCode();

        final StreamExecutionEnvironment execEnv =
                testEnvironment.createExecutionEnvironment(
                        TestEnvironmentSettings.builder()
                                .setConnectorJarPaths(dataStreamSinkExternalContext.getConnectorJarPaths())
                                .build());
        execEnv.enableCheckpointing(50L);

        final DataStreamSource<T> source =
                execEnv.fromSource(
                                new FromElementsSource<>(
                                        Boundedness.CONTINUOUS_UNBOUNDED, testRecords, testRecords.size()),
                                WatermarkStrategy.noWatermarks(),
                                "metricTestSource")
                        .setParallelism(1);
        tryCreateSink(
                        source.returns(dataStreamSinkExternalContext.getProducedType()),
                        dataStreamSinkExternalContext,
                        sinkSettings)
                .name(sinkName)
                .setParallelism(1);

        final JobClient jobClient = execEnv.executeAsync("Metrics Test");
        final MetricQuerier metricQuerier = new MetricQuerier(new Configuration());
        final ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            // NOTE(review): a new RestClient is created on every poll and is not closed in this
            // method.
            CommonTestUtils.waitForAllTaskRunning(() -> {
                return MetricQuerier.getJobDetails(
                        new RestClient(new Configuration(), executorService),
                        testEnvironment.getRestEndpoint(),
                        jobClient.getJobID());
            });
            CommonTestUtils.waitUntilCondition(() -> {
                try {
                    return compareSinkMetrics(
                            metricQuerier,
                            testEnvironment,
                            dataStreamSinkExternalContext,
                            jobClient.getJobID(),
                            sinkName,
                            "numRecordsSend",
                            testRecords.size());
                } catch (Exception e) {
                    // A failed query is treated as "not there yet" and retried by the caller.
                    LOG.debug("Failed to query sink metrics, will retry.", e);
                    return false;
                }
            });
        } finally {
            // The source is unbounded, so the job has to be terminated explicitly.
            executorService.shutdown();
            killJob(jobClient);
        }
    }

    /**
     * Generates the records for a test case by delegating to the external context with a random
     * seed.
     *
     * @param testingSinkSettings sink settings of the current test case
     * @param dataStreamSinkExternalContext external context that produces the test data
     * @return the generated test records
     */
    protected List<T> generateTestData(TestingSinkSettings testingSinkSettings, DataStreamSinkExternalContext<T> dataStreamSinkExternalContext) {
        final long seed = ThreadLocalRandom.current().nextLong();
        return dataStreamSinkExternalContext.generateTestData(testingSinkSettings, seed);
    }

    /**
     * Polls the external system and appends everything that was read to {@code list} until enough
     * records have been collected or the retry budget is used up.
     *
     * @param list accumulator for the records read so far; it is modified in place
     * @param externalSystemDataReader reader of the external system
     * @param list2 records expected in the external system
     * @param i maximum number of polls
     * @param checkpointingMode delivery semantic deciding when enough records are present
     * @return the same {@code list} instance that was passed in
     */
    private List<T> pollAndAppendResultData(List<T> list, ExternalSystemDataReader<T> externalSystemDataReader, List<T> list2, int i, CheckpointingMode checkpointingMode) {
        final Duration pollTimeout = Duration.ofMillis(1000L);
        // The retry budget is checked before the (more expensive) completeness check.
        for (int attempt = 0;
                attempt < i && !checkGetEnoughRecordsWithSemantic(list2, list, checkpointingMode);
                attempt++) {
            list.addAll(externalSystemDataReader.poll(pollTimeout));
        }
        return list;
    }

    /**
     * Decides whether the records read so far are sufficient to run the final comparison.
     *
     * <ul>
     *   <li>{@code EXACTLY_ONCE}: at least as many records as expected have been read.
     *   <li>{@code AT_LEAST_ONCE}: every expected record can be paired with its own, distinct
     *       record of the result, so duplicates in the result are tolerated.
     * </ul>
     *
     * @param list expected records
     * @param list2 records read from the external system so far
     * @param checkpointingMode delivery semantic to check against
     * @return {@code true} if enough records have been read
     * @throws IllegalStateException if the checkpointing mode is neither of the two above
     */
    private boolean checkGetEnoughRecordsWithSemantic(List<T> list, List<T> list2, CheckpointingMode checkpointingMode) {
        Preconditions.checkNotNull(list);
        Preconditions.checkNotNull(list2);

        if (CheckpointingMode.EXACTLY_ONCE.equals(checkpointingMode)) {
            return list2.size() >= list.size();
        }

        if (CheckpointingMode.AT_LEAST_ONCE.equals(checkpointingMode)) {
            // claimed[k] marks result entries that are already paired with an expected record.
            final boolean[] claimed = new boolean[list2.size()];
            for (T expectedRecord : list) {
                boolean matched = false;
                for (int idx = 0; idx < claimed.length && !matched; idx++) {
                    if (!claimed[idx] && expectedRecord.equals(list2.get(idx))) {
                        claimed[idx] = true;
                        matched = true;
                    }
                }
                if (!matched) {
                    // This expected record has no unclaimed counterpart yet.
                    return false;
                }
            }
            return true;
        }

        throw new IllegalStateException(String.format("%s delivery guarantee doesn't support test.", checkpointingMode.name()));
    }

    /**
     * Waits until the records read from the external system match the expected records under the
     * given delivery semantic.
     *
     * <p>Each evaluation of the condition polls the reader up to 30 times, accumulating the
     * records across evaluations, and then compares the sorted result with the sorted expected
     * records. A failing comparison only makes the condition evaluate to {@code false}.
     *
     * @param externalSystemDataReader reader of the external system
     * @param list expected records
     * @param checkpointingMode delivery semantic to verify
     * @throws Exception if waiting for the condition fails
     */
    protected void checkResultWithSemantic(ExternalSystemDataReader<T> externalSystemDataReader, List<T> list, CheckpointingMode checkpointingMode) throws Exception {
        final int maxPollsPerAttempt = 30;
        final List<T> collected = new ArrayList<>();
        CommonTestUtils.waitUntilCondition(() -> {
            pollAndAppendResultData(collected, externalSystemDataReader, list, maxPollsPerAttempt, checkpointingMode);
            try {
                final List<T> sortedActual = sort(collected);
                final List<T> sortedExpected = sort(list);
                CollectIteratorAssertions.assertThat(sortedActual.iterator())
                        .matchesRecordsFromSource(Arrays.asList(sortedExpected), checkpointingMode);
                return true;
            } catch (Throwable mismatch) {
                // Not matching yet; the surrounding wait evaluates the condition again.
                return false;
            }
        });
    }

    /**
     * Queries an aggregated sink metric via the metric querier and compares it with the expected
     * value.
     *
     * @param metricQuerier querier used to fetch the aggregated metric
     * @param testEnvironment environment providing the REST endpoint
     * @param dataStreamSinkExternalContext external context, used to pick the metric filter
     * @param jobID id of the job to query
     * @param str name of the sink operator
     * @param str2 name of the metric
     * @param j expected metric value
     * @return {@code true} if the metric equals the expected value; otherwise {@code false},
     *     after logging both values
     * @throws Exception if querying the metric fails
     */
    private boolean compareSinkMetrics(MetricQuerier metricQuerier, TestEnvironment testEnvironment, DataStreamSinkExternalContext<T> dataStreamSinkExternalContext, JobID jobID, String str, String str2, long j) throws Exception {
        final double actual =
                metricQuerier
                        .getAggregatedMetricsByRestAPI(
                                testEnvironment.getRestEndpoint(),
                                jobID,
                                str,
                                str2,
                                getSinkMetricFilter(dataStreamSinkExternalContext))
                        .doubleValue();
        final boolean matches = Precision.equals((double) j, actual);
        if (!matches) {
            LOG.info("expected:<{}> but was <{}>({})", j, actual, str2);
        }
        return matches;
    }

    /**
     * Returns a new list holding the given records in their natural order; the input list is left
     * untouched.
     *
     * @param list records to sort
     * @return a sorted copy of {@code list}
     */
    private List<T> sort(List<T> list) {
        final List<T> sortedCopy = new ArrayList<>(list);
        Collections.sort(sortedCopy);
        return sortedCopy;
    }

    /**
     * Builds the sink settings for a test case.
     *
     * @param checkpointingMode delivery semantic to put into the settings
     * @return settings carrying the given checkpointing mode
     */
    private TestingSinkSettings getTestingSinkSettings(CheckpointingMode checkpointingMode) {
        final TestingSinkSettings sinkSettings =
                TestingSinkSettings.builder()
                        .setCheckpointingMode(checkpointingMode)
                        .build();
        return sinkSettings;
    }

    /**
     * Terminates the given job and waits until it reports {@link JobStatus#CANCELED}.
     *
     * @param jobClient client of the job to terminate
     * @throws Exception if terminating the job or waiting for its status fails
     */
    private void killJob(JobClient jobClient) throws Exception {
        CommonTestUtils.terminateJob(jobClient);
        final List<JobStatus> expectedStatus = Collections.singletonList(JobStatus.CANCELED);
        CommonTestUtils.waitForJobStatus(jobClient, expectedStatus);
    }

    /**
     * Creates the sink under test from the external context and attaches it to the stream.
     *
     * @param dataStream stream the sink is attached to
     * @param dataStreamSinkExternalContext external context; must be a
     *     {@link DataStreamSinkV1ExternalContext} or a {@link DataStreamSinkV2ExternalContext}
     * @param testingSinkSettings settings the sink has to satisfy
     * @return the sink transformation added to the stream
     * @throws IllegalArgumentException if the context is of neither supported kind
     * @throws TestAbortedException if an {@link UnsupportedOperationException} is raised while
     *     creating or attaching the sink, which aborts the test case instead of failing it
     */
    private DataStreamSink<T> tryCreateSink(DataStream<T> dataStream, DataStreamSinkExternalContext<T> dataStreamSinkExternalContext, TestingSinkSettings testingSinkSettings) {
        try {
            final DataStreamSink<T> attachedSink;
            if (dataStreamSinkExternalContext instanceof DataStreamSinkV1ExternalContext) {
                final DataStreamSinkV1ExternalContext<T> v1Context =
                        (DataStreamSinkV1ExternalContext<T>) dataStreamSinkExternalContext;
                attachedSink = dataStream.sinkTo(v1Context.createSink(testingSinkSettings));
            } else if (dataStreamSinkExternalContext instanceof DataStreamSinkV2ExternalContext) {
                final DataStreamSinkV2ExternalContext<T> v2Context =
                        (DataStreamSinkV2ExternalContext<T>) dataStreamSinkExternalContext;
                attachedSink = dataStream.sinkTo(v2Context.createSink(testingSinkSettings));
            } else {
                throw new IllegalArgumentException(String.format("The supported context are DataStreamSinkV1ExternalContext and DataStreamSinkV2ExternalContext, but actual is %s.", dataStreamSinkExternalContext.getClass()));
            }
            return attachedSink;
        } catch (UnsupportedOperationException unsupported) {
            throw new TestAbortedException("Cannot create a sink satisfying given options.", unsupported);
        }
    }

    /**
     * Selects the filter passed to the metric querier for the sink under test.
     *
     * @param dataStreamSinkExternalContext external context of the sink under test
     * @return {@code null} for a {@link DataStreamSinkV1ExternalContext}, {@code "Writer"} for a
     *     {@link DataStreamSinkV2ExternalContext}
     * @throws IllegalArgumentException if the context is of neither kind
     */
    private String getSinkMetricFilter(DataStreamSinkExternalContext<T> dataStreamSinkExternalContext) {
        final boolean isV1 = dataStreamSinkExternalContext instanceof DataStreamSinkV1ExternalContext;
        if (!isV1 && !(dataStreamSinkExternalContext instanceof DataStreamSinkV2ExternalContext)) {
            throw new IllegalArgumentException(String.format("Get unexpected sink context: %s", dataStreamSinkExternalContext.getClass()));
        }
        return isV1 ? null : "Writer";
    }

    /**
     * Attaches a collect sink to the given stream and returns an iterator over the records the
     * sink receives.
     *
     * <p>The returned iterator is not bound to a job yet; callers that read from it set the job
     * client on it after submitting the job.
     *
     * @param dataStream stream to collect from
     * @return iterator over the collected records
     */
    protected CollectResultIterator<T> addCollectSink(DataStream<T> dataStream) {
        final StreamExecutionEnvironment environment = dataStream.getExecutionEnvironment();
        final TypeSerializer recordSerializer = dataStream.getType().createSerializer(dataStream.getExecutionConfig());
        // A random suffix keeps the accumulator name distinct for every collect sink.
        final String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();

        final CollectSinkOperatorFactory sinkFactory = new CollectSinkOperatorFactory(recordSerializer, accumulatorName);
        final CollectSinkOperator sinkOperator = sinkFactory.getOperator();

        final CollectStreamSink collectSink = new CollectStreamSink(dataStream, sinkFactory);
        collectSink.name("Data stream collect sink");
        environment.addOperator(collectSink.getTransformation());

        return new CollectResultIterator<>(
                sinkOperator.getOperatorIdFuture(),
                recordSerializer,
                accumulatorName,
                environment.getCheckpointConfig());
    }

    /**
     * Asserts that the given number of records can eventually be read from the collect iterator.
     *
     * <p>The records are consumed asynchronously and discarded. If the iterator ends before enough
     * records were read, the asynchronous task fails with an {@link IllegalStateException}.
     *
     * @param collectResultIterator iterator to consume from
     * @param i number of records to wait for
     */
    private void waitExpectedSizeData(CollectResultIterator<T> collectResultIterator, int i) {
        final CompletableFuture<Boolean> consumed = CompletableFuture.supplyAsync(() -> {
            int received = 0;
            for (; received < i && collectResultIterator.hasNext(); received++) {
                collectResultIterator.next();
            }
            if (received >= i) {
                return true;
            }
            throw new IllegalStateException(String.format("Fail to get %d records.", i));
        });
        FlinkAssertions.assertThatFuture(consumed).eventuallySucceeds();
    }
}
