package com.ververica.cdc.connectors.mysql.source;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.TestTable;
import com.ververica.cdc.connectors.mysql.testutils.TestTableSchemas;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
import java.io.File;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.class */
public class SpecificStartingOffsetITCase {
    /** Class logger; also handed to the MySQL container's log consumer below. */
    private static final Logger LOG = LoggerFactory.getLogger(SpecificStartingOffsetITCase.class);

    /** Flink mini cluster registered as a JUnit 5 extension. */
    @RegisterExtension
    static MiniClusterExtension miniCluster = new MiniClusterExtension();
    /**
     * MySQL container configured with a my.cnf generated by {@link #buildMySqlConfigWithTimezone}
     * (using the system default time zone) and initialised with {@code docker/setup.sql}.
     */
    private final MySqlContainer mysql = new MySqlContainer().withConfigurationOverride(buildMySqlConfigWithTimezone(getResourceFolder(), getSystemTimeZone())).withSetupSQL("docker/setup.sql").m21withDatabaseName("flink-test").m23withUsername("flinkuser").m22withPassword("flinkpw").withLogConsumer(new Slf4jLogConsumer(LOG));
    /** The "customer" database on {@link #mysql}; created in {@code prepare()} and dropped in {@code tearDown()}. */
    private final UniqueDatabase customDatabase = new UniqueDatabase(this.mysql, "customer", "mysqluser", "mysqlpw");
    /** The {@code customers} table of {@link #customDatabase}, described by {@code TestTableSchemas.CUSTOMERS}. */
    private final TestTable customers = new TestTable(this.customDatabase, "customers", TestTableSchemas.CUSTOMERS);
    /** Connection used to run SQL against the container; assigned in {@code prepare()}, closed in {@code tearDown()}. */
    private MySqlConnection connection;

    /**
     * Per-test setup: starts the MySQL container, opens the shared connection, creates and
     * initialises the test database, and finally issues {@code FLUSH LOGS}.
     *
     * @throws Exception if the container, the connection or any SQL statement fails
     */
    @BeforeEach
    void prepare() throws Exception {
        mysql.start();
        // The connection can only be opened once the container exposes its host/port.
        connection = getConnection();
        customDatabase.createAndInitialize();
        flushLogs();
    }

    /**
     * Per-test cleanup: drops the test database, closes the shared connection and stops the MySQL
     * container.
     *
     * <p>Each step runs even if an earlier one throws, so a failure while dropping the database
     * or closing the connection cannot leave the container running. The connection is only closed
     * if {@code prepare()} got far enough to open it.
     *
     * @throws Exception the first failure raised by any cleanup step
     */
    @AfterEach
    void tearDown() throws Exception {
        try {
            this.customDatabase.dropDatabase();
        } finally {
            try {
                // connection stays null when prepare() failed before assigning it.
                if (this.connection != null) {
                    this.connection.close();
                }
            } finally {
                this.mysql.stop();
            }
        }
    }

    /**
     * Starts the source with {@code StartupOptions.earliest()} and expects the three rows inserted
     * after the binlog purge. The job is then stopped with a savepoint, one row is updated while no
     * job is running, and a second job restored from that savepoint must emit the update as a
     * {@code -U}/{@code +U} pair.
     */
    @Test
    void testStartingFromEarliestOffset() throws Exception {
        // Purge binlog files preceding the current one before producing the test events.
        purgeBinaryLogs();

        final String tableId = this.customers.getTableId();
        executeStatements(String.format("INSERT INTO %s VALUES (15213, 'Alice', 'Rome', '123456987');", tableId));
        executeStatements(String.format("INSERT INTO %s VALUES (15513, 'Bob', 'Milan', '123456987');", tableId));
        executeStatements(String.format("INSERT INTO %s VALUES (18213, 'Charlie', 'Paris', '123456987');", tableId));

        // First job: read from the earliest offset and collect the results.
        StreamExecutionEnvironment initialEnv = getExecutionEnvironment();
        DataStream<RowData> changeStream =
                initialEnv.fromSource(
                        getSourceBuilder().startupOptions(StartupOptions.earliest()).build(),
                        WatermarkStrategy.noWatermarks(),
                        "earliest-offset-test");
        CollectResultIterator<RowData> collected = addCollector(initialEnv, changeStream);

        // Second environment receives the same transformations so it can be restored later.
        StreamExecutionEnvironment restoredEnv = getExecutionEnvironment();
        duplicateTransformations(initialEnv, restoredEnv);

        JobClient initialJob = initialEnv.executeAsync();
        collected.setJobClient(initialJob);
        Assertions.assertThat(fetchRowData(collected, 3, this.customers::stringify))
                .containsExactly(
                        "+I[15213, Alice, Rome, 123456987]",
                        "+I[15513, Bob, Milan, 123456987]",
                        "+I[18213, Charlie, Paris, 123456987]");

        String savepointDirectory =
                Files.createTempDirectory("earliest-offset-test").toAbsolutePath().toString();
        String savepointPath =
                initialJob.stopWithSavepoint(false, savepointDirectory, SavepointFormatType.DEFAULT).get();

        // Change made while no job is running; only the restored job can observe it.
        executeStatements(String.format("UPDATE %s SET name = 'Alicia' WHERE id = 15213", tableId));

        setupSavepoint(restoredEnv, savepointPath);
        JobClient restoredJob = restoredEnv.executeAsync();
        collected.setJobClient(restoredJob);
        Assertions.assertThat(fetchRowData(collected, 2, this.customers::stringify))
                .containsExactly(
                        "-U[15213, Alice, Rome, 123456987]",
                        "+U[15213, Alicia, Rome, 123456987]");
        restoredJob.cancel().get();
    }

