package com.ververica.cdc.connectors.mysql.debezium.reader;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.ververica.cdc.connectors.mysql.MySqlTestUtils;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.class */
public class BinlogSplitReaderTest extends MySqlSourceTestBase {
    private static final String TEST_USER = "mysqluser";
    private static final String TEST_PASSWORD = "mysqlpw";
    private final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
    private final UniqueDatabase inventoryDatabase8 = new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);
    private BinaryLogClient binaryLogClient;
    private MySqlConnection mySqlConnection;
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
    private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest$TestStatefulTaskContext.class */
    public static class TestStatefulTaskContext extends StatefulTaskContext {
        public TestStatefulTaskContext(MySqlSourceConfig mySqlSourceConfig, BinaryLogClient binaryLogClient, MySqlConnection mySqlConnection) {
            super(mySqlSourceConfig, binaryLogClient, mySqlConnection);
        }

        protected MySqlOffsetContext loadStartingOffsetState(OffsetContext.Loader<MySqlOffsetContext> loader, MySqlSplit mySqlSplit) {
            BinlogOffset ofEarliest = mySqlSplit.isSnapshotSplit() ? BinlogOffset.ofEarliest() : BinlogOffsetUtils.initializeEffectiveOffset(mySqlSplit.asBinlogSplit().getStartingOffset(), getConnection());
            BinlogSplitReaderTest.LOG.info("Starting offset is initialized to {}", ofEarliest);
            return loader.load(ofEarliest.getOffset());
        }
    }

    @BeforeClass
    public static void beforeClass() {
        LOG.info("Starting MySql8 containers...");
        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
        LOG.info("Container MySql8 is started.");
    }

    @AfterClass
    public static void afterClass() {
        LOG.info("Stopping MySql8 containers...");
        MYSQL8_CONTAINER.stop();
        LOG.info("Container MySql8 is stopped.");
    }

    @After
    public void after() throws Exception {
        if (this.mySqlConnection != null) {
            this.mySqlConnection.close();
        }
        if (this.binaryLogClient != null) {
            this.binaryLogClient.disconnect();
        }
        this.customerDatabase.dropDatabase();
    }

    @Test
    public void testReadSingleBinlogSplit() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(new String[]{"customers_even_dist"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())});
        List<MySqlSnapshotSplit> mySqlSplits = getMySqlSplits(new String[]{"customers_even_dist"}, config);
        String[] strArr = {"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "+I[104, user_4, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]"};
        assertEqualsInAnyOrder(Arrays.asList(strArr), readBinlogSplitsFromSnapshotSplits(mySqlSplits, ROW, config, 1, strArr.length, mySqlSplits.get(mySqlSplits.size() - 1).getTableId()));
    }

    @Test
    public void testReadAllBinlogSplitsForOneTable() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(new String[]{"customers_even_dist"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())});
        List<MySqlSnapshotSplit> mySqlSplits = getMySqlSplits(new String[]{"customers_even_dist"}, config);
        String[] strArr = {"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[104, user_4, Shanghai, 123567891234]", "+I[105, user_5, Shanghai, 123567891234]", "+I[106, user_6, Shanghai, 123567891234]", "+I[107, user_7, Shanghai, 123567891234]", "+I[108, user_8, Shanghai, 123567891234]", "+I[109, user_9, Shanghai, 123567891234]", "+I[110, user_10, Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        assertEqualsInAnyOrder(Arrays.asList(strArr), readBinlogSplitsFromSnapshotSplits(mySqlSplits, ROW, config, mySqlSplits.size(), strArr.length, mySqlSplits.get(mySqlSplits.size() - 1).getTableId()));
    }

    @Test
    public void testReadAllBinlogForTableWithSingleLine() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(new String[]{"customer_card_single_line"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("card_no", DataTypes.BIGINT()), DataTypes.FIELD("level", DataTypes.STRING()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("note", DataTypes.STRING())});
        List<MySqlSnapshotSplit> mySqlSplits = getMySqlSplits(new String[]{"customer_card_single_line"}, config);
        String[] strArr = {"+I[20000, LEVEL_1, user_1, user with level 1]", "+I[20001, LEVEL_1, user_1, user with level 1]", "+I[20001, LEVEL_2, user_2, user with level 2]", "+I[20002, LEVEL_3, user_3, user with level 3]"};
        assertEqualsInAnyOrder(Arrays.asList(strArr), readBinlogSplitsFromSnapshotSplits(mySqlSplits, ROW, config, mySqlSplits.size(), strArr.length, mySqlSplits.get(mySqlSplits.size() - 1).getTableId()));
    }

    @Test
    public void testReadAllBinlogSplitsForTables() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(new String[]{"customer_card", "customer_card_single_line"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("card_no", DataTypes.BIGINT()), DataTypes.FIELD("level", DataTypes.STRING()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("note", DataTypes.STRING())});
        List<MySqlSnapshotSplit> mySqlSplits = getMySqlSplits(new String[]{"customer_card", "customer_card_single_line"}, config);
        String[] strArr = {"+I[20000, LEVEL_1, user_1, user with level 1]", "+I[20001, LEVEL_1, user_1, user with level 1]", "+I[20001, LEVEL_2, user_2, user with level 2]", "+I[20002, LEVEL_3, user_3, user with level 3]", "+I[20001, LEVEL_4, user_1, user with level 4]", "+I[20002, LEVEL_4, user_2, user with level 4]", "+I[20003, LEVEL_4, user_3, user with level 4]", "+I[20004, LEVEL_1, user_4, user with level 4]", "+I[20004, LEVEL_2, user_4, user with level 4]", "+I[20004, LEVEL_3, user_4, user with level 4]", "+I[20004, LEVEL_4, user_4, user with level 4]", "+I[30006, LEVEL_3, user_5, user with level 3]", "+I[30007, LEVEL_3, user_6, user with level 3]", "+I[30008, LEVEL_3, user_7, user with level 3]", "+I[30009, LEVEL_1, user_8, user with level 3]", "+I[30009, LEVEL_2, user_8, user with level 3]", "+I[30009, LEVEL_3, user_8, user with level 3]", "+I[40001, LEVEL_2, user_9, user with level 2]", "+I[40002, LEVEL_2, user_10, user with level 2]", "+I[40003, LEVEL_2, user_11, user with level 2]", "+I[50001, LEVEL_1, user_12, user with level 1]", "+I[50002, LEVEL_1, user_13, user with level 1]", "+I[50003, LEVEL_1, user_14, user with level 1]"};
        assertEqualsInAnyOrder(Arrays.asList(strArr), readBinlogSplitsFromSnapshotSplits(mySqlSplits, ROW, config, mySqlSplits.size(), strArr.length, TableId.parse(this.customerDatabase.getDatabaseName() + ".customer_card_single_line")));
    }

    @Test
    public void testReadBinlogFromLatestOffset() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(StartupOptions.latest(), new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        MySqlBinlogSplit createBinlogSplit = createBinlogSplit(config);
        BinlogSplitReader createBinlogReader = createBinlogReader(config);
        createBinlogReader.submitSplit(createBinlogSplit);
        makeCustomersBinlogEvents(this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        String[] strArr = {"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        assertEqualsInOrder(Arrays.asList(strArr), readBinlogSplits(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())}), createBinlogReader, strArr.length));
        createBinlogReader.close();
    }

    @Test
    public void testReadBinlogFromEarliestOffset() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(StartupOptions.earliest(), new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        MySqlBinlogSplit createBinlogSplit = createBinlogSplit(config);
        BinlogSplitReader createBinlogReader = createBinlogReader(config);
        createBinlogReader.submitSplit(createBinlogSplit);
        makeCustomersBinlogEvents(this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        String[] strArr = {"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> readBinlogSplits = readBinlogSplits(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())}), createBinlogReader, strArr.length);
        createBinlogReader.close();
        assertEqualsInOrder(Arrays.asList(strArr), readBinlogSplits);
    }

    @Test
    public void testReadBinlogFromEarliestOffsetAfterSchemaChange() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(StartupOptions.earliest(), new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        String qualifiedTableName = this.customerDatabase.qualifiedTableName("customers");
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())});
        addColumnToTable(this.mySqlConnection, qualifiedTableName);
        MySqlBinlogSplit createBinlogSplit = createBinlogSplit(config);
        BinlogSplitReader createBinlogReader = createBinlogReader(config);
        createBinlogReader.submitSplit(createBinlogSplit);
        Optional findThrowable = ExceptionUtils.findThrowable(Assert.assertThrows(Throwable.class, () -> {
            readBinlogSplits(ROW, createBinlogReader, 1);
        }), SchemaOutOfSyncException.class);
        createBinlogReader.close();
        Assert.assertTrue(findThrowable.isPresent());
        Assert.assertEquals("Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET", ((SchemaOutOfSyncException) findThrowable.get()).getMessage());
    }

    @Test
    public void testReadBinlogFromBinlogFilePosition() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())});
        BinlogOffset currentBinlogOffset = DebeziumUtils.currentBinlogOffset(this.mySqlConnection);
        MySqlSourceConfig config2 = getConfig(StartupOptions.specificOffset(currentBinlogOffset.getFilename(), currentBinlogOffset.getPosition()), new String[]{"customers"});
        MySqlBinlogSplit createBinlogSplit = createBinlogSplit(config2);
        BinlogSplitReader createBinlogReader = createBinlogReader(config2);
        createBinlogReader.submitSplit(createBinlogSplit);
        makeCustomersBinlogEvents(this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        String[] strArr = {"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> readBinlogSplits = readBinlogSplits(ROW, createBinlogReader, strArr.length);
        createBinlogReader.close();
        assertEqualsInOrder(Arrays.asList(strArr), readBinlogSplits);
    }

    @Test
    public void testSkippingEvents() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())});
        BinlogOffset currentBinlogOffset = DebeziumUtils.currentBinlogOffset(this.mySqlConnection);
        MySqlSourceConfig config2 = getConfig(StartupOptions.specificOffset(BinlogOffset.builder().setBinlogFilePosition(currentBinlogOffset.getFilename(), currentBinlogOffset.getPosition()).setSkipEvents(3L).setSkipRows(1L).build()), new String[]{"customers"});
        MySqlBinlogSplit createBinlogSplit = createBinlogSplit(config2);
        BinlogSplitReader createBinlogReader = createBinlogReader(config2);
        createBinlogReader.submitSplit(createBinlogSplit);
        updateCustomersTableInBulk(this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"));
        String[] strArr = {"-U[109, user_4, Shanghai, 123567891234]", "+U[109, user_4, Pittsburgh, 123567891234]"};
        List<String> readBinlogSplits = readBinlogSplits(ROW, createBinlogReader, strArr.length);
        createBinlogReader.close();
        assertEqualsInOrder(Arrays.asList(strArr), readBinlogSplits);
    }

    @Test
    public void testReadBinlogFromGtidSet() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())});
        MySqlSourceConfig config2 = getConfig(StartupOptions.specificOffset(DebeziumUtils.currentBinlogOffset(this.mySqlConnection).getGtidSet()), new String[]{"customers"});
        MySqlBinlogSplit createBinlogSplit = createBinlogSplit(config2);
        BinlogSplitReader createBinlogReader = createBinlogReader(config2);
        createBinlogReader.submitSplit(createBinlogSplit);
        makeCustomersBinlogEvents(this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        String[] strArr = {"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> readBinlogSplits = readBinlogSplits(ROW, createBinlogReader, strArr.length);
        createBinlogReader.close();
        assertEqualsInOrder(Arrays.asList(strArr), readBinlogSplits);
    }

    @Test
    public void testReadBinlogFromTimestamp() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())});
        Thread.sleep(2000L);
        MySqlSourceConfig config2 = getConfig(StartupOptions.timestamp(System.currentTimeMillis()), new String[]{"customers"});
        MySqlBinlogSplit createBinlogSplit = createBinlogSplit(config2);
        BinlogSplitReader createBinlogReader = createBinlogReader(config2);
        createBinlogReader.submitSplit(createBinlogSplit);
        makeCustomersBinlogEvents(this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        String[] strArr = {"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> readBinlogSplits = readBinlogSplits(ROW, createBinlogReader, strArr.length);
        createBinlogReader.close();
        assertEqualsInOrder(Arrays.asList(strArr), readBinlogSplits);
    }

    @Test
    public void testReadBinlogFromTimestampAfterSchemaChange() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig config = getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        DataType ROW = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING()), DataTypes.FIELD("new_int_column", DataTypes.INT())});
        String qualifiedTableName = this.customerDatabase.qualifiedTableName("customers");
        addColumnToTable(this.mySqlConnection, qualifiedTableName);
        Thread.sleep(2000L);
        MySqlSourceConfig config2 = getConfig(StartupOptions.timestamp(System.currentTimeMillis()), new String[]{"customers"});
        MySqlBinlogSplit createBinlogSplit = createBinlogSplit(config2);
        BinlogSplitReader createBinlogReader = createBinlogReader(config2);
        createBinlogReader.submitSplit(createBinlogSplit);
        this.mySqlConnection.execute(new String[]{"UPDATE " + qualifiedTableName + " SET address = 'Hangzhou' where id = 103", "DELETE FROM " + qualifiedTableName + " where id = 102", "INSERT INTO " + qualifiedTableName + " VALUES(102, 'user_2','Shanghai','123567891234', 15213)", "UPDATE " + qualifiedTableName + " SET address = 'Shanghai' where id = 103"});
        String[] strArr = {"-U[103, user_3, Shanghai, 123567891234, 15213]", "+U[103, user_3, Hangzhou, 123567891234, 15213]", "-D[102, user_2, Shanghai, 123567891234, 15213]", "+I[102, user_2, Shanghai, 123567891234, 15213]", "-U[103, user_3, Hangzhou, 123567891234, 15213]", "+U[103, user_3, Shanghai, 123567891234, 15213]"};
        List<String> readBinlogSplits = readBinlogSplits(ROW, createBinlogReader, strArr.length);
        createBinlogReader.close();
        assertEqualsInOrder(Arrays.asList(strArr), readBinlogSplits);
    }

    @Test
    public void testHeartbeatEvent() throws Exception {
        this.customerDatabase.createAndInitialize();
        Duration ofMillis = Duration.ofMillis(500L);
        Properties properties = new Properties();
        properties.setProperty(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS.name(), String.valueOf(ofMillis.toMillis()));
        MySqlSourceConfig createConfig = getConfigFactory(MYSQL_CONTAINER, this.customerDatabase, new String[]{"customers"}).startupOptions(StartupOptions.latest()).heartbeatInterval(ofMillis).debeziumProperties(properties).createConfig(0);
        this.binaryLogClient = DebeziumUtils.createBinaryClient(createConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(createConfig);
        BinlogSplitReader createBinlogReader = createBinlogReader(createConfig);
        MySqlBinlogSplit createBinlogSplit = createBinlogSplit(createConfig);
        createBinlogReader.submitSplit(createBinlogSplit);
        makeCustomersBinlogEvents(this.mySqlConnection, ((TableId) createBinlogSplit.getTableSchemas().keySet().iterator().next()).toString(), false);
        ArrayList arrayList = new ArrayList();
        CommonTestUtils.waitUtil(() -> {
            arrayList.addAll(pollRecordsFromReader(createBinlogReader, RecordUtils::isHeartbeatEvent));
            return Boolean.valueOf(!arrayList.isEmpty());
        }, DEFAULT_TIMEOUT, "Timeout waiting for heartbeat event");
        createBinlogReader.close();
    }

    @Test
    public void testReadBinlogFromUnavailableBinlog() throws Exception {
        this.inventoryDatabase8.createAndInitialize();
        MySqlSourceConfig config = getConfig(MYSQL8_CONTAINER, this.inventoryDatabase8, new String[]{"products"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient(config.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection(config);
        MySqlSourceConfig config2 = getConfig(MYSQL8_CONTAINER, this.inventoryDatabase8, StartupOptions.specificOffset(DebeziumUtils.currentBinlogOffset(this.mySqlConnection).getGtidSet()), new String[]{"products"});
        Connection jdbcConnection = this.inventoryDatabase8.getJdbcConnection();
        try {
            Statement createStatement = jdbcConnection.createStatement();
            try {
                createStatement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
                createStatement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
                createStatement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
                createStatement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
                createStatement.execute("DELETE FROM products WHERE id=111;");
                createStatement.execute("FLUSH LOGS;");
                Thread.sleep(3000L);
                createStatement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
                createStatement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
                createStatement.execute("FLUSH LOGS;");
                Thread.sleep(3000L);
                if (createStatement != null) {
                    createStatement.close();
                }
                if (jdbcConnection != null) {
                    jdbcConnection.close();
                }
                MySqlBinlogSplit createBinlogSplit = createBinlogSplit(config2);
                BinlogSplitReader createBinlogReader = createBinlogReader(config2, true);
                try {
                    try {
                        createBinlogReader.submitSplit(createBinlogSplit);
                        createBinlogReader.pollSplitRecords();
                        createBinlogReader.close();
                    } catch (Throwable th) {
                        createBinlogReader.close();
                        throw th;
                    }
                } catch (Throwable th2) {
                    MySqlTestUtils.assertContainsErrorMsg(th2, "The required binary logs are no longer available on the server. This may happen in following situations:\n1. The speed of CDC source reading is too slow to exceed the binlog expired period. You can consider increasing the binary log expiration period, you can also to check whether there is back pressure in the job and optimize your job.\n2. The job runs normally, but something happens in the database and lead to the binlog cleanup. You can try to check why this cleanup happens from MySQL side.");
                    createBinlogReader.close();
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (jdbcConnection != null) {
                try {
                    jdbcConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    private BinlogSplitReader createBinlogReader(MySqlSourceConfig mySqlSourceConfig) {
        return createBinlogReader(mySqlSourceConfig, false);
    }

    private BinlogSplitReader createBinlogReader(MySqlSourceConfig mySqlSourceConfig, boolean z) {
        return new BinlogSplitReader(z ? new TestStatefulTaskContext(mySqlSourceConfig, this.binaryLogClient, this.mySqlConnection) : new StatefulTaskContext(mySqlSourceConfig, this.binaryLogClient, this.mySqlConnection), 0);
    }

    private MySqlBinlogSplit createBinlogSplit(MySqlSourceConfig mySqlSourceConfig) throws Exception {
        MySqlBinlogSplitAssigner mySqlBinlogSplitAssigner = new MySqlBinlogSplitAssigner(mySqlSourceConfig);
        mySqlBinlogSplitAssigner.open();
        MySqlConnection createMySqlConnection = DebeziumUtils.createMySqlConnection(mySqlSourceConfig);
        try {
            MySqlBinlogSplit fillTableSchemas = MySqlBinlogSplit.fillTableSchemas(((MySqlSplit) mySqlBinlogSplitAssigner.getNext().get()).asBinlogSplit(), TableDiscoveryUtils.discoverSchemaForCapturedTables(new MySqlPartition(mySqlSourceConfig.getMySqlConnectorConfig().getLogicalName()), mySqlSourceConfig, createMySqlConnection));
            if (createMySqlConnection != null) {
                createMySqlConnection.close();
            }
            return fillTableSchemas;
        } catch (Throwable th) {
            if (createMySqlConnection != null) {
                try {
                    createMySqlConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private List<SourceRecord> pollRecordsFromReader(BinlogSplitReader binlogSplitReader, Predicate<SourceRecord> predicate) {
        ArrayList arrayList = new ArrayList();
        try {
            Iterator pollSplitRecords = binlogSplitReader.pollSplitRecords();
            if (pollSplitRecords == null) {
                return arrayList;
            }
            while (pollSplitRecords.hasNext()) {
                Iterator it = ((SourceRecords) pollSplitRecords.next()).iterator();
                while (it.hasNext()) {
                    SourceRecord sourceRecord = (SourceRecord) it.next();
                    if (predicate.test(sourceRecord)) {
                        arrayList.add(sourceRecord);
                    }
                }
            }
            LOG.debug("Records polled: {}", arrayList);
            return arrayList;
        } catch (InterruptedException e) {
            throw new RuntimeException("Polling action was interrupted", e);
        }
    }

    private List<String> readBinlogSplits(DataType dataType, BinlogSplitReader binlogSplitReader, int i) {
        ArrayList arrayList = new ArrayList();
        while (arrayList.size() < i) {
            arrayList.addAll(formatResult(pollRecordsFromReader(binlogSplitReader, RecordUtils::isDataChangeRecord), dataType));
        }
        return arrayList;
    }

    private List<String> readBinlogSplitsFromSnapshotSplits(List<MySqlSnapshotSplit> list, DataType dataType, MySqlSourceConfig mySqlSourceConfig, int i, int i2, TableId tableId) throws Exception {
        StatefulTaskContext statefulTaskContext = new StatefulTaskContext(mySqlSourceConfig, this.binaryLogClient, this.mySqlConnection);
        SnapshotSplitReader snapshotSplitReader = new SnapshotSplitReader(statefulTaskContext, 0);
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < i; i3++) {
            MySqlSplit mySqlSplit = list.get(i3);
            if (snapshotSplitReader.isFinished()) {
                snapshotSplitReader.submitSplit(mySqlSplit);
            }
            while (true) {
                Iterator pollSplitRecords = snapshotSplitReader.pollSplitRecords();
                if (pollSplitRecords == null) {
                    break;
                }
                while (pollSplitRecords.hasNext()) {
                    Iterator it = ((SourceRecords) pollSplitRecords.next()).iterator();
                    while (it.hasNext()) {
                        arrayList.add((SourceRecord) it.next());
                    }
                }
            }
        }
        snapshotSplitReader.close();
        Assert.assertNotNull(snapshotSplitReader.getExecutorService());
        Assert.assertTrue(snapshotSplitReader.getExecutorService().isTerminated());
        List<FinishedSnapshotSplitInfo> finishedSplitsInfo = getFinishedSplitsInfo(list, arrayList);
        BinlogOffset startingOffsetOfBinlogSplit = RecordUtils.getStartingOffsetOfBinlogSplit(finishedSplitsInfo);
        HashMap hashMap = new HashMap();
        Iterator<MySqlSnapshotSplit> it2 = list.iterator();
        while (it2.hasNext()) {
            hashMap.putAll(it2.next().getTableSchemas());
        }
        MySqlBinlogSplit mySqlBinlogSplit = new MySqlBinlogSplit("binlog-split", startingOffsetOfBinlogSplit, BinlogOffset.ofNonStopping(), finishedSplitsInfo, hashMap, finishedSplitsInfo.size());
        BinlogSplitReader binlogSplitReader = new BinlogSplitReader(statefulTaskContext, 0);
        binlogSplitReader.submitSplit(mySqlBinlogSplit);
        if (tableId.table().contains("customers")) {
            makeCustomersBinlogEvents(statefulTaskContext.getConnection(), tableId.toString(), i == 1);
        } else {
            makeCustomerCardsBinlogEvents(statefulTaskContext.getConnection(), tableId.toString());
        }
        ArrayList arrayList2 = new ArrayList(formatResult(arrayList, dataType));
        while (arrayList2.size() < i2) {
            arrayList2.addAll(formatResult(pollRecordsFromReader(binlogSplitReader, RecordUtils::isDataChangeRecord), dataType));
        }
        binlogSplitReader.close();
        Assert.assertNotNull(snapshotSplitReader.getExecutorService());
        Assert.assertTrue(snapshotSplitReader.getExecutorService().isTerminated());
        return arrayList2;
    }

    private void updateCustomersTableInBulk(JdbcConnection jdbcConnection, String str) throws Exception {
        jdbcConnection.setAutoCommit(false);
        jdbcConnection.execute(new String[]{"UPDATE " + str + " SET address = 'Hangzhou' where id = 101 OR id = 102", "UPDATE " + str + " SET address = 'Pittsburgh' where id = 103 OR id = 109"});
        jdbcConnection.commit();
    }

    private void makeCustomersBinlogEvents(JdbcConnection jdbcConnection, String str, boolean z) throws SQLException {
        jdbcConnection.setAutoCommit(false);
        jdbcConnection.execute(new String[]{"UPDATE " + str + " SET address = 'Hangzhou' where id = 103", "DELETE FROM " + str + " where id = 102", "INSERT INTO " + str + " VALUES(102, 'user_2','Shanghai','123567891234')", "UPDATE " + str + " SET address = 'Shanghai' where id = 103"});
        jdbcConnection.commit();
        if (z) {
            return;
        }
        jdbcConnection.execute(new String[]{"UPDATE " + str + " SET name = 'Hangzhou' where id = 1010"});
        jdbcConnection.commit();
        jdbcConnection.execute(new String[]{"INSERT INTO " + str + " VALUES(2001, 'user_22','Shanghai','123567891234')"});
        jdbcConnection.commit();
        jdbcConnection.execute(new String[]{"ALTER TABLE " + str + " ADD COLUMN email VARCHAR(128) DEFAULT 'user@flink.apache.org'"});
        jdbcConnection.commit();
        jdbcConnection.execute(new String[]{"INSERT INTO " + str + " VALUES(2002, 'user_23','Shanghai','123567891234', 'test1@gmail.com')"});
        jdbcConnection.commit();
        jdbcConnection.execute(new String[]{"INSERT INTO " + str + " VALUES(2003, 'user_24','Shanghai','123567891234', 'test2@gmail.com')"});
        jdbcConnection.commit();
    }

    private void makeCustomerCardsBinlogEvents(JdbcConnection jdbcConnection, String str) throws SQLException {
        if (str.endsWith("customer_card_single_line")) {
            jdbcConnection.execute(new String[]{"INSERT INTO " + str + " VALUES(20000, 'LEVEL_1', 'user_1', 'user with level 1')"});
            jdbcConnection.commit();
            jdbcConnection.execute(new String[]{"INSERT INTO " + str + " VALUES(20001, 'LEVEL_2', 'user_2', 'user with level 2')", "INSERT INTO " + str + " VALUES(20002, 'LEVEL_3', 'user_3', 'user with level 3')"});
            jdbcConnection.commit();
            return;
        }
        jdbcConnection.execute(new String[]{"UPDATE " + str + " SET level = 'LEVEL_3' where user_id = 'user_1'", "INSERT INTO " + str + " VALUES(20002, 'LEVEL_5', 'user_15', 'user with level 15'"});
        jdbcConnection.commit();
        jdbcConnection.execute(new String[]{"INSERT INTO " + str + " VALUES(40000, 'LEVEL_1', 'user_16', 'user with level 1')", "INSERT INTO " + str + " VALUES(40004, 'LEVEL_2', 'user_17', 'user with level 2')"});
        jdbcConnection.commit();
        jdbcConnection.execute(new String[]{"INSERT INTO " + str + " VALUES(50004, 'LEVEL_1', 'user_18', 'user with level 1')", "INSERT INTO " + str + " VALUES(50005, 'LEVEL_2', 'user_19', 'user with level 2')"});
        jdbcConnection.commit();
    }

    private List<FinishedSnapshotSplitInfo> getFinishedSplitsInfo(List<MySqlSnapshotSplit> list, List<SourceRecord> list2) {
        HashMap hashMap = new HashMap();
        list.forEach(mySqlSnapshotSplit -> {
            hashMap.put(mySqlSnapshotSplit.splitId(), mySqlSnapshotSplit);
        });
        ArrayList arrayList = new ArrayList();
        list2.stream().filter(RecordUtils::isHighWatermarkEvent).forEach(sourceRecord -> {
            arrayList.add(RecordUtils.getSnapshotSplitInfo((MySqlSnapshotSplit) hashMap.get(((Struct) sourceRecord.value()).getString("split_id")), sourceRecord));
        });
        return arrayList;
    }

    private List<String> formatResult(List<SourceRecord> list, DataType dataType) {
        return new RecordsFormatter(dataType).format(list);
    }

    private List<MySqlSnapshotSplit> getMySqlSplits(String[] strArr, MySqlSourceConfig mySqlSourceConfig) {
        MySqlSnapshotSplitAssigner mySqlSnapshotSplitAssigner = new MySqlSnapshotSplitAssigner(mySqlSourceConfig, 4, (List) ((List) Arrays.stream(strArr).map(str -> {
            return this.customerDatabase.getDatabaseName() + "." + str;
        }).collect(Collectors.toList())).stream().map(TableId::parse).collect(Collectors.toList()), false);
        mySqlSnapshotSplitAssigner.open();
        ArrayList arrayList = new ArrayList();
        while (true) {
            Optional next = mySqlSnapshotSplitAssigner.getNext();
            if (!next.isPresent()) {
                mySqlSnapshotSplitAssigner.close();
                return arrayList;
            }
            arrayList.add(((MySqlSplit) next.get()).asSnapshotSplit());
        }
    }

    private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] strArr) {
        return getConfig(MYSQL_CONTAINER, this.customerDatabase, startupOptions, strArr);
    }

    private MySqlSourceConfig getConfig(MySqlContainer mySqlContainer, UniqueDatabase uniqueDatabase, StartupOptions startupOptions, String[] strArr) {
        return getConfigFactory(mySqlContainer, uniqueDatabase, strArr).startupOptions(startupOptions).createConfig(0);
    }

    private MySqlSourceConfig getConfig(String[] strArr) {
        return getConfig(MYSQL_CONTAINER, this.customerDatabase, strArr);
    }

    private MySqlSourceConfig getConfig(MySqlContainer mySqlContainer, UniqueDatabase uniqueDatabase, String[] strArr) {
        return getConfigFactory(mySqlContainer, uniqueDatabase, strArr).createConfig(0);
    }

    private MySqlSourceConfigFactory getConfigFactory(MySqlContainer mySqlContainer, UniqueDatabase uniqueDatabase, String[] strArr) {
        return new MySqlSourceConfigFactory().databaseList(new String[]{uniqueDatabase.getDatabaseName()}).tableList((String[]) Arrays.stream(strArr).map(str -> {
            return uniqueDatabase.getDatabaseName() + "." + str;
        }).toArray(i -> {
            return new String[i];
        })).hostname(mySqlContainer.getHost()).port(mySqlContainer.getDatabasePort()).username(uniqueDatabase.getUsername()).splitSize(4).fetchSize(2).password(uniqueDatabase.getPassword());
    }

    private void addColumnToTable(JdbcConnection jdbcConnection, String str) throws Exception {
        jdbcConnection.execute(new String[]{"ALTER TABLE " + str + " ADD COLUMN new_int_column INT DEFAULT 15213"});
        jdbcConnection.commit();
    }
}
