package com.ververica.cdc.connectors.mysql.table;

import com.ververica.cdc.connectors.mysql.LegacyMySqlSourceTest;
import com.ververica.cdc.connectors.mysql.MySqlTestUtils;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.lifecycle.Startables;

@RunWith(Parameterized.class)
/* loaded from: input_file:com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.class */
public class MySqlConnectorITCase extends MySqlSourceTestBase {
    // ---- Static fixtures (kept in their original relative initialization order) ----

    private static final Logger LOG = LoggerFactory.getLogger(MySqlConnectorITCase.class);

    /** MySQL 8.0 container created from the "docker/server-gtids/expire-seconds/my.cnf" configuration. */
    private static final MySqlContainer MYSQL8_CONTAINER =
            createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");

    /** Credentials handed to every {@link UniqueDatabase} created by this class. */
    private static final String TEST_USER = "mysqluser";
    private static final String TEST_PASSWORD = "mysqlpw";

    /** Database whose template name ("customer3.0") contains a dot. */
    private static final UniqueDatabase customer3_0Database =
            new UniqueDatabase(MYSQL_CONTAINER, "customer3.0", TEST_USER, TEST_PASSWORD);

    // ---- Per-test databases on MYSQL_CONTAINER ----

    private final UniqueDatabase inventoryDatabase =
            new UniqueDatabase(MYSQL_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);
    private final UniqueDatabase fullTypesMySql57Database =
            new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", TEST_USER, TEST_PASSWORD);

    // ---- Per-test database on MYSQL8_CONTAINER ----

    private final UniqueDatabase fullTypesMySql8Database =
            new UniqueDatabase(MYSQL8_CONTAINER, "column_type_test_mysql8", TEST_USER, TEST_PASSWORD);

    // ---- More per-test databases on MYSQL_CONTAINER ----

    private final UniqueDatabase customerDatabase =
            new UniqueDatabase(MYSQL_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
    private final UniqueDatabase userDatabase1 =
            new UniqueDatabase(MYSQL_CONTAINER, "user_1", TEST_USER, TEST_PASSWORD);
    private final UniqueDatabase userDatabase2 =
            new UniqueDatabase(MYSQL_CONTAINER, "user_2", TEST_USER, TEST_PASSWORD);

    // ---- More per-test databases on MYSQL8_CONTAINER ----

    private final UniqueDatabase inventoryDatabase8 =
            new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);
    private final UniqueDatabase binlogDatabase =
            new UniqueDatabase(MYSQL8_CONTAINER, "binlog_metadata_test", TEST_USER, TEST_PASSWORD);