    /**
     * Records the current binlog file/position, inserts three rows, then starts the source with
     * {@code StartupOptions.specificOffset(...)} at the recorded offset and expects those three
     * rows. The job is then stopped with a savepoint, one row is updated while no job is running,
     * and a second job restored from that savepoint must emit the update as a {@code -U}/{@code +U}
     * pair.
     */
    @Test
    void testStartingFromSpecificOffset() throws Exception {
        // Purge binlog files preceding the current one before producing the test events.
        purgeBinaryLogs();

        // Offset captured before the inserts, so all three inserts lie after it.
        BinlogOffset startOffset = DebeziumUtils.currentBinlogOffset(this.connection);

        final String tableId = this.customers.getTableId();
        executeStatements(String.format("INSERT INTO %s VALUES (15213, 'Alice', 'Rome', '123456987');", tableId));
        executeStatements(String.format("INSERT INTO %s VALUES (15513, 'Bob', 'Milan', '123456987');", tableId));
        executeStatements(String.format("INSERT INTO %s VALUES (18213, 'Charlie', 'Paris', '123456987');", tableId));

        // First job: read from the recorded offset and collect the results.
        StreamExecutionEnvironment initialEnv = getExecutionEnvironment();
        DataStream<RowData> changeStream =
                initialEnv.fromSource(
                        getSourceBuilder()
                                .startupOptions(
                                        StartupOptions.specificOffset(
                                                startOffset.getFilename(), startOffset.getPosition()))
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "specific-offset-test");
        CollectResultIterator<RowData> collected = addCollector(initialEnv, changeStream);

        // Second environment receives the same transformations so it can be restored later.
        StreamExecutionEnvironment restoredEnv = getExecutionEnvironment();
        duplicateTransformations(initialEnv, restoredEnv);

        JobClient initialJob = initialEnv.executeAsync();
        collected.setJobClient(initialJob);
        Assertions.assertThat(fetchRowData(collected, 3, this.customers::stringify))
                .containsExactly(
                        "+I[15213, Alice, Rome, 123456987]",
                        "+I[15513, Bob, Milan, 123456987]",
                        "+I[18213, Charlie, Paris, 123456987]");

        String savepointDirectory =
                Files.createTempDirectory("specific-offset-test").toAbsolutePath().toString();
        String savepointPath =
                initialJob.stopWithSavepoint(false, savepointDirectory, SavepointFormatType.DEFAULT).get();

        // Change made while no job is running; only the restored job can observe it.
        executeStatements(String.format("UPDATE %s SET name = 'Alicia' WHERE id = 15213", tableId));

        setupSavepoint(restoredEnv, savepointPath);
        JobClient restoredJob = restoredEnv.executeAsync("snapshotSplitTest");
        collected.setJobClient(restoredJob);
        Assertions.assertThat(fetchRowData(collected, 2, this.customers::stringify))
                .containsExactly(
                        "-U[15213, Alice, Rome, 123456987]",
                        "+U[15213, Alicia, Rome, 123456987]");
        restoredJob.cancel().get();
    }

    /**
     * Inserts three rows, takes a wall-clock timestamp, inserts three more rows, then starts the
     * source with {@code StartupOptions.timestamp(...)} and expects only the second batch. The job
     * is then stopped with a savepoint, a row from the first batch is updated while no job is
     * running, and a second job restored from that savepoint must emit the update as a
     * {@code -U}/{@code +U} pair.
     */
    @Test
    void testStartingFromTimestampOffset() throws Exception {
        // Purge binlog files preceding the current one before producing the test events.
        purgeBinaryLogs();

        final String tableId = this.customers.getTableId();

        // First batch: written before the startup timestamp, not expected in the output.
        executeStatements(String.format("INSERT INTO %s VALUES (15213, 'Alice', 'Rome', '123456987');", tableId));
        executeStatements(String.format("INSERT INTO %s VALUES (15513, 'Bob', 'Milan', '123456987');", tableId));
        executeStatements(String.format("INSERT INTO %s VALUES (18213, 'Charlie', 'Paris', '123456987');", tableId));

        // NOTE(review): the pause presumably separates the two batches by at least one second so
        // the timestamp falls cleanly between them - confirm against binlog timestamp granularity.
        Thread.sleep(1000L);
        StartupOptions startFromTimestamp = StartupOptions.timestamp(System.currentTimeMillis());

        // Second batch: written after the startup timestamp, expected in the output.
        executeStatements(String.format("INSERT INTO %s VALUES (19613, 'Tom', 'NewYork', '123456987');", tableId));
        executeStatements(String.format("INSERT INTO %s VALUES (20913, 'Cat', 'Washington', '123456987');", tableId));
        executeStatements(String.format("INSERT INTO %s VALUES (23313, 'Mouse', 'Seattle', '123456987');", tableId));

        // First job: read from the timestamp and collect the results.
        StreamExecutionEnvironment initialEnv = getExecutionEnvironment();
        DataStream<RowData> changeStream =
                initialEnv.fromSource(
                        getSourceBuilder().startupOptions(startFromTimestamp).build(),
                        WatermarkStrategy.noWatermarks(),
                        "timestamp-offset-test");
        CollectResultIterator<RowData> collected = addCollector(initialEnv, changeStream);

        // Second environment receives the same transformations so it can be restored later.
        StreamExecutionEnvironment restoredEnv = getExecutionEnvironment();
        duplicateTransformations(initialEnv, restoredEnv);

        JobClient initialJob = initialEnv.executeAsync();
        collected.setJobClient(initialJob);
        Assertions.assertThat(fetchRowData(collected, 3, this.customers::stringify))
                .containsExactly(
                        "+I[19613, Tom, NewYork, 123456987]",
                        "+I[20913, Cat, Washington, 123456987]",
                        "+I[23313, Mouse, Seattle, 123456987]");

        String savepointDirectory =
                Files.createTempDirectory("timestamp-offset-test").toAbsolutePath().toString();
        String savepointPath =
                initialJob.stopWithSavepoint(false, savepointDirectory, SavepointFormatType.DEFAULT).get();

        // Change made while no job is running; only the restored job can observe it.
        executeStatements(String.format("UPDATE %s SET name = 'George' WHERE id = 18213", tableId));

        setupSavepoint(restoredEnv, savepointPath);
        JobClient restoredJob = restoredEnv.executeAsync("snapshotSplitTest");
        collected.setJobClient(restoredJob);
        Assertions.assertThat(fetchRowData(collected, 2, this.customers::stringify))
                .containsExactly(
                        "-U[18213, Charlie, Paris, 123456987]",
                        "+U[18213, George, Paris, 123456987]");
        restoredJob.cancel().get();
    }

    /**
     * Creates a source builder pre-configured for the test container: host/port of the MySQL
     * container, credentials and name of the test database, the {@code customers} table, and that
     * table's deserializer. Callers still have to set the startup options and call
     * {@code build()}.
     *
     * @return a partially configured builder producing {@link RowData}
     */
    private MySqlSourceBuilder<RowData> getSourceBuilder() {
        return MySqlSource.<RowData>builder()
                .hostname(this.mysql.getHost())
                .port(this.mysql.getDatabasePort())
                .username(this.customDatabase.getUsername())
                .password(this.customDatabase.getPassword())
                .databaseList(this.customDatabase.getDatabaseName())
                .tableList(this.customers.getTableId())
                .deserializer(this.customers.getDeserializer());
    }