    // ---- Flink environments; the table environment wraps the stream environment in streaming mode ----

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().inStreamingMode().build());

    /** Test parameter: value written into the 'scan.incremental.snapshot.enabled' connector option. */
    private final boolean incrementalSnapshot;

    /**
     * Creates one parameterized instance of the test class.
     *
     * @param z value stored in {@code incrementalSnapshot}; the tests pass it to the
     *     'scan.incremental.snapshot.enabled' connector option and use it to pick the parallelism
     */
    public MySqlConnectorITCase(boolean z) {
        this.incrementalSnapshot = z;
    }

    @Parameterized.Parameters(name = "incrementalSnapshot: {0}")
    public static Object[] parameters() {
        // One single-element argument tuple per run: first with the incremental
        // snapshot flag off, then with it on.
        final Object[] incrementalSnapshotOff = {Boolean.FALSE};
        final Object[] incrementalSnapshotOn = {Boolean.TRUE};
        return new Object[] {incrementalSnapshotOff, incrementalSnapshotOn};
    }

    /** Starts the MySQL 8 container once for the whole class and blocks until startup completes. */
    @BeforeClass
    public static void beforeClass() {
        LOG.info("Starting MySql8 containers...");
        final Stream<MySqlContainer> containersToStart = Stream.of(MYSQL8_CONTAINER);
        Startables.deepStart(containersToStart).join();
        LOG.info("Container MySql8 is started.");
    }

    /** Stops the MySQL 8 container after all tests of the class have run. */
    @AfterClass
    public static void afterClass() {
        LOG.info("Stopping MySql8 containers...");
        MYSQL8_CONTAINER.stop();
        LOG.info("Container MySql8 is stopped.");
    }

    /**
     * Resets the values-connector test data and configures the environment for the current
     * parameter: parallelism 4 with a 200 ms checkpoint interval when incremental snapshot is
     * enabled, parallelism 1 (checkpointing left untouched) otherwise.
     */
    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        if (incrementalSnapshot) {
            env.setParallelism(4);
            env.enableCheckpointing(200L);
            return;
        }
        env.setParallelism(1);
    }

    /** Runs the shared consume-all-events scenario without any additional connector options. */
    @Test
    public void testConsumingAllEvents() throws Exception {
        final String noExtraOptions = "";
        runConsumingAllEventsTest(noExtraOptions);
    }

    /**
     * Runs the shared consume-all-events scenario with SSL-related JDBC properties appended to
     * the connector options: SSL is enabled and required, and server certificate verification is
     * switched off.
     */
    @Test
    public void testConsumingAllEventsUseSSL() throws Exception {
        // The Connector/J property is spelled "verifyServerCertificate"; a misspelled key is
        // not a recognized driver option and would leave verification at the driver default.
        final String sslOptions =
                ", 'jdbc.properties.useSSL'= 'true',"
                        + " 'jdbc.properties.requireSSL'= 'true',"
                        + " 'jdbc.properties.verifyServerCertificate'= 'false'";
        runConsumingAllEventsTest(sslOptions);
    }

    /**
     * Shared scenario: reads the "products" table of the inventory database through a mysql-cdc
     * source, aggregates {@code SUM(weight)} per name into a values sink, applies a series of
     * updates/inserts/deletes over JDBC, and checks the final aggregated sink contents.
     *
     * @param str extra connector options appended verbatim inside the source table's WITH clause
     *     (empty string for none; otherwise it must start with a comma)
     * @throws Exception if any SQL, JDBC or job-control step fails
     */
    private void runConsumingAllEventsTest(String str) throws Exception {
        inventoryDatabase.createAndInitialize();

        String sourceDDL =
                String.format(
                        "CREATE TABLE debezium_source ("
                                + " `id` INT NOT NULL,"
                                + " name STRING,"
                                + " description STRING,"
                                + " weight DECIMAL(10,3),"
                                + " primary key (`id`) not enforced"
                                + ") WITH ("
                                + " 'connector' = 'mysql-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'table-name' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = '%s',"
                                + " 'server-time-zone' = 'UTC',"
                                + " 'server-id' = '%s',"
                                + " 'scan.incremental.snapshot.chunk.size' = '%s'"
                                + " %s"
                                + ")",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "products",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize(),
                        str);
        tEnv.executeSql(sourceDDL);

        String sinkDDL =
                "CREATE TABLE sink ("
                        + " name STRING,"
                        + " weightSum DECIMAL(10,3),"
                        + " PRIMARY KEY (name) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false',"
                        + " 'sink-expected-messages-num' = '20'"
                        + ")";
        tEnv.executeSql(sinkDDL);

        // Async job; the returned handle is only used to cancel it at the end.
        TableResult result =
                tEnv.executeSql(
                        "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");

        waitForSnapshotStarted("sink");

        // try-with-resources closes the statement and then the connection exactly once,
        // whether or not one of the statements fails.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(
                    "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
            statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
            statement.execute(
                    "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
            statement.execute(
                    "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute(
                    "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
        }

        waitForSinkSize("sink", 20);

        String[] expected = {
            "+I[scooter, 3.140]",
            "+I[car battery, 8.100]",
            "+I[12-pack drill bits, 0.800]",
            "+I[hammer, 2.625]",
            "+I[rocks, 5.100]",
            "+I[jacket, 0.600]",
            "+I[spare tire, 22.200]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), TestValuesTableFactory.getResults("sink"));

        result.getJobClient().get().cancel().get();
    }

    /** Runs the no-primary-key scenario with the chunk key column option set to {@code type}. */
    @Test
    public void testNoPKTableWithChunkKey() throws Exception {
        final String chunkKeyOption = ", 'scan.incremental.snapshot.chunk.key-column'='type'";
        runConsumingForNoPKTableTest(chunkKeyOption);
    }

    /**
     * Runs the no-primary-key scenario without a chunk key column and expects the failure chain
     * to contain a {@link ValidationException} whose cause carries the "chunk key-column is
     * required" message.
     */
    @Test
    public void testNoPKTableWithoutChunkKey() {
        Throwable failure =
                Assert.assertThrows(Throwable.class, () -> runConsumingForNoPKTableTest(""));

        Optional<ValidationException> validationError =
                ExceptionUtils.findThrowable(failure, ValidationException.class);
        Assert.assertTrue(validationError.isPresent());

        String expectedMessage =
                "'scan.incremental.snapshot.chunk.key-column' is required for table without primary key when 'scan.incremental.snapshot.enabled' enabled.";
        String actualMessage = validationError.get().getCause().getMessage();
        Assert.assertEquals(expectedMessage, actualMessage);
    }

    /**
     * Shared scenario for a source table without a primary key ("products_no_pk"): copies the
     * table into a values sink with incremental snapshot always enabled on the source, applies
     * changes over JDBC, and checks the final sink contents.
     *
     * <p>The sink declares {@code type} as primary key only when the {@code incrementalSnapshot}
     * test parameter is set, which is why the expected record count and final rows differ
     * between the two parameterizations.
     *
     * @param str extra connector options appended verbatim inside the source table's WITH clause
     *     (empty string for none; otherwise it must start with a comma)
     * @throws Exception if any SQL, JDBC or job-control step fails
     */
    private void runConsumingForNoPKTableTest(String str) throws Exception {
        inventoryDatabase.createAndInitialize();

        String sourceDDL =
                String.format(
                        "CREATE TABLE debezium_source ("
                                + " `type` INT,"
                                + " name STRING,"
                                + " description STRING,"
                                + " weight DECIMAL(10,3)"
                                + ") WITH ("
                                + " 'connector' = 'mysql-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'table-name' = 'products_no_pk',"
                                + " 'scan.incremental.snapshot.enabled' = 'true',"
                                + " 'server-time-zone' = 'UTC',"
                                + " 'server-id' = '%s',"
                                + " 'scan.incremental.snapshot.chunk.size' = '2'"
                                + " %s"
                                + ")",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        getServerId(),
                        str);
        String sinkDDL =
                "CREATE TABLE sink ("
                        + " `type` INT,"
                        + " name STRING,"
                        + " description STRING,"
                        + " weight DECIMAL(10,3)"
                        + (incrementalSnapshot ? ", PRIMARY KEY (`type`) NOT ENFORCED" : "")
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ")";
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        // Async job; the returned handle is only used to cancel it at the end.
        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        waitForSinkSize("sink", 11);

        // try-with-resources closes the statement and then the connection exactly once,
        // whether or not one of the statements fails.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(
                    "UPDATE products_no_pk SET description='18oz carpenter hammer' WHERE type=103;");
            statement.execute("UPDATE products_no_pk SET weight='5.1' WHERE type=106;");
            statement.execute(
                    "INSERT INTO products_no_pk VALUES (110,'jacket','water resistent white wind breaker',0.2);");
            statement.execute("DELETE FROM products_no_pk WHERE type=102;");
            statement.execute(
                    "INSERT INTO products_no_pk VALUES (111,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute(
                    "UPDATE products_no_pk SET description='new water resistent white wind breaker', weight='0.5' WHERE type=110;");
            statement.execute("UPDATE products_no_pk SET weight='5.17' WHERE type=111;");
            statement.execute("DELETE FROM products_no_pk WHERE type=111;");
        }

        waitForSinkSize("sink", incrementalSnapshot ? 25 : 29);

        String[] expected =
                incrementalSnapshot
                        ? new String[] {
                            "+I[100, scooter, Small 2-wheel scooter, 3.140]",
                            "+I[101, car battery, 12V car battery, 8.100]",
                            "+I[103, hammer, 18oz carpenter hammer, 1.000]",
                            "+I[104, rocks, box of assorted rocks, 5.300]",
                            "+I[105, jacket, water resistent black wind breaker, 0.100]",
                            "+I[106, spare tire, 24 inch spare tire, 5.100]",
                            "+I[110, jacket, new water resistent white wind breaker, 0.500]"
                        }
                        : new String[] {
                            "+I[100, scooter, Small 2-wheel scooter, 3.140]",
                            "+I[101, car battery, 12V car battery, 8.100]",
                            "+I[103, hammer, 18oz carpenter hammer, 0.750]",
                            "+I[103, hammer, 18oz carpenter hammer, 0.875]",
                            "+I[103, hammer, 18oz carpenter hammer, 1.000]",
                            "+I[104, rocks, box of assorted rocks, 5.300]",
                            "+I[104, rocks, box of assorted rocks, 5.300]",
                            "+I[104, rocks, box of assorted rocks, 5.300]",
                            "+I[105, jacket, water resistent black wind breaker, 0.100]",
                            "+I[106, spare tire, 24 inch spare tire, 5.100]",
                            "+I[110, jacket, new water resistent white wind breaker, 0.500]"
                        };
        assertEqualsInAnyOrder(Arrays.asList(expected), TestValuesTableFactory.getResults("sink"));

        result.getJobClient().get().cancel().get();
    }

    /**
     * With incremental snapshot enabled, parallelism 1 and checkpointing disabled, checks that
     * the source still emits the nine snapshot rows of "products" followed, in order, by the
     * change events produced by subsequent JDBC modifications.
     *
     * <p>Does nothing when the {@code incrementalSnapshot} test parameter is {@code false}.
     *
     * @throws Exception if any SQL, JDBC or job-control step fails
     */
    @Test
    public void testCheckpointIsOptionalUnderSingleParallelism() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }

        // Override what before() configured for the incremental-snapshot parameterization.
        env.setParallelism(1);
        env.getCheckpointConfig().disableCheckpointing();

        inventoryDatabase.createAndInitialize();
        String sourceDDL =
                String.format(
                        "CREATE TABLE debezium_source ("
                                + " `id` INT NOT NULL,"
                                + " name STRING,"
                                + " description STRING,"
                                + " weight DECIMAL(10,3),"
                                + " primary key (`id`) not enforced"
                                + ") WITH ("
                                + " 'connector' = 'mysql-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'table-name' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = '%s',"
                                + " 'server-id' = '%s',"
                                + " 'server-time-zone' = 'UTC',"
                                + " 'scan.incremental.snapshot.chunk.size' = '%s'"
                                + ")",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "products",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize());
        tEnv.executeSql(sourceDDL);

        TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");
        CloseableIterator<Row> iterator = result.collect();

        String[] expectedSnapshot = {
            "+I[101, scooter, Small 2-wheel scooter, 3.140]",
            "+I[102, car battery, 12V car battery, 8.100]",
            "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
            "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
            "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
            "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
            "+I[107, rocks, box of assorted rocks, 5.300]",
            "+I[108, jacket, water resistent black wind breaker, 0.100]",
            "+I[109, spare tire, 24 inch spare tire, 22.200]"
        };
        assertEqualsInAnyOrder(
                Arrays.asList(expectedSnapshot), fetchRows(iterator, expectedSnapshot.length));

        // try-with-resources closes the statement and then the connection exactly once,
        // whether or not one of the statements fails.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(
                    "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
            statement.execute(
                    "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute(
                    "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
        }

        // Change events are compared in order, unlike the snapshot rows above.
        String[] expectedBinlog = {
            "+I[110, jacket, water resistent white wind breaker, 0.200]",
            "+I[111, scooter, Big 2-wheel scooter , 5.180]",
            "-U[110, jacket, water resistent white wind breaker, 0.200]",
            "+U[110, jacket, new water resistent white wind breaker, 0.500]",
            "-U[111, scooter, Big 2-wheel scooter , 5.180]",
            "+U[111, scooter, Big 2-wheel scooter , 5.170]",
            "-D[111, scooter, Big 2-wheel scooter , 5.170]"
        };
        assertEqualsInOrder(
                Arrays.asList(expectedBinlog), fetchRows(iterator, expectedBinlog.length));

        result.getJobClient().get().cancel().get();
    }

    /** Runs the all-data-types scenario against {@code MYSQL_CONTAINER} and its full-types database. */
    @Test
    public void testMysql57AllDataTypes() throws Throwable {
        final UniqueDatabase fullTypesDatabase = fullTypesMySql57Database;
        testAllDataTypes(MYSQL_CONTAINER, fullTypesDatabase);
    }

    /** Runs the all-data-types scenario against {@code MYSQL8_CONTAINER} and its full-types database. */
    @Test
    public void testMySql8AllDataTypes() throws Throwable {
        final UniqueDatabase fullTypesDatabase = fullTypesMySql8Database;
        testAllDataTypes(MYSQL8_CONTAINER, fullTypesDatabase);
    }

    public void testAllDataTypes(MySqlContainer mySqlContainer, UniqueDatabase uniqueDatabase) throws Throwable {
        uniqueDatabase.createAndInitialize();
        this.tEnv.executeSql(String.format("CREATE TABLE full_types (\n    `id` INT NOT NULL,\n    tiny_c TINYINT,\n    tiny_un_c SMALLINT ,\n    tiny_un_z_c SMALLINT ,\n    small_c SMALLINT,\n    small_un_c INT,\n    small_un_z_c INT,\n    medium_c INT,\n    medium_un_c INT,\n    medium_un_z_c BIGINT,\n    int_c INT ,\n    int_un_c BIGINT,\n    int_un_z_c BIGINT,\n    int11_c BIGINT,\n    big_c BIGINT,\n    big_un_c DECIMAL(20, 0),\n    big_un_z_c DECIMAL(20, 0),\n    varchar_c VARCHAR(255),\n    char_c CHAR(3),\n    real_c FLOAT,\n    float_c FLOAT,\n    float_un_c FLOAT,\n    float_un_z_c FLOAT,\n    double_c DOUBLE,\n    double_un_c DOUBLE,\n    double_un_z_c DOUBLE,\n    decimal_c DECIMAL(8, 4),\n    decimal_un_c DECIMAL(8, 4),\n    decimal_un_z_c DECIMAL(8, 4),\n    numeric_c DECIMAL(6, 0),\n    big_decimal_c STRING,\n    bit1_c BOOLEAN,\n    tiny1_c BOOLEAN,\n    boolean_c BOOLEAN,\n    date_c DATE,\n    time_c TIME(0),\n    datetime3_c TIMESTAMP(3),\n    datetime6_c TIMESTAMP(6),\n    timestamp_c TIMESTAMP(0),\n    file_uuid BYTES,\n    bit_c BINARY(8),\n    text_c STRING,\n    tiny_blob_c BYTES,\n    blob_c BYTES,\n    medium_blob_c BYTES,\n    long_blob_c BYTES,\n    year_c INT,\n    enum_c STRING,\n    set_c ARRAY<STRING>,\n    json_c STRING,\n    point_c STRING,\n    geometry_c STRING,\n    linestring_c STRING,\n    polygon_c STRING,\n    multipoint_c STRING,\n    multiline_c STRING,\n    multipolygon_c STRING,\n    geometrycollection_c STRING,\n    primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')", mySqlContainer.getHost(), Integer.valueOf(mySqlContainer.getDatabasePort()), uniqueDatabase.getUsername(), uniqueDatabase.getPassword(), uniqueDatabase.getDatabaseName(), "full_types", 
Boolean.valueOf(this.incrementalSnapshot), getServerId(), Integer.valueOf(getSplitSize())));
        TableResult executeSql = this.tEnv.executeSql("SELECT id,\ntiny_c,\ntiny_un_c,\ntiny_un_z_c,\nsmall_c,\nsmall_un_c,\nsmall_un_z_c,\nmedium_c, \nmedium_un_c, \nmedium_un_z_c, \nint_c,\nint_un_c,\nint_un_z_c,\nint11_c,\nbig_c,\nbig_un_c, \nbig_un_z_c, \nvarchar_c,\nchar_c,\nreal_c, \nfloat_c,\nfloat_un_c,\nfloat_un_z_c,\ndouble_c,\ndouble_un_c,\ndouble_un_z_c,\ndecimal_c,\ndecimal_un_c,\ndecimal_un_z_c,\nnumeric_c,\nbig_decimal_c,\nbit1_c,\ntiny1_c,\nboolean_c,\ndate_c,\ntime_c,\ndatetime3_c,\ndatetime6_c,\ntimestamp_c,\nTO_BASE64(DECODE(file_uuid, 'UTF-8')),\nbit_c,\ntext_c,\ntiny_blob_c,\nblob_c,\nmedium_blob_c,\nlong_blob_c,\nyear_c,\nenum_c,\nset_c,\njson_c, \npoint_c, \ngeometry_c, \nlinestring_c, \npolygon_c, \nmultipoint_c, \nmultiline_c, \nmultipolygon_c, \ngeometrycollection_c \n FROM full_types");
        CloseableIterator collect = executeSql.collect();
        waitForSnapshotStarted((CloseableIterator<Row>) collect);
        Connection jdbcConnection = uniqueDatabase.getJdbcConnection();
        try {
            Statement createStatement = jdbcConnection.createStatement();
            try {
                createStatement.execute("UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
                if (createStatement != null) {
                    createStatement.close();
                }
                if (jdbcConnection != null) {
                    jdbcConnection.close();
                }
                String[] strArr = {"+I[1, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147483647, 4294967295, 4294967295, 2147483647, 9223372036854775807, 18446744073709551615, 18446744073709551615, Hello World, abc, 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445, 123.4567, 123.4568, 123.4569, 346, 34567892.1, false, true, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=, [4, 4, 4, 4, 4, 4, 4, 4], text, [16], [16], [16], [16], 2021, red, [a, b], {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}]", "-U[1, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147483647, 4294967295, 4294967295, 2147483647, 9223372036854775807, 18446744073709551615, 18446744073709551615, Hello World, abc, 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445, 123.4567, 123.4568, 123.4569, 346, 34567892.1, false, true, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=, [4, 4, 4, 4, 4, 4, 4, 4], text, [16], [16], [16], [16], 2021, red, [a, b], {\"key1\":\"value1\"}, 
{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}]", "+U[1, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147483647, 4294967295, 4294967295, 2147483647, 9223372036854775807, 18446744073709551615, 18446744073709551615, Hello World, abc, 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445, 123.4567, 123.4568, 123.4569, 346, 34567892.1, false, true, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=, [4, 4, 4, 4, 4, 4, 4, 4], text, [16], [16], [16], [16], 2021, red, [a, b], {\"key1\":\"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, 
{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}]"};
                assertEqualsInAnyOrder(Arrays.asList(strArr), fetchRows(collect, strArr.length));
                ((JobClient) executeSql.getJobClient().get()).cancel().get();
            } finally {
            }
        } catch (Throwable th) {
            if (jdbcConnection != null) {
                try {
                    jdbcConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /**
     * Creates a MySQL table with the column range 0..500 passed to {@code buildColumnsDDL},
     * inserts one row, reads it through a mysql-cdc source, updates {@code col1} over JDBC, and
     * checks the snapshot row plus the update-before/update-after rows.
     *
     * @throws Exception if any SQL, JDBC or job-control step fails
     */
    @Test
    public void testWideTable() throws Exception {
        final int tableColumns = 500;
        fullTypesMySql57Database.createAndInitialize();

        // Each JDBC phase gets its own try-with-resources so the statement and connection are
        // closed exactly once, including when a statement fails.
        try (Connection connection = fullTypesMySql57Database.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(
                    String.format("USE %s", fullTypesMySql57Database.getDatabaseName()));
            statement.execute(
                    "CREATE TABLE wide_table("
                            + buildColumnsDDL("col", 0, tableColumns, "BIGINT")
                            + " PRIMARY KEY (col0) "
                            + ")");
            statement.execute(
                    "INSERT INTO wide_table values("
                            + getIntegerSeqString(0, tableColumns)
                            + ")");
        }

        String sourceDDL =
                String.format(
                        "CREATE TABLE wide_table (\n"
                                + buildColumnsDDL("col", 0, tableColumns, "BIGINT")
                                + "    primary key (`col0`) not enforced"
                                + ") WITH ("
                                + " 'connector' = 'mysql-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'table-name' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = '%s',"
                                + " 'server-id' = '%s',"
                                + " 'server-time-zone' = 'UTC',"
                                + " 'scan.incremental.snapshot.chunk.size' = '%s'"
                                + ")",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        fullTypesMySql57Database.getUsername(),
                        fullTypesMySql57Database.getPassword(),
                        fullTypesMySql57Database.getDatabaseName(),
                        "wide_table",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize());
        tEnv.executeSql(sourceDDL);

        TableResult result = tEnv.executeSql("SELECT * FROM wide_table");
        CloseableIterator<Row> iterator = result.collect();
        waitForSnapshotStarted(iterator);

        try (Connection connection = fullTypesMySql57Database.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE wide_table SET col1 = 1024 WHERE col0=0;");
        }

        String[] expected = {
            "+I[0, 1, " + getIntegerSeqString(2, tableColumns) + "]",
            "-U[0, 1, " + getIntegerSeqString(2, tableColumns) + "]",
            "+U[0, 1024, " + getIntegerSeqString(2, tableColumns) + "]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));

        result.getJobClient().get().cancel().get();
    }

    @Test
    public void testBigTableWithHugeSplits() throws Exception {
        if (this.incrementalSnapshot) {
            this.fullTypesMySql57Database.createAndInitialize();
            Connection jdbcConnection = this.fullTypesMySql57Database.getJdbcConnection();
            try {
                Statement createStatement = jdbcConnection.createStatement();
                try {
                    createStatement.execute(String.format("USE %s", this.fullTypesMySql57Database.getDatabaseName()));
                    createStatement.execute("CREATE TABLE big_table1(id BIGINT, str VARCHAR(100), PRIMARY KEY (id))");
                    createStatement.execute("CREATE TABLE big_table2(id BIGINT, str VARCHAR(100), PRIMARY KEY (id))");
                    for (int i = 0; i < 10; i++) {
                        createStatement.execute("INSERT INTO big_table1 values(" + i + "," + (i + 100000) + ")");
                        createStatement.execute("INSERT INTO big_table2 values(" + i + "," + (i + 200000) + ")");
                    }
                    if (createStatement != null) {
                        createStatement.close();
                    }
                    if (jdbcConnection != null) {
                        jdbcConnection.close();
                    }
                    this.tEnv.executeSql(String.format("CREATE TABLE big_table (\n    id BIGINT,    str STRING,    primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = 'big_table.*', 'scan.incremental.snapshot.chunk.size' = '2', 'chunk-meta.group.size' = '3', 'server-time-zone' = 'UTC', 'server-id' = '%s')", MYSQL_CONTAINER.getHost(), Integer.valueOf(MYSQL_CONTAINER.getDatabasePort()), this.fullTypesMySql57Database.getUsername(), this.fullTypesMySql57Database.getPassword(), this.fullTypesMySql57Database.getDatabaseName(), getServerId()));
                    this.tEnv.executeSql("CREATE TABLE sink ( `id` BIGINT NOT NULL, str STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
                    TableResult executeSql = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM big_table");
                    waitForSinkSize("sink", 20);
                    jdbcConnection = this.fullTypesMySql57Database.getJdbcConnection();
                    try {
                        createStatement = jdbcConnection.createStatement();
                        try {
                            createStatement.execute("UPDATE big_table1 SET str = '1024' WHERE id=0;");
                            createStatement.execute("UPDATE big_table1 SET str = '1025' WHERE id=1;");
                            createStatement.execute("UPDATE big_table2 SET str = '2048' WHERE id=2;");
                            createStatement.execute("UPDATE big_table2 SET str = '2049' WHERE id=3;");
                            if (createStatement != null) {
                                createStatement.close();
                            }
                            if (jdbcConnection != null) {
                                jdbcConnection.close();
                            }
                            waitForSinkSize("sink", 24);
                            ArrayList arrayList = new ArrayList();
                            for (int i2 = 0; i2 < 10; i2++) {
                                arrayList.add("+I[" + i2 + ", " + (i2 + 100000) + "]");
                                arrayList.add("+I[" + i2 + ", " + (i2 + 200000) + "]");
                            }
                            arrayList.addAll(Lists.newArrayList(new String[]{"+U[0, 1024]", "+U[1, 1025]", "+U[2, 2048]", "+U[3, 2049]"}));
                            List rawResults = TestValuesTableFactory.getRawResults("sink");
                            Collections.sort(rawResults);
                            Collections.sort(arrayList);
                            Assert.assertEquals(arrayList, rawResults);
                            ((JobClient) executeSql.getJobClient().get()).cancel().get();
                        } finally {
                            if (createStatement != null) {
                                try {
                                    createStatement.close();
                                } catch (Throwable th) {
                                    th.addSuppressed(th);
                                }
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        }
    }

    /**
     * Streams the {@code user_table_.*} tables of {@code userDatabase1} through a {@code mysql-cdc}
     * source that declares the {@code database_name} and {@code table_name} metadata columns into
     * a {@code values} sink, applies a few inserts/updates/deletes over JDBC, and asserts that the
     * sorted raw sink results equal the sorted expected changelog.
     *
     * @throws Exception if the database setup, the JDBC statements or the Flink job fail
     */
    @Test
    public void testMetadataColumns() throws Exception {
        userDatabase1.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE mysql_users ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA VIRTUAL, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, age INT, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        userDatabase1.getUsername(),
                        userDatabase1.getPassword(),
                        userDatabase1.getDatabaseName(),
                        "user_table_.*",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize()));
        tEnv.executeSql(
                "CREATE TABLE sink ( database_name STRING, table_name STRING, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, age INT, primary key (database_name, table_name, id) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM mysql_users");

        // Wait for the sink to report 2 rows (presumably the initial snapshot) before changing data.
        waitForSinkSize("sink", 2);

        // try-with-resources closes the statement and the connection on success and on failure.
        try (Connection connection = userDatabase1.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO user_table_1_2 VALUES (200,'user_200','Wuhan',123567891234);");
            statement.execute("INSERT INTO user_table_1_1 VALUES (300,'user_300','Hangzhou',123567891234, 'user_300@foo.com');");
            statement.execute("UPDATE user_table_1_1 SET address='Beijing' WHERE id=300;");
            statement.execute("UPDATE user_table_1_2 SET phone_number=88888888 WHERE id=121;");
            statement.execute("DELETE FROM user_table_1_1 WHERE id=111;");
        }

        waitForSinkSize("sink", 7);

        // Every expected row starts with the database name, which is only known at runtime.
        List<String> expected =
                Stream.of(
                                "+I[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
                                "+I[%s, user_table_1_2, 121, user_121, Shanghai, 123567891234, null, null]",
                                "+I[%s, user_table_1_2, 200, user_200, Wuhan, 123567891234, null, null]",
                                "+I[%s, user_table_1_1, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
                                "+U[%s, user_table_1_1, 300, user_300, Beijing, 123567891234, user_300@foo.com, null]",
                                "+U[%s, user_table_1_2, 121, user_121, Shanghai, 88888888, null, null]",
                                "-D[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
                                "-U[%s, user_table_1_1, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
                                "-U[%s, user_table_1_2, 121, user_121, Shanghai, 123567891234, null, null]")
                        .map(template -> String.format(template, userDatabase1.getDatabaseName()))
                        .sorted()
                        .collect(Collectors.toList());

        // Both sides are sorted so the comparison does not depend on arrival order.
        List<String> actual = TestValuesTableFactory.getRawResults("sink");
        Collections.sort(actual);
        Assert.assertEquals(expected, actual);

        result.getJobClient().get().cancel().get();
    }

    /**
     * Starts a {@code mysql-cdc} source on the {@code products} table with
     * {@code 'scan.startup.mode' = 'latest-offset'}, waits for the job to be RUNNING, applies
     * inserts, updates and a delete over JDBC, and checks that the collected rows match the seven
     * expected change rows (compared via {@code assertEqualsInAnyOrder}).
     *
     * @throws Exception if the database setup, the JDBC statements or the Flink job fail
     */
    @Test
    public void testStartupFromLatestOffset() throws Exception {
        inventoryDatabase.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key(id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.startup.mode' = 'latest-offset', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "products",
                        incrementalSnapshot,
                        getServerId()));
        TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");

        // Sleep first, then poll every 5 seconds until the job reports RUNNING.
        do {
            Thread.sleep(5000L);
        } while (result.getJobClient().get().getJobStatus().get() != JobStatus.RUNNING);

        CloseableIterator<Row> rows = result.collect();

        // try-with-resources closes the statement and the connection on success and on failure.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
            statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
        }

        String[] expected = {
            "+I[110, jacket, water resistent white wind breaker, 0.200]",
            "+I[111, scooter, Big 2-wheel scooter , 5.180]",
            "-U[110, jacket, water resistent white wind breaker, 0.200]",
            "+U[110, jacket, new water resistent white wind breaker, 0.500]",
            "-U[111, scooter, Big 2-wheel scooter , 5.180]",
            "+U[111, scooter, Big 2-wheel scooter , 5.170]",
            "-D[111, scooter, Big 2-wheel scooter , 5.170]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(rows, expected.length));

        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads {@code varbinary_pk_table}, whose primary key is a {@code VARBINARY(11)} column,
     * through a {@code mysql-cdc} source, applies two inserts, an update and a delete over JDBC,
     * and checks the collected rows against the expected changelog (compared via
     * {@code assertEqualsInAnyOrder}).
     *
     * <p>The test body only runs when {@code incrementalSnapshot} is {@code true}.
     *
     * @throws Exception if the database setup, the JDBC statements or the Flink job fail
     */
    @Test
    public void testPrimaryKeyWithVarbinaryType() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        inventoryDatabase.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE varbinary_pk_table ( order_id VARBINARY(11), order_date DATE, quantity INT, product_id INT, purchaser STRING, PRIMARY KEY(order_id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "varbinary_pk_table",
                        getServerId(),
                        getSplitSize()));
        TableResult result = tEnv.executeSql("SELECT * FROM varbinary_pk_table");

        // Sleep first, then poll every 5 seconds until the job reports RUNNING.
        do {
            Thread.sleep(5000L);
        } while (result.getJobClient().get().getJobStatus().get() != JobStatus.RUNNING);

        CloseableIterator<Row> rows = result.collect();

        // try-with-resources closes the statement and the connection on success and on failure.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO varbinary_pk_table VALUES (b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', 30, 500, 'flink');");
            statement.execute("INSERT INTO varbinary_pk_table VALUES (b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', 30, 500, 'flink-sql');");
            statement.execute("UPDATE varbinary_pk_table SET quantity=50 WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000101';");
            statement.execute("DELETE FROM varbinary_pk_table WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000110';");
        }

        String[] expected = {
            "+I[[4, 4, 4, 4, 4, 4, 4, 0], 2021-03-08, 0, 0, flink]",
            "+I[[4, 4, 4, 4, 4, 4, 4, 1], 2021-03-08, 10, 100, flink]",
            "+I[[4, 4, 4, 4, 4, 4, 4, 2], 2021-03-08, 20, 200, flink]",
            "+I[[4, 4, 4, 4, 4, 4, 4, 3], 2021-03-08, 30, 300, flink]",
            "+I[[4, 4, 4, 4, 4, 4, 4, 4], 2021-03-08, 40, 400, flink]",
            "+I[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 30, 500, flink]",
            "+I[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]",
            "-U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 30, 500, flink]",
            "+U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 50, 500, flink]",
            "-D[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(rows, expected.length));

        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads the {@code address} table, whose {@code id} values are large 18-digit numbers mapped
     * to {@code DECIMAL(20, 0)}, through a {@code mysql-cdc} source; then updates one row and
     * inserts another over JDBC and checks the collected rows against the expected changelog
     * (compared via {@code assertEqualsInAnyOrder}).
     *
     * @throws Exception if the database setup, the JDBC statements or the Flink job fail
     */
    @Test
    public void testPrimaryKeyWithSnowflakeAlgorithm() throws Exception {
        customerDatabase.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE address ( `id` DECIMAL(20, 0) NOT NULL, country STRING, city STRING, detail_address STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        customerDatabase.getUsername(),
                        customerDatabase.getPassword(),
                        customerDatabase.getDatabaseName(),
                        "address",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize()));
        TableResult result = tEnv.executeSql("SELECT id,\ncountry,\ncity,\ndetail_address FROM address");
        CloseableIterator<Row> rows = result.collect();

        // Do not touch the table until the snapshot phase has started.
        waitForSnapshotStarted(rows);

        // try-with-resources closes the statement and the connection on success and on failure.
        try (Connection connection = customerDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE address SET city = 'Hangzhou' WHERE id=416927583791428523;");
            statement.execute("INSERT INTO address VALUES(418257940021724075, 'Germany', 'Berlin', 'West Town address 3')");
        }

        String[] expected = {
            "+I[417271541558096811, America, New York, East Town address 2]",
            "+I[417272886855938987, America, New York, East Town address 3]",
            "+I[417111867899200427, America, New York, East Town address 1]",
            "+I[417420106184475563, Germany, Berlin, West Town address 1]",
            "+I[418161258277847979, Germany, Berlin, West Town address 2]",
            "+I[416874195632735147, China, Beijing, West Town address 1]",
            "+I[416927583791428523, China, Beijing, West Town address 2]",
            "+I[417022095255614379, China, Beijing, West Town address 3]",
            "-U[416927583791428523, China, Beijing, West Town address 2]",
            "+U[416927583791428523, China, Hangzhou, West Town address 2]",
            "+I[418257940021724075, Germany, Berlin, West Town address 3]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(rows, expected.length));

        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads the table named {@code customers3.0} (a table name containing a dot) through a
     * {@code mysql-cdc} source, updates one row and inserts another over JDBC, and checks the
     * collected rows against the expected changelog (compared via
     * {@code assertEqualsInAnyOrder}). On success the database is dropped at the end.
     *
     * <p>The test body only runs when {@code incrementalSnapshot} is {@code true}.
     *
     * @throws Exception if the database setup, the JDBC statements or the Flink job fail
     */
    @Test
    public void testReadingWithDotTableName() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        customer3_0Database.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE customers ( `id` INTEGER NOT NULL, name STRING, address STRING, phone_number STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        customer3_0Database.getUsername(),
                        customer3_0Database.getPassword(),
                        customer3_0Database.getDatabaseName(),
                        "customers3.0",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize()));
        TableResult result = tEnv.executeSql("SELECT id,\nname,\naddress,\nphone_number FROM customers");
        CloseableIterator<Row> rows = result.collect();

        // Do not touch the table until the snapshot phase has started.
        waitForSnapshotStarted(rows);

        // try-with-resources closes the statement and the connection on success and on failure.
        try (Connection connection = customer3_0Database.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            // The dotted table name has to be back-quoted in MySQL statements.
            statement.execute("UPDATE `customers3.0` SET address = 'Hangzhou' WHERE id=103;");
            statement.execute("INSERT INTO `customers3.0` VALUES(110, 'newCustomer', 'Berlin', '12345678')");
        }

        String[] expected = {
            "+I[101, user_1, Shanghai, 123567891234]",
            "+I[102, user_2, Shanghai, 123567891234]",
            "+I[103, user_3, Shanghai, 123567891234]",
            "+I[104, user_4, Shanghai, 123567891234]",
            "-U[103, user_3, Shanghai, 123567891234]",
            "+U[103, user_3, Hangzhou, 123567891234]",
            "+I[110, newCustomer, Berlin, 12345678]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(rows, expected.length));

        result.getJobClient().get().cancel().get();
        customer3_0Database.dropDatabase();
    }

    /**
     * Declares a {@code mysql-cdc} source whose {@code database-name} option is the regular
     * expression {@code <customerDatabase name>.*} and checks that reading the {@code customers}
     * table yields the 21 expected insert rows (compared via {@code assertEqualsInAnyOrder}).
     *
     * @throws Exception if the database setup or the Flink job fail
     */
    @Test
    public void testReadingWithRegexPattern() throws Exception {
        env.setRestartStrategy(RestartStrategies.noRestart());
        customerDatabase.createAndInitialize();

        // The database option is a pattern: the database name followed by ".*".
        String databasePattern = String.format("%s.*", customerDatabase.getDatabaseName());
        String sourceDdl =
                String.format(
                        "CREATE TABLE customers ( `id` INTEGER NOT NULL, name STRING, address STRING, phone_number STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        customerDatabase.getUsername(),
                        customerDatabase.getPassword(),
                        databasePattern,
                        "customers",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize());
        tEnv.executeSql(sourceDdl);

        TableResult result = tEnv.executeSql("SELECT * FROM customers");
        CloseableIterator<Row> rows = result.collect();
        waitForSnapshotStarted(rows);

        List<String> expected =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[1019, user_20, Shanghai, 123567891234]",
                        "+I[2000, user_21, Shanghai, 123567891234]");
        assertEqualsInAnyOrder(expected, fetchRows(rows, expected.size()));

        result.getJobClient().get().cancel().get();
    }

    /**
     * Runs a {@code mysql-cdc} source over tables matching {@code default_value_test.*}, deletes
     * one row, then issues CREATE TABLE / ALTER TABLE statements whose numeric columns carry
     * string default values padded with spaces (for example {@code DEFAULT ' 0 '}), and checks
     * that the collected rows still match the expected changelog (compared via
     * {@code assertEqualsInAnyOrder}).
     *
     * <p>The test body only runs when {@code incrementalSnapshot} is {@code true}.
     *
     * @throws Exception if the database setup, the JDBC statements or the Flink job fail
     */
    @Test
    public void testDdlWithDefaultStringValue() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        env.setRestartStrategy(RestartStrategies.noRestart());
        customerDatabase.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE default_value_test ( id BIGINT NOT NULL, name STRING, address STRING, phone_number BIGINT, primary key (id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        customerDatabase.getUsername(),
                        customerDatabase.getPassword(),
                        customerDatabase.getDatabaseName(),
                        "default_value_test.*",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize()));
        TableResult result = tEnv.executeSql("SELECT * FROM default_value_test");
        JobClient jobClient = result.getJobClient().get();
        MySqlTestUtils.waitForJobStatus(
                jobClient,
                Collections.singletonList(JobStatus.RUNNING),
                Deadline.fromNow(Duration.ofSeconds(10L)));
        CloseableIterator<Row> rows = result.collect();
        waitForSnapshotStarted(rows);

        // First session: a plain data change. try-with-resources closes the statement and the
        // connection on success and on failure.
        try (Connection connection = customerDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("DELETE FROM default_value_test WHERE id=1;");
        }

        String[] expected = {
            "+I[1, user1, Shanghai, 123567]",
            "+I[2, user2, Shanghai, 123567]",
            "-D[1, user1, Shanghai, 123567]"
        };

        // Second session, on a fresh connection: the DDL statements with string default values.
        try (Connection connection = customerDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(" CREATE TABLE temp_default_value_test (\n     id INTEGER NOT NULL PRIMARY KEY, \n     tiny_c TINYINT DEFAULT ' 0 ', \n     boolean_c BOOLEAN DEFAULT ' 1 ', \n     tiny_un_z_c TINYINT UNSIGNED ZEROFILL DEFAULT ' 2 ', \n     small_c SMALLINT DEFAULT ' 3 ', \n     small_un_c SMALLINT UNSIGNED DEFAULT ' 4 ',\n     small_un_z_c SMALLINT UNSIGNED ZEROFILL DEFAULT ' 5 ', \n     medium_c MEDIUMINT DEFAULT ' 6 ', \n     medium_un_c MEDIUMINT UNSIGNED DEFAULT ' 7 ', \n     medium_un_z_c MEDIUMINT UNSIGNED ZEROFILL DEFAULT ' 8 ', \n     int_c INTEGER DEFAULT ' 9 ', \n     int_un_c INTEGER UNSIGNED DEFAULT ' 10 ', \n     int_un_z_c INTEGER UNSIGNED ZEROFILL DEFAULT ' 11 ',\n     int11_c INT(11) DEFAULT ' 12 ', \n     big_c BIGINT DEFAULT ' 13 ', \n     big_un_c BIGINT UNSIGNED DEFAULT ' 14 ', \n     big_un_z_c BIGINT UNSIGNED ZEROFILL DEFAULT ' 15 ', \n     decimal_c DECIMAL(8, 4) DEFAULT ' 16  ', \n     decimal_un_c DECIMAL(8, 4) UNSIGNED DEFAULT ' 17 ', \n     decimal_un_z_c DECIMAL(8, 4) UNSIGNED ZEROFILL DEFAULT ' 18 ', \n     numeric_c NUMERIC(6, 0) DEFAULT ' 19 ', \n     big_decimal_c DECIMAL(65, 1) DEFAULT ' 20 ',\n     real_c REAL DEFAULT ' 21.0',\n     float_c FLOAT DEFAULT ' 22.0',\n     float_un_c FLOAT UNSIGNED DEFAULT ' 23',\n     float_un_z_c FLOAT UNSIGNED ZEROFILL DEFAULT ' 24',\n     double_c DOUBLE DEFAULT ' 25',\n     double_un_c DOUBLE UNSIGNED DEFAULT ' 26',\n     double_un_z_c DOUBLE UNSIGNED ZEROFILL DEFAULT ' 27',\n     tiny_un_c TINYINT UNSIGNED DEFAULT ' 28 ' );");
            statement.execute("alter table temp_default_value_test alter column `small_c` SET DEFAULT ' 29 ';");
            statement.execute("alter table temp_default_value_test add column\n    `new_col` smallint(1) unsigned DEFAULT ' 30 ';");
            statement.execute("alter table default_value_test add column\n    `new_col` smallint(1) unsigned DEFAULT ' 31 ';");
            statement.execute(" CREATE TABLE default_value_test_ignore (\n     id INTEGER NOT NULL PRIMARY KEY, \n     tiny_c TINYINT DEFAULT ' 0 ', \n     boolean_c BOOLEAN DEFAULT ' 1 ', \n     tiny_un_z_c TINYINT UNSIGNED ZEROFILL DEFAULT ' 2 ', \n     small_c SMALLINT DEFAULT ' 3 ', \n     small_un_c SMALLINT UNSIGNED DEFAULT ' 4 ',\n     small_un_z_c SMALLINT UNSIGNED ZEROFILL DEFAULT ' 5 ', \n     medium_c MEDIUMINT DEFAULT ' 6 ', \n     medium_un_c MEDIUMINT UNSIGNED DEFAULT ' 7 ', \n     medium_un_z_c MEDIUMINT UNSIGNED ZEROFILL DEFAULT ' 8 ', \n     int_c INTEGER DEFAULT ' 9 ', \n     int_un_c INTEGER UNSIGNED DEFAULT ' 10 ', \n     int_un_z_c INTEGER UNSIGNED ZEROFILL DEFAULT ' 11 ',\n     int11_c INT(11) DEFAULT ' 12 ', \n     big_c BIGINT DEFAULT ' 13 ', \n     big_un_c BIGINT UNSIGNED DEFAULT ' 14 ', \n     big_un_z_c BIGINT UNSIGNED ZEROFILL DEFAULT ' 15 ', \n     decimal_c DECIMAL(8, 4) DEFAULT ' 16  ', \n     decimal_un_c DECIMAL(8, 4) UNSIGNED DEFAULT ' 17 ', \n     decimal_un_z_c DECIMAL(8, 4) UNSIGNED ZEROFILL DEFAULT ' 18 ', \n     numeric_c NUMERIC(6, 0) DEFAULT ' 19 ', \n     big_decimal_c DECIMAL(65, 1) DEFAULT ' 20 ',\n     real_c REAL DEFAULT ' 21.0',\n     float_c FLOAT DEFAULT ' 22.0',\n     float_un_c FLOAT UNSIGNED DEFAULT ' 23',\n     float_un_z_c FLOAT UNSIGNED ZEROFILL DEFAULT ' 24',\n     double_c DOUBLE DEFAULT ' 25',\n     double_un_c DOUBLE UNSIGNED DEFAULT ' 26',\n     double_un_z_c DOUBLE UNSIGNED ZEROFILL DEFAULT ' 27',\n     tiny_un_c TINYINT UNSIGNED DEFAULT ' 28 ' );");
        }

        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(rows, expected.length));
        jobClient.cancel().get();
    }

    /**
     * Runs a {@code mysql-cdc} source over {@code default_value_test}, deletes one row, then adds
     * two {@code INT} columns whose default values are strings padded with spaces (one of them
     * with a {@code COLLATE} clause), and checks that the collected rows still match the expected
     * changelog (compared via {@code assertEqualsInAnyOrder}).
     *
     * <p>The test body only runs when {@code incrementalSnapshot} is {@code true}.
     *
     * @throws Exception if the database setup, the JDBC statements or the Flink job fail
     */
    @Test
    public void testAlterWithDefaultStringValue() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        env.setRestartStrategy(RestartStrategies.noRestart());
        customerDatabase.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE default_value_test ( id BIGINT NOT NULL, name STRING, address STRING, phone_number BIGINT, primary key (id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        customerDatabase.getUsername(),
                        customerDatabase.getPassword(),
                        customerDatabase.getDatabaseName(),
                        "default_value_test",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize()));
        TableResult result = tEnv.executeSql("SELECT * FROM default_value_test");
        JobClient jobClient = result.getJobClient().get();
        MySqlTestUtils.waitForJobStatus(
                jobClient,
                Collections.singletonList(JobStatus.RUNNING),
                Deadline.fromNow(Duration.ofSeconds(10L)));
        CloseableIterator<Row> rows = result.collect();
        waitForSnapshotStarted(rows);

        // First session: a plain data change. try-with-resources closes the statement and the
        // connection on success and on failure.
        try (Connection connection = customerDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("DELETE FROM default_value_test WHERE id=1;");
        }

        String[] expected = {
            "+I[1, user1, Shanghai, 123567]",
            "+I[2, user2, Shanghai, 123567]",
            "-D[1, user1, Shanghai, 123567]"
        };

        // Second session, on a fresh connection: ALTER TABLE with string default values.
        try (Connection connection = customerDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("alter table default_value_test add column `collate_test` INT DEFAULT ' 29 ' COLLATE 'utf8_general_ci';");
            statement.execute("alter table default_value_test add column `int_test` INT DEFAULT ' 30 ';");
        }

        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(rows, expected.length));
        jobClient.cancel().get();
    }

    /**
     * Reads every table matching {@code user_table_.*} from both user databases through one
     * {@code mysql-cdc} table and checks the snapshot rows plus the changelog produced by one
     * update in each database.
     *
     * <p>The expected rows carry {@code null} in the {@code email} and/or {@code age} columns for
     * several shards, while the declared Flink schema contains both columns.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testShardingTablesWithInconsistentSchema() throws Exception {
        userDatabase1.createAndInitialize();
        userDatabase2.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE `user` ( `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, age INT, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        userDatabase1.getUsername(),
                        userDatabase1.getPassword(),
                        String.format(
                                "(%s|%s)",
                                userDatabase1.getDatabaseName(),
                                userDatabase2.getDatabaseName()),
                        "user_table_.*",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize()));

        TableResult result = tEnv.executeSql("SELECT * FROM `user`");
        CloseableIterator<Row> iterator = result.collect();
        waitForSnapshotStarted(iterator);

        // try-with-resources closes statement and connection exactly once, also on failure.
        try (Connection connection = userDatabase1.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE user_table_1_1 SET email = 'user_111@bar.org' WHERE id=111;");
        }

        try (Connection connection = userDatabase2.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE user_table_2_2 SET age = 20 WHERE id=221;");
        }

        String[] expected = {
            "+I[111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
            "-U[111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
            "+U[111, user_111, Shanghai, 123567891234, user_111@bar.org, null]",
            "+I[121, user_121, Shanghai, 123567891234, null, null]",
            "+I[211, user_211, Shanghai, 123567891234, null, null]",
            "+I[221, user_221, Shanghai, 123567891234, null, 18]",
            "-U[221, user_221, Shanghai, 123567891234, null, 18]",
            "+U[221, user_221, Shanghai, 123567891234, null, 20]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
        result.getJobClient().get().cancel().get();
    }

    /**
     * Starts the source with {@code scan.startup.mode = specific-offset} using a binlog file name
     * and position, and checks the content of the {@code values} sink.
     *
     * <p>Two updates are executed before the start offset is captured; the remaining changes are
     * executed afterwards, partly before and partly after the insert job is submitted.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testStartupFromSpecificBinlogFilePos() throws Exception {
        inventoryDatabase.createAndInitialize();

        // Changes made before the start offset is captured.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
            statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
        }

        Tuple2<String, Integer> offset =
                LegacyMySqlSourceTest.currentMySqlLatestOffset(
                        MYSQL_CONTAINER, inventoryDatabase, "products", 9, false);

        tEnv.executeSql(
                String.format(
                        "CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'scan.startup.mode' = 'specific-offset', 'scan.startup.specific-offset.file' = '%s', 'scan.startup.specific-offset.pos' = '%s', 'scan.incremental.snapshot.enabled' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "products",
                        offset.f0,
                        offset.f1,
                        incrementalSnapshot));
        tEnv.executeSql("CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)");

        // Change made after the offset was captured but before the job is submitted.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
        }

        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        // Changes made after the job was submitted.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
        }

        waitForSinkSize("sink", 5);
        assertEqualsInAnyOrder(
                Arrays.asList("+I[110, jacket, new water resistent white wind breaker, 0.500]"),
                TestValuesTableFactory.getResults("sink"));
        result.getJobClient().get().cancel().get();
    }

    /**
     * Starts the source with {@code scan.startup.mode = specific-offset} using a GTID set, and
     * checks the content of the {@code values} sink.
     *
     * <p>The test body only runs when {@code incrementalSnapshot} is enabled; otherwise the method
     * returns immediately.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testStartupFromSpecificGtidSet() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        inventoryDatabase.createAndInitialize();

        final BinlogOffset offset;
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            // Changes made before the start offset is captured.
            statement.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
            statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");

            // NOTE(review): the connection returned by createMySqlConnection is never closed
            // here (same as before this rewrite) - confirm whether it should be.
            offset =
                    DebeziumUtils.currentBinlogOffset(
                            DebeziumUtils.createMySqlConnection(
                                    new MySqlSourceConfigFactory()
                                            .hostname(MYSQL_CONTAINER.getHost())
                                            .port(MYSQL_CONTAINER.getDatabasePort())
                                            .username(TEST_USER)
                                            .password(TEST_PASSWORD)
                                            .databaseList(new String[] {inventoryDatabase.getDatabaseName()})
                                            .tableList(new String[] {"products"})
                                            .createConfig(0)));
        }

        tEnv.executeSql(
                String.format(
                        "CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'scan.startup.mode' = 'specific-offset', 'scan.startup.specific-offset.gtid-set' = '%s', 'scan.incremental.snapshot.enabled' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "products",
                        offset.getGtidSet(),
                        incrementalSnapshot));
        tEnv.executeSql("CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)");

        // Change made after the offset was captured but before the job is submitted.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
        }

        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        // Changes made after the job was submitted.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
        }

        waitForSinkSize("sink", 5);
        assertEqualsInAnyOrder(
                Arrays.asList("+I[110, jacket, new water resistent white wind breaker, 0.500]"),
                TestValuesTableFactory.getResults("sink"));
        result.getJobClient().get().cancel().get();
    }

    /**
     * Starts the source with {@code scan.startup.mode = earliest-offset} and checks the content of
     * the {@code values} sink after a series of changes to {@code products}.
     *
     * <p>All changes are executed before the insert job is submitted; the test then waits until
     * the sink holds at least 16 raw results.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testStartupFromEarliestOffset() throws Exception {
        inventoryDatabase.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'scan.startup.mode' = 'earliest-offset', 'scan.incremental.snapshot.enabled' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "products",
                        incrementalSnapshot));
        tEnv.executeSql("CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)");

        // try-with-resources closes statement and connection exactly once, also on failure.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
            statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
            statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
            statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
        }

        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
        waitForSinkSize("sink", 16);
        assertEqualsInAnyOrder(
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.140]",
                        "+I[102, car battery, 12V car battery, 8.100]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
                        "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
                        "+I[108, jacket, water resistent black wind breaker, 0.100]",
                        "+I[109, spare tire, 24 inch spare tire, 22.200]",
                        "+I[106, hammer, 18oz carpenter hammer, 1.000]",
                        "+I[107, rocks, box of assorted rocks, 5.100]",
                        "+I[110, jacket, new water resistent white wind breaker, 0.500]"),
                TestValuesTableFactory.getResults("sink"));
        result.getJobClient().get().cancel().get();
    }

    /**
     * Starts the source with {@code scan.startup.mode = timestamp}, using the wall-clock time
     * taken when the source DDL is formatted, and checks the content of the {@code values} sink.
     *
     * <p>The test sleeps five seconds after initializing the database and again after submitting
     * the insert job, before it applies the changes to {@code products}.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testStartupFromTimestamp() throws Exception {
        inventoryDatabase.createAndInitialize();
        // Puts a gap between the initialization statements and the start timestamp below.
        Thread.sleep(5000L);

        tEnv.executeSql(
                String.format(
                        "CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'scan.startup.mode' = 'timestamp', 'scan.startup.timestamp-millis' = '%s', 'scan.incremental.snapshot.enabled' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "products",
                        System.currentTimeMillis(),
                        incrementalSnapshot));
        tEnv.executeSql("CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)");

        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
        Thread.sleep(5000L);

        // try-with-resources closes statement and connection exactly once, also on failure.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
            statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
        }

        waitForSinkSize("sink", 5);
        assertEqualsInAnyOrder(
                Arrays.asList("+I[110, jacket, new water resistent white wind breaker, 0.500]"),
                TestValuesTableFactory.getResults("sink"));
        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads {@code shopping_cart_dec} from the customer database and compares the first four
     * collected rows with the expected snapshot rows, one of which has a {@code null} description.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testColumnOptionalWithDefaultValue() throws Exception {
        customerDatabase.createAndInitialize();

        final String sourceDdl =
                String.format(
                        "CREATE TABLE debezium_source ( `product_no` DECIMAL(20, 4) NOT NULL, product_kind STRING, user_id STRING, description STRING, primary key (`product_no`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        customerDatabase.getUsername(),
                        customerDatabase.getPassword(),
                        customerDatabase.getDatabaseName(),
                        "shopping_cart_dec",
                        incrementalSnapshot,
                        getServerId(),
                        getSplitSize());
        tEnv.executeSql(sourceDdl);

        TableResult result = tEnv.executeSql("SELECT product_no,\nproduct_kind,\nuser_id,\ndescription FROM debezium_source");
        CloseableIterator<Row> iterator = result.collect();
        waitForSnapshotStarted(iterator);

        List<String> expected =
                Arrays.asList(
                        "+I[123456.1230, KIND_001, user_1, my shopping cart]",
                        "+I[123457.4560, KIND_002, user_2, my shopping cart]",
                        "+I[123458.6789, KIND_003, user_3, my shopping cart]",
                        "+I[123459.1234, KIND_004, user_4, null]");
        assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size()));
        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads {@code multi_max_table}, whose primary key is {@code (order_id, index)}, and compares
     * the first twelve collected rows with the expected ones.
     *
     * <p>The test body only runs when {@code incrementalSnapshot} is enabled.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testReadingWithMultiMaxValue() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        inventoryDatabase.createAndInitialize();

        final String sourceDdl =
                String.format(
                        "CREATE TABLE multi_max_table ( order_id STRING, index INTEGER, desc STRING, PRIMARY KEY(order_id, index) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "multi_max_table",
                        getServerId(),
                        getSplitSize());
        tEnv.executeSql(sourceDdl);

        TableResult result = tEnv.executeSql("SELECT * FROM multi_max_table");

        // Poll every five seconds (sleeping first) until the job reports RUNNING.
        while (true) {
            Thread.sleep(5000L);
            if (result.getJobClient().get().getJobStatus().get() == JobStatus.RUNNING) {
                break;
            }
        }

        CloseableIterator<Row> iterator = result.collect();
        List<String> expected =
                Arrays.asList(
                        "+I[, 0, flink]",
                        "+I[, 1, flink]",
                        "+I[, 2, flink]",
                        "+I[a, 0, flink]",
                        "+I[b, 0, flink]",
                        "+I[c, 0, flink]",
                        "+I[d, 0, flink]",
                        "+I[E, 0, flink]",
                        "+I[E, 1, flink]",
                        "+I[E, 2, flink]",
                        "+I[E, 3, flink]",
                        "+I[e, 4, flink]");
        assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size()));
        result.getJobClient().get().cancel().get();
    }

    /**
     * Runs two {@code mysql-cdc} sources that share the same server id ({@code getServerId(5400)})
     * in one statement set, with restarts disabled, and expects a failure whose message contains
     * the server-id conflict text.
     *
     * <p>Any {@link Throwable} raised inside the {@code try} block, including the one thrown by
     * {@code Assert.fail()}, is passed to the message check.
     */
    @Test
    public void testServerIdConflict() {
        try {
            env.setRestartStrategy(RestartStrategies.noRestart());
            customerDatabase.createAndInitialize();

            int tableIndex = 0;
            while (tableIndex < 2) {
                final String sourceDdl =
                        String.format(
                                "CREATE TABLE debezium_source%d ( `id` INTEGER NOT NULL, `name` STRING, `address` STRING, `phone_name` STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')",
                                tableIndex,
                                MYSQL_CONTAINER.getHost(),
                                MYSQL_CONTAINER.getDatabasePort(),
                                customerDatabase.getUsername(),
                                customerDatabase.getPassword(),
                                customerDatabase.getDatabaseName(),
                                "customers",
                                incrementalSnapshot,
                                getServerId(5400),
                                getSplitSize());
                final String sinkDdl =
                        String.format(
                                "CREATE TABLE blackhole_table%d WITH ('connector' = 'blackhole')\n LIKE debezium_source%d (EXCLUDING ALL)",
                                tableIndex,
                                tableIndex);
                tEnv.executeSql(sourceDdl);
                tEnv.executeSql(sinkDdl);
                tableIndex++;
            }

            StreamStatementSet statements = tEnv.createStatementSet();
            statements.addInsertSql("Insert into blackhole_table0 select * from debezium_source0");
            statements.addInsertSql("Insert into blackhole_table1 select * from debezium_source1");
            statements.execute().await();
            Assert.fail();
        } catch (Throwable failure) {
            MySqlTestUtils.assertContainsErrorMsg(
                    failure,
                    "The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\nThe server id conflict may happen in the following situations: \n1. The server id has been used by other mysql cdc table in the current job.\n2. The server id has been used by the mysql cdc table in other jobs.\n3. The server id has been used by other sync tools like canal, debezium and so on.\n");
        }
    }

    /**
     * Reads {@code binlog_metadata} from the binlog database on {@code MYSQL8_CONTAINER}, inserts
     * two rows and deletes one of them once the job is running, and compares the first two
     * collected rows with the expected ones.
     *
     * <p>The test body only runs when {@code incrementalSnapshot} is enabled.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testBinlogTableMetadataDeserialization() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        binlogDatabase.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE binlog_metadata (\n    id BIGINT NOT NULL,\n    tiny_c TINYINT,\n    tiny_un_c SMALLINT ,\n    tiny_un_z_c SMALLINT ,\n    small_c SMALLINT,\n    small_un_c INT,\n    small_un_z_c INT,\n    year_c INT,\n PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        MYSQL8_CONTAINER.getHost(),
                        MYSQL8_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        binlogDatabase.getDatabaseName(),
                        "binlog_metadata",
                        getServerId(),
                        getSplitSize()));

        TableResult result = tEnv.executeSql("SELECT * FROM binlog_metadata");

        // Poll every five seconds (sleeping first) until the job reports RUNNING.
        do {
            Thread.sleep(5000L);
        } while (result.getJobClient().get().getJobStatus().get() != JobStatus.RUNNING);

        CloseableIterator<Row> iterator = result.collect();

        // try-with-resources closes statement and connection exactly once, also on failure.
        try (Connection connection = binlogDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO binlog_metadata VALUES (2, 127, 255, 255, 32767, 65535, 65535, 2024),(3, 127, 255, 255, 32767, 65535, 65535, 2024);");
            statement.execute("DELETE FROM binlog_metadata WHERE id=3;");
        }

        String[] expected = {
            "+I[1, 127, 255, 255, 32767, 65535, 65535, 2023]",
            "+I[2, 127, 255, 255, 32767, 65535, 65535, 2024]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
        result.getJobClient().get().cancel().get();
    }

    /**
     * Builds a server id starting at a random value in {@code [5400, 5500)}.
     *
     * @return a range {@code "<base>-<base + parallelism>"} when {@code incrementalSnapshot} is
     *     enabled, otherwise the single id {@code "<base>"}
     */
    private String getServerId() {
        final int base = new Random().nextInt(100) + 5400;
        if (!incrementalSnapshot) {
            return String.valueOf(base);
        }
        final int upper = base + env.getParallelism();
        return base + "-" + upper;
    }

    /**
     * Builds a server id from a fixed base value.
     *
     * @param i the base server id
     * @return the range {@code "<i>-<i + 4>"} when {@code incrementalSnapshot} is enabled,
     *     otherwise {@code i} as a string
     */
    protected String getServerId(int i) {
        if (incrementalSnapshot) {
            final int upper = i + 4;
            return i + "-" + upper;
        }
        return String.valueOf(i);
    }

    /**
     * Returns the value the tests pass as {@code scan.incremental.snapshot.chunk.size}.
     *
     * @return {@code 4} when {@code incrementalSnapshot} is enabled, otherwise {@code 0}
     */
    private int getSplitSize() {
        if (incrementalSnapshot) {
            return 4;
        }
        return 0;
    }

    /**
     * Builds a column list of the form {@code "<str><n> <str2>,"} for every {@code n} in
     * {@code [i, i2)}, concatenated without separators other than the trailing commas.
     *
     * @param str column name prefix
     * @param i first column index (inclusive)
     * @param i2 last column index (exclusive)
     * @param str2 column type appended after each name
     * @return the concatenated column definitions, or an empty string when {@code i >= i2}
     */
    private static String buildColumnsDDL(String str, int i, int i2, String str2) {
        StringBuilder ddl = new StringBuilder();
        int index = i;
        while (index < i2) {
            ddl.append(str);
            ddl.append(index);
            ddl.append(" ");
            ddl.append(str2);
            ddl.append(",");
            index++;
        }
        return ddl.toString();
    }

    /**
     * Joins the integers from {@code i} up to {@code i2 - 1} with {@code ", "}.
     *
     * <p>The final value {@code i2 - 1} is always appended, so an empty range ({@code i >= i2})
     * still yields the single value {@code i2 - 1}.
     *
     * @param i first value (inclusive)
     * @param i2 end of the range (exclusive)
     * @return the comma-separated sequence
     */
    private static String getIntegerSeqString(int i, int i2) {
        final int last = i2 - 1;
        StringBuilder sequence = new StringBuilder();
        int current = i;
        while (current < last) {
            sequence.append(current).append(", ");
            current++;
        }
        return sequence.append(last).toString();
    }

    /**
     * Blocks until the named sink reports at least one raw result, polling every 100 ms.
     *
     * @param str name of the sink table
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitForSnapshotStarted(String str) throws InterruptedException {
        for (;;) {
            if (sinkSize(str) != 0) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Blocks until the named sink reports at least {@code i} raw results, polling every 100 ms.
     *
     * @param str name of the sink table
     * @param i minimum number of raw results to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitForSinkSize(String str, int i) throws InterruptedException {
        for (;;) {
            if (sinkSize(str) >= i) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the number of raw results currently held by the named sink.
     *
     * <p>The lookup runs while holding the monitor of {@code TestValuesTableFactory.class}.
     *
     * @param str name of the sink table
     * @return the raw result count, or {@code 0} when the lookup throws
     *     {@link IllegalArgumentException}
     */
    private static int sinkSize(String str) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(str).size();
            } catch (IllegalArgumentException ignored) {
                // Presumably thrown while the sink has not been registered yet (TODO confirm);
                // treated as an empty sink.
                return 0;
            }
        }
    }

    /**
     * Pulls up to {@code i} rows from the iterator and returns their string forms.
     *
     * @param it source of rows
     * @param i maximum number of rows to fetch
     * @return the {@code toString()} of each fetched row, in iteration order; shorter than
     *     {@code i} when the iterator is exhausted first
     */
    private static List<String> fetchRows(Iterator<Row> it, int i) {
        List<String> rows = new ArrayList<>(i);
        for (int remaining = i; remaining > 0 && it.hasNext(); remaining--) {
            Row row = it.next();
            rows.add(row.toString());
        }
        return rows;
    }

    /**
     * Blocks until the iterator reports that a row is available, polling every 100 ms.
     *
     * @param closeableIterator iterator over the collected rows
     * @throws Exception if the thread is interrupted while sleeping or the iterator fails
     */
    private static void waitForSnapshotStarted(CloseableIterator<Row> closeableIterator) throws Exception {
        for (;;) {
            if (closeableIterator.hasNext()) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Reads {@code varbinary_base64_table} with {@code debezium.binary.handling.mode = base64},
     * applies inserts, an update and a delete once the job is running, and compares the first ten
     * collected rows with the expected changelog, where {@code order_id} appears as Base64 text.
     *
     * <p>The test body only runs when {@code incrementalSnapshot} is enabled.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testBinaryHandlingModeWithBase64() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        inventoryDatabase.createAndInitialize();
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE varbinary_base64_table ( id INT, order_id STRING, order_date DATE, quantity INT, product_id INT, purchaser STRING, PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s', 'debezium.binary.handling.mode' = 'base64')",
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        TEST_USER,
                        TEST_PASSWORD,
                        inventoryDatabase.getDatabaseName(),
                        "varbinary_base64_table",
                        getServerId(),
                        getSplitSize()));

        TableResult result = tEnv.executeSql("SELECT * FROM varbinary_base64_table");

        // Poll every five seconds (sleeping first) until the job reports RUNNING.
        do {
            Thread.sleep(5000L);
        } while (result.getJobClient().get().getJobStatus().get() != JobStatus.RUNNING);

        CloseableIterator<Row> iterator = result.collect();

        // try-with-resources closes statement and connection exactly once, also on failure.
        try (Connection connection = inventoryDatabase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO varbinary_base64_table VALUES (6, b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', 30, 500, 'flink');");
            statement.execute("INSERT INTO varbinary_base64_table VALUES (7, b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', 30, 500, 'flink-sql');");
            statement.execute("UPDATE varbinary_base64_table SET quantity=50 WHERE id=6;");
            statement.execute("DELETE FROM varbinary_base64_table WHERE id= 7;");
        }

        String[] expected = {
            "+I[1, BAQEBAQEBAA=, 2021-03-08, 0, 0, flink]",
            "+I[2, BAQEBAQEBAE=, 2021-03-08, 10, 100, flink]",
            "+I[3, BAQEBAQEBAI=, 2021-03-08, 20, 200, flink]",
            "+I[4, BAQEBAQEBAM=, 2021-03-08, 30, 300, flink]",
            "+I[5, BAQEBAQEBAQ=, 2021-03-08, 40, 400, flink]",
            "+I[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]",
            "+I[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]",
            "-U[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]",
            "+U[6, BAQEBAQEBAU=, 2021-03-08, 50, 500, flink]",
            "-D[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
        result.getJobClient().get().cancel().get();
    }
}