    /**
     * Opens a Debezium MySQL connection to the test container using the test database's
     * credentials.
     *
     * @return a new connection; the caller is responsible for closing it
     */
    private MySqlConnection getConnection() {
        HashMap<String, String> connectionSettings = new HashMap<>();
        connectionSettings.put("database.hostname", this.mysql.getHost());
        connectionSettings.put("database.port", String.valueOf(this.mysql.getDatabasePort()));
        connectionSettings.put("database.user", this.customDatabase.getUsername());
        connectionSettings.put("database.password", this.customDatabase.getPassword());

        Configuration debeziumConfiguration = Configuration.from(connectionSettings);
        // No additional JDBC properties are needed for the tests.
        return DebeziumUtils.createMySqlConnection(debeziumConfiguration, new Properties());
    }

    /**
     * Runs the given SQL statements on the shared connection and commits afterwards.
     *
     * @param strArr SQL statements to execute, in order
     * @throws Exception if executing or committing fails
     */
    private void executeStatements(String... strArr) throws Exception {
        final MySqlConnection conn = this.connection;
        conn.execute(strArr);
        conn.commit();
    }

    /**
     * Issues {@code FLUSH LOGS} on the shared connection.
     *
     * @throws Exception if the statement cannot be executed or committed
     */
    private void flushLogs() throws Exception {
        executeStatements("FLUSH LOGS;");
    }

    /**
     * Purges binary logs up to the binlog file the server is currently writing, as reported by
     * {@code DebeziumUtils.currentBinlogOffset}.
     *
     * @throws Exception if the current offset cannot be read or the purge statement fails
     */
    private void purgeBinaryLogs() throws Exception {
        BinlogOffset currentOffset = DebeziumUtils.currentBinlogOffset(this.connection);
        String purgeStatement =
                String.format("PURGE BINARY LOGS TO '%s'", currentOffset.getFilename());
        executeStatements(purgeStatement);
    }

    /**
     * Attaches a collect sink to {@code dataStream} and returns an iterator over the records it
     * receives.
     *
     * <p>The returned iterator is not bound to a job here; the tests call {@code setJobClient} on
     * it after submitting a job.
     *
     * @param streamExecutionEnvironment environment the sink transformation is added to; also
     *     supplies the execution config for the serializer and the checkpoint config for the
     *     iterator
     * @param dataStream stream whose records should be collected
     * @param <T> record type of the stream
     * @return iterator over the collected records
     */
    private <T> CollectResultIterator<T> addCollector(StreamExecutionEnvironment streamExecutionEnvironment, DataStream<T> dataStream) {
        // Serializer is derived from the stream's output type.
        TypeSerializer createSerializer = dataStream.getTransformation().getOutputType().createSerializer(streamExecutionEnvironment.getConfig());
        // Random suffix gives every collector its own accumulator name.
        String str = "dataStreamCollect_" + UUID.randomUUID();
        CollectSinkOperatorFactory collectSinkOperatorFactory = new CollectSinkOperatorFactory(createSerializer, str);
        // Iterator and sink factory share the same serializer and accumulator name.
        CollectResultIterator<T> collectResultIterator = new CollectResultIterator<>(collectSinkOperatorFactory.getOperator().getOperatorIdFuture(), createSerializer, str, streamExecutionEnvironment.getCheckpointConfig());
        CollectStreamSink collectStreamSink = new CollectStreamSink(dataStream, collectSinkOperatorFactory);
        collectStreamSink.name("Data stream collect sink");
        // Register the sink so it becomes part of the environment's job graph.
        streamExecutionEnvironment.addOperator(collectStreamSink.getTransformation());
        return collectResultIterator;
    }

    /**
     * Pulls up to {@code i} rows from the iterator and renders each with {@code function}.
     *
     * <p>All rows are fetched first and converted afterwards. {@code hasNext()} is not consulted
     * once {@code i} rows have been fetched.
     *
     * @param it source of rows
     * @param i maximum number of rows to fetch
     * @param function converts a row to its string form
     * @return the rendered rows in fetch order; shorter than {@code i} if the iterator ends early
     */
    private List<String> fetchRowData(Iterator<RowData> it, int i, Function<RowData, String> function) {
        List<RowData> fetched = new ArrayList<>(i);
        // Check the counter before hasNext() so the iterator is never polled past the limit.
        for (int remaining = i; remaining > 0 && it.hasNext(); remaining--) {
            fetched.add(it.next());
        }

        List<String> rendered = new ArrayList<>();
        for (RowData row : fetched) {
            rendered.add(function.apply(row));
        }
        return rendered;
    }

    /**
     * Writes a {@code my.cnf} for the MySQL container into a freshly created, randomly named
     * sub-folder beneath {@code file}.
     *
     * <p>The generated configuration enables row-based binlog with full row images and GTID mode,
     * and sets {@code default-time_zone} to {@code str}.
     *
     * @param file directory under which the temporary folder is created
     * @param str time zone written as {@code default-time_zone}
     * @return path of the generated {@code my.cnf}, relative to {@code file}
     * @throws RuntimeException if the folder or file cannot be created or written
     */
    private static String buildMySqlConfigWithTimezone(File file, String str) {
        try {
            TemporaryFolder scratchFolder = new TemporaryFolder(file);
            scratchFolder.create();

            File configDirectory = scratchFolder.newFolder(UUID.randomUUID().toString());
            Path configFile = Files.createFile(Paths.get(configDirectory.getPath(), "my.cnf"));

            String mysqldSection =
                    "[mysqld]\n"
                            + "binlog_format = row\n"
                            + "log_bin = mysql-bin\n"
                            + "server-id = 223344\n"
                            + "binlog_row_image = FULL\n"
                            + "gtid_mode = on\n"
                            + "enforce_gtid_consistency = on\n"
                            + "default-time_zone = '" + str + "'\n";
            Files.write(
                    configFile,
                    Collections.singleton(mysqldSection),
                    StandardCharsets.UTF_8,
                    StandardOpenOption.APPEND);

            // The path is returned relative to the base directory passed in.
            return Paths.get(file.getAbsolutePath()).relativize(configFile).toString();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create my.cnf file.", e);
        }
    }

    /**
     * Resolves the root of the test class path (the location the class loader reports for
     * {@code "."}) as a directory.
     *
     * @return the resource root directory
     * @throws FlinkRuntimeException if the resource root cannot be located or converted to a file;
     *     the underlying failure is attached as the cause
     */
    private static File getResourceFolder() {
        try {
            URL resourceRoot =
                    Objects.requireNonNull(
                            SpecificStartingOffsetITCase.class.getClassLoader().getResource("."));
            return Paths.get(resourceRoot.toURI()).toFile();
        } catch (Exception e) {
            // Keep the original failure as the cause so the root problem stays diagnosable.
            throw new FlinkRuntimeException("Get Resource File Directory fail", e);
        }
    }

    /**
     * Returns the identifier of the JVM's default time zone.
     *
     * @return the default zone ID, e.g. {@code "Europe/Paris"} or {@code "Z"}
     */
    private static String getSystemTimeZone() {
        final ZoneId defaultZone = ZoneId.systemDefault();
        return defaultZone.getId();
    }

    /**
     * Points the given environment at a savepoint by writing the savepoint path into the
     * environment's {@code configuration} field, which is accessed reflectively.
     *
     * @param streamExecutionEnvironment environment whose configuration is modified
     * @param str savepoint path to restore from
     * @throws Exception if the class or field cannot be resolved or accessed
     */
    private void setupSavepoint(StreamExecutionEnvironment streamExecutionEnvironment, String str) throws Exception {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        Class<?> environmentClass =
                contextLoader.loadClass("org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");

        Field configurationField = environmentClass.getDeclaredField("configuration");
        configurationField.setAccessible(true);

        org.apache.flink.configuration.Configuration environmentConfiguration =
                (org.apache.flink.configuration.Configuration) configurationField.get(streamExecutionEnvironment);
        environmentConfiguration.setString(SavepointConfigOptions.SAVEPOINT_PATH, str);
    }

    /**
     * Adds every transformation registered on the first environment to the second one.
     *
     * @param streamExecutionEnvironment environment to read transformations from
     * @param streamExecutionEnvironment2 environment that receives the transformations
     */
    private void duplicateTransformations(StreamExecutionEnvironment streamExecutionEnvironment, StreamExecutionEnvironment streamExecutionEnvironment2) {
        // The bound method reference null-checks the target environment when it is evaluated.
        streamExecutionEnvironment.getTransformations().forEach(streamExecutionEnvironment2::addOperator);
    }

    /**
     * Obtains a stream execution environment with checkpointing enabled every 100 ms in
     * exactly-once mode.
     *
     * @return the configured environment
     */
    private StreamExecutionEnvironment getExecutionEnvironment() {
        final long checkpointIntervalMillis = 100L;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(checkpointIntervalMillis);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        return env;
    }
}
