package com.ververica.cdc.connectors.mysql.source;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
import com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import java.lang.reflect.Field;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/NewlyAddedTableITCase.class */
public class NewlyAddedTableITCase extends MySqlSourceTestBase {

    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);
    private final UniqueDatabase customDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
    private final ScheduledExecutorService mockBinlogExecutor = Executors.newScheduledThreadPool(1);

    @Before
    public void before() throws SQLException {
        TestValuesTableFactory.clearAllData();
        this.customDatabase.createAndInitialize();
        MySqlConnection connection = getConnection();
        try {
            connection.setAutoCommit(false);
            String str = this.customDatabase.getDatabaseName() + ".produce_binlog_table";
            connection.execute(new String[]{String.format("CREATE TABLE %s ( id BIGINT PRIMARY KEY, cnt BIGINT);", str)});
            connection.execute(new String[]{String.format("INSERT INTO  %s VALUES (0, 100), (1, 101), (2, 102);", str)});
            connection.commit();
            this.mockBinlogExecutor.schedule(() -> {
                try {
                    connection.execute(new String[]{String.format("UPDATE  %s SET  cnt = cnt +1 WHERE id < 2;", str)});
                    connection.commit();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }, 500L, TimeUnit.MICROSECONDS);
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @After
    public void after() {
        this.mockBinlogExecutor.shutdown();
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineOnce() throws Exception {
        testNewlyAddedTableOneByOne(1, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineOnceWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(1, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineTwice() throws Exception {
        testNewlyAddedTableOneByOne(4, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(4, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlogAndAutoCloseReader() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("scan.incremental.close-idle-reader.enabled", "true");
        testNewlyAddedTableOneByOne(4, hashMap, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineThrice() throws Exception {
        testNewlyAddedTableOneByOne(4, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing", "address_shanghai", "address_shenzhen");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineThriceWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(4, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing", "address_shanghai", "address_shenzhen");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws Exception {
        testNewlyAddedTableOneByOne(1, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineSingleParallelismWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(1, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testJobManagerFailoverForNewlyAddedTable() throws Exception {
        testNewlyAddedTableOneByOne(4, MySqlSourceTestBase.FailoverType.JM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testJobManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(4, MySqlSourceTestBase.FailoverType.JM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, true, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testTaskManagerFailoverForNewlyAddedTable() throws Exception {
        testNewlyAddedTableOneByOne(1, MySqlSourceTestBase.FailoverType.TM, MySqlSourceTestBase.FailoverPhase.BINLOG, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testTaskManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(1, MySqlSourceTestBase.FailoverType.TM, MySqlSourceTestBase.FailoverPhase.BINLOG, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testJobManagerFailoverForRemoveTableSingleParallelism() throws Exception {
        testRemoveTablesOneByOne(1, MySqlSourceTestBase.FailoverType.JM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testJobManagerFailoverForRemoveTable() throws Exception {
        testRemoveTablesOneByOne(4, MySqlSourceTestBase.FailoverType.JM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testTaskManagerFailoverForRemoveTableSingleParallelism() throws Exception {
        testRemoveTablesOneByOne(1, MySqlSourceTestBase.FailoverType.TM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testTaskManagerFailoverForRemoveTable() throws Exception {
        testRemoveTablesOneByOne(4, MySqlSourceTestBase.FailoverType.TM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testRemoveTableSingleParallelism() throws Exception {
        testRemoveTablesOneByOne(1, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testRemoveTable() throws Exception {
        testRemoveTablesOneByOne(4, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testRemoveAndAddNewTable() throws Exception {
        String str = this.customDatabase.getDatabaseName() + ".customers_even_dist";
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        String uri = temporaryFolder.newFolder().toURI().toString();
        String str2 = null;
        CollectResultIterator<RowData> collectResultIterator = null;
        int i = 0;
        while (i < 2) {
            String str3 = i == 0 ? "customers" : "customers_1";
            StreamExecutionEnvironment streamExecutionEnvironment = getStreamExecutionEnvironment(str2, 4);
            RowDataDebeziumDeserializeSchema build = RowDataDebeziumDeserializeSchema.newBuilder().setMetadataConverters(new MetadataConverter[]{MySqlReadableMetadata.TABLE_NAME.getConverter()}).setPhysicalRowType(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())}).getLogicalType()).setResultTypeInfo(InternalTypeInfo.of(TypeConversions.fromDataToLogicalType(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING()), DataTypes.FIELD("_table_name", DataTypes.STRING().notNull())})))).build();
            MySqlSourceBuilder serverTimeZone = MySqlSource.builder().hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).databaseList(new String[]{this.customDatabase.getDatabaseName()}).serverTimeZone("UTC");
            String[] strArr = new String[2];
            strArr[0] = str;
            strArr[1] = this.customDatabase.getDatabaseName() + "." + (i == 0 ? "customers" : "customers_\\d+");
            DataStreamSource fromSource = streamExecutionEnvironment.fromSource(serverTimeZone.tableList(strArr).username(this.customDatabase.getUsername()).password(this.customDatabase.getPassword()).serverId("5401-5404").deserializer(build).scanNewlyAddedTableEnabled(true).build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source" + i);
            if (collectResultIterator == null) {
                collectResultIterator = addCollectSink(fromSource);
            } else {
                addCollectSink(fromSource);
            }
            JobClient executeAsync = streamExecutionEnvironment.executeAsync("Collect " + i);
            collectResultIterator.setJobClient(executeAsync);
            List asList = Arrays.asList("+I[103, user_3, Shanghai, 123567891234, customers_even_dist]", "+I[104, user_4, Shanghai, 123567891234, customers_even_dist]", "+I[101, user_1, Shanghai, 123567891234, customers_even_dist]", "+I[102, user_2, Shanghai, 123567891234, customers_even_dist]", "+I[107, user_7, Shanghai, 123567891234, customers_even_dist]", "+I[108, user_8, Shanghai, 123567891234, customers_even_dist]", "+I[105, user_5, Shanghai, 123567891234, customers_even_dist]", "+I[106, user_6, Shanghai, 123567891234, customers_even_dist]", "+I[109, user_9, Shanghai, 123567891234, customers_even_dist]", "+I[110, user_10, Shanghai, 123567891234, customers_even_dist]");
            List asList2 = Arrays.asList(String.format("+I[1011, user_12, Shanghai, 123567891234, %s]", str3), String.format("+I[1012, user_13, Shanghai, 123567891234, %s]", str3), String.format("+I[1009, user_10, Shanghai, 123567891234, %s]", str3), String.format("+I[1010, user_11, Shanghai, 123567891234, %s]", str3), String.format("+I[1015, user_16, Shanghai, 123567891234, %s]", str3), String.format("+I[1016, user_17, Shanghai, 123567891234, %s]", str3), String.format("+I[1013, user_14, Shanghai, 123567891234, %s]", str3), String.format("+I[118, user_7, Shanghai, 123567891234, %s]", str3), String.format("+I[1014, user_15, Shanghai, 123567891234, %s]", str3), String.format("+I[111, user_6, Shanghai, 123567891234, %s]", str3), String.format("+I[2000, user_21, Shanghai, 123567891234, %s]", str3), String.format("+I[109, user_4, Shanghai, 123567891234, %s]", str3), String.format("+I[110, user_5, Shanghai, 123567891234, %s]", str3), String.format("+I[103, user_3, Shanghai, 123567891234, %s]", str3), String.format("+I[101, user_1, Shanghai, 123567891234, %s]", str3), String.format("+I[102, user_2, Shanghai, 123567891234, %s]", str3), String.format("+I[123, user_9, Shanghai, 123567891234, %s]", str3), String.format("+I[1019, user_20, Shanghai, 123567891234, %s]", str3), String.format("+I[121, user_8, Shanghai, 123567891234, %s]", str3), String.format("+I[1017, user_18, Shanghai, 123567891234, %s]", str3), String.format("+I[1018, user_19, Shanghai, 123567891234, %s]", str3));
            List asList3 = Arrays.asList(String.format("-U[103, user_3, Shanghai, 123567891234, %s]", str3), String.format("+U[103, user_3, Update1, 123567891234, %s]", str3), String.format("-D[102, user_2, Shanghai, 123567891234, %s]", str3), String.format("+I[102, user_2, Insert1, 123567891234, %s]", str3), String.format("-U[103, user_3, Update1, 123567891234, %s]", str3), String.format("+U[103, user_3, Update2, 123567891234, %s]", str3));
            List list = i == 0 ? (List) Stream.concat(asList.stream(), asList2.stream()).collect(Collectors.toList()) : asList2;
            assertEqualsInAnyOrder(list, fetchRowData(collectResultIterator, list.size()));
            MySqlConnection connection = getConnection();
            try {
                connection.setAutoCommit(false);
                String str4 = this.customDatabase.getDatabaseName() + "." + str3;
                connection.execute(new String[]{"UPDATE " + str4 + " SET address = 'Update1' where id = 103", "DELETE FROM " + str4 + " where id = 102", "INSERT INTO " + str4 + " VALUES(102, 'user_2','Insert1','123567891234')", "UPDATE " + str4 + " SET address = 'Update2' where id = 103"});
                connection.commit();
                if (connection != null) {
                    connection.close();
                }
                assertEqualsInAnyOrder(asList3, fetchRowData(collectResultIterator, asList3.size()));
                str2 = triggerSavepointWithRetry(executeAsync, uri);
                executeAsync.cancel().get();
                i++;
            } catch (Throwable th) {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        temporaryFolder.delete();
    }

    protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> dataStream) {
        TypeSerializer createSerializer = dataStream.getType().createSerializer(dataStream.getExecutionConfig());
        String str = "dataStreamCollect_" + UUID.randomUUID();
        CollectSinkOperatorFactory collectSinkOperatorFactory = new CollectSinkOperatorFactory(createSerializer, str);
        CollectSinkOperator operator = collectSinkOperatorFactory.getOperator();
        CollectStreamSink collectStreamSink = new CollectStreamSink(dataStream, collectSinkOperatorFactory);
        collectStreamSink.name("Data stream collect sink");
        dataStream.getExecutionEnvironment().addOperator(collectStreamSink.getTransformation());
        return new CollectResultIterator<>(operator.getOperatorIdFuture(), createSerializer, str, dataStream.getExecutionEnvironment().getCheckpointConfig());
    }

    private List<String> fetchRowData(Iterator<RowData> it, int i) {
        ArrayList arrayList = new ArrayList(i);
        while (i > 0 && it.hasNext()) {
            arrayList.add(it.next());
            i--;
        }
        return convertRowDataToRowString(arrayList);
    }

    private static List<String> convertRowDataToRowString(List<RowData> list) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("id", 0);
        linkedHashMap.put("name", 1);
        linkedHashMap.put("address", 2);
        linkedHashMap.put("phone_number", 3);
        linkedHashMap.put("_table_name", 4);
        return (List) list.stream().map(rowData -> {
            return RowUtils.createRowWithNamedPositions(rowData.getRowKind(), new Object[]{Long.valueOf(rowData.getLong(0)), rowData.getString(1), rowData.getString(2), rowData.getString(3), rowData.getString(4)}, linkedHashMap).toString();
        }).collect(Collectors.toList());
    }

    private void testRemoveTablesOneByOne(int i, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String... strArr) throws Exception {
        initialAddressTables(getConnection(), strArr);
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        String uri = temporaryFolder.newFolder().toURI().toString();
        ArrayList arrayList = new ArrayList();
        for (String str : strArr) {
            String str2 = str.split("_")[1];
            arrayList.addAll(Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str, str2, str2), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str, str2, str2), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str, str2, str2)));
        }
        StreamTableEnvironment create = StreamTableEnvironment.create(getStreamExecutionEnvironment(null, i));
        create.executeSql(getCreateTableStatement(new HashMap(), strArr));
        create.executeSql("CREATE TABLE sink ( table_name STRING, id BIGINT, country STRING, city STRING, detail_address STRING, primary key (city, id) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
        JobClient jobClient = (JobClient) create.executeSql("insert into sink select * from address").getJobClient().get();
        if (failoverPhase == MySqlSourceTestBase.FailoverPhase.SNAPSHOT) {
            triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                sleepMs(100L);
            });
        }
        waitForSinkSize("sink", arrayList.size());
        assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
        String triggerSavepointWithRetry = triggerSavepointWithRetry(jobClient, uri);
        jobClient.cancel().get();
        for (int i2 = 0; i2 < strArr.length - 1; i2++) {
            String[] strArr2 = (String[]) Arrays.asList(strArr).subList(i2 + 1, strArr.length).toArray(new String[0]);
            StreamTableEnvironment create2 = StreamTableEnvironment.create(getStreamExecutionEnvironment(triggerSavepointWithRetry, i));
            create2.executeSql(getCreateTableStatement(new HashMap(), strArr2));
            create2.executeSql("CREATE TABLE sink ( table_name STRING, id BIGINT, country STRING, city STRING, detail_address STRING, primary key (city, id) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
            JobClient jobClient2 = (JobClient) create2.executeSql("insert into sink select * from address").getJobClient().get();
            waitForSinkSize("sink", arrayList.size());
            assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
            ArrayList arrayList2 = new ArrayList();
            int length = strArr.length;
            for (int i3 = 0; i3 < length; i3++) {
                String str3 = strArr[i3];
                makeBinlogForAddressTable(getConnection(), str3, i2);
                if (i3 > i2) {
                    String str4 = str3.split("_")[1];
                    arrayList2.addAll(Arrays.asList(String.format("+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]", str3, Integer.valueOf(i2), str4, str4), String.format("+I[%s, %d, China, %s, %s West Town address 4]", str3, Long.valueOf(417022095255614380L + i2), str4, str4)));
                }
            }
            if (failoverPhase == MySqlSourceTestBase.FailoverPhase.BINLOG && TestValuesTableFactory.getRawResults("sink").size() > arrayList.size()) {
                triggerFailover(failoverType, jobClient2.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                    sleepMs(100L);
                });
            }
            arrayList.addAll(arrayList2);
            waitForSinkSize("sink", arrayList.size());
            assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
            triggerSavepointWithRetry = triggerSavepointWithRetry(jobClient2, uri);
            jobClient2.cancel().get();
        }
    }

    private void testNewlyAddedTableOneByOne(int i, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, boolean z, String... strArr) throws Exception {
        testNewlyAddedTableOneByOne(i, new HashMap(), failoverType, failoverPhase, z, strArr);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v54, types: [java.util.List] */
    private void testNewlyAddedTableOneByOne(int i, Map<String, String> map, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, boolean z, String... strArr) throws Exception {
        initialAddressTables(getConnection(), strArr);
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        String uri = temporaryFolder.newFolder().toURI().toString();
        String str = null;
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < strArr.length; i2++) {
            String[] strArr2 = (String[]) Arrays.asList(strArr).subList(0, i2 + 1).toArray(new String[0]);
            String str2 = strArr[i2];
            if (z) {
                makeBinlogBeforeCaptureForAddressTable(getConnection(), str2);
            }
            StreamTableEnvironment create = StreamTableEnvironment.create(getStreamExecutionEnvironment(str, i));
            create.executeSql(getCreateTableStatement(map, strArr2));
            create.executeSql("CREATE TABLE sink ( table_name STRING, id BIGINT, country STRING, city STRING, detail_address STRING, primary key (city, id) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
            JobClient jobClient = (JobClient) create.executeSql("insert into sink select * from address").getJobClient().get();
            String str3 = str2.split("_")[1];
            List asList = Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str2, str3, str3), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str2, str3, str3), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str2, str3, str3));
            if (z) {
                asList = Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str2, str3, str3), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str2, str3, str3), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str2, str3, str3), String.format("+I[%s, 417022095255614381, China, %s, %s West Town address 5]", str2, str3, str3));
            }
            if (failoverPhase == MySqlSourceTestBase.FailoverPhase.SNAPSHOT) {
                triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                    sleepMs(100L);
                });
            }
            arrayList.addAll(asList);
            waitForUpsertSinkSize("sink", arrayList.size());
            assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getResults("sink"));
            makeFirstPartBinlogForAddressTable(getConnection(), str2);
            if (failoverPhase == MySqlSourceTestBase.FailoverPhase.BINLOG) {
                triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                    sleepMs(100L);
                });
            }
            makeSecondPartBinlogForAddressTable(getConnection(), str2);
            arrayList = (List) arrayList.stream().filter(str4 -> {
                return !str4.contains(String.format("%s, 416874195632735147", str2));
            }).collect(Collectors.toList());
            arrayList.addAll(Arrays.asList(String.format("+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]", str2, str3, str3), String.format("+I[%s, 417022095255614380, China, %s, %s West Town address 4]", str2, str3, str3)));
            waitForUpsertSinkSize("sink", arrayList.size());
            Thread.sleep(1000L);
            assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getResults("sink"));
            if (i2 != strArr.length - 1) {
                str = triggerSavepointWithRetry(jobClient, uri);
            }
            jobClient.cancel().get();
        }
    }

    private String getCreateTableStatement(Map<String, String> map, String... strArr) {
        Object[] objArr = new Object[8];
        objArr[0] = MYSQL_CONTAINER.getHost();
        objArr[1] = Integer.valueOf(MYSQL_CONTAINER.getDatabasePort());
        objArr[2] = this.customDatabase.getUsername();
        objArr[3] = this.customDatabase.getPassword();
        objArr[4] = this.customDatabase.getDatabaseName();
        objArr[5] = getTableNameRegex(strArr);
        objArr[6] = getServerId();
        objArr[7] = map.isEmpty() ? "" : "," + ((String) map.entrySet().stream().map(entry -> {
            return String.format("'%s'='%s'", entry.getKey(), entry.getValue());
        }).collect(Collectors.joining(",")));
        return String.format("CREATE TABLE address ( table_name STRING METADATA VIRTUAL, id BIGINT NOT NULL, country STRING, city STRING, detail_address STRING, primary key (city, id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.chunk.size' = '2', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.newly-added-table.enabled' = 'true' %s)", objArr);
    }

    private StreamExecutionEnvironment getStreamExecutionEnvironment(String str, int i) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        if (str != null) {
            Field declaredField = Thread.currentThread().getContextClassLoader().loadClass("org.apache.flink.streaming.api.environment.StreamExecutionEnvironment").getDeclaredField("configuration");
            declaredField.setAccessible(true);
            ((Configuration) declaredField.get(executionEnvironment)).setString(SavepointConfigOptions.SAVEPOINT_PATH, str);
        }
        executionEnvironment.setParallelism(i);
        executionEnvironment.enableCheckpointing(200L);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
        return executionEnvironment;
    }

    private String triggerSavepointWithRetry(JobClient jobClient, String str) throws ExecutionException, InterruptedException {
        for (int i = 0; i < 600; i++) {
            try {
                return (String) jobClient.triggerSavepoint(str).get();
            } catch (Exception e) {
                Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
                if (!findThrowable.isPresent() || !((CheckpointException) findThrowable.get()).getMessage().contains("Checkpoint triggering task")) {
                    throw e;
                }
                Thread.sleep(100L);
            }
        }
        return null;
    }

    private String getTableNameRegex(String[] strArr) {
        Preconditions.checkState(strArr.length > 0);
        return strArr.length == 1 ? strArr[0] : String.format("(%s)", StringUtils.join(strArr, "|"));
    }

    private String getServerId() {
        int nextInt = new Random().nextInt(100) + 5400;
        return nextInt + "-" + (nextInt + 4);
    }

    private void sleepMs(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    private void initialAddressTables(JdbcConnection jdbcConnection, String[] strArr) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            for (String str : strArr) {
                String str2 = this.customDatabase.getDatabaseName() + "." + str;
                String str3 = str.split("_")[1];
                jdbcConnection.execute(new String[]{"CREATE TABLE " + str2 + "(  id BIGINT UNSIGNED NOT NULL PRIMARY KEY,  country VARCHAR(255) NOT NULL,  city VARCHAR(255) NOT NULL,  detail_address VARCHAR(1024));"});
                jdbcConnection.execute(new String[]{String.format("INSERT INTO  %s VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),       (416927583791428523, 'China', '%s', '%s West Town address 2'),       (417022095255614379, 'China', '%s', '%s West Town address 3');", str2, str3, str3, str3, str3, str3, str3)});
            }
            jdbcConnection.commit();
            jdbcConnection.close();
        } catch (Throwable th) {
            jdbcConnection.close();
            throw th;
        }
    }

    private void makeFirstPartBinlogForAddressTable(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            jdbcConnection.execute(new String[]{String.format("UPDATE %s SET COUNTRY = 'CHINA' where id = 416874195632735147", this.customDatabase.getDatabaseName() + "." + str)});
            jdbcConnection.commit();
            jdbcConnection.close();
        } catch (Throwable th) {
            jdbcConnection.close();
            throw th;
        }
    }

    private void makeSecondPartBinlogForAddressTable(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            String str2 = this.customDatabase.getDatabaseName() + "." + str;
            String str3 = str.split("_")[1];
            jdbcConnection.execute(new String[]{String.format("INSERT INTO %s VALUES(417022095255614380, 'China','%s','%s West Town address 4')", str2, str3, str3)});
            jdbcConnection.commit();
            jdbcConnection.close();
        } catch (Throwable th) {
            jdbcConnection.close();
            throw th;
        }
    }

    private void makeBinlogBeforeCaptureForAddressTable(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            String str2 = this.customDatabase.getDatabaseName() + "." + str;
            String str3 = str.split("_")[1];
            jdbcConnection.execute(new String[]{String.format("INSERT INTO %s VALUES(417022095255614381, 'China','%s','%s West Town address 5')", str2, str3, str3)});
            jdbcConnection.commit();
            jdbcConnection.close();
        } catch (Throwable th) {
            jdbcConnection.close();
            throw th;
        }
    }

    private MySqlConnection getConnection() {
        HashMap hashMap = new HashMap();
        hashMap.put("database.hostname", MYSQL_CONTAINER.getHost());
        hashMap.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
        hashMap.put("database.user", this.customDatabase.getUsername());
        hashMap.put("database.password", this.customDatabase.getPassword());
        hashMap.put("database.serverTimezone", ZoneId.of("UTC").toString());
        return DebeziumUtils.createMySqlConnection(io.debezium.config.Configuration.from(hashMap), new Properties());
    }

    private void makeBinlogForAddressTable(JdbcConnection jdbcConnection, String str, int i) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            String str2 = this.customDatabase.getDatabaseName() + "." + str;
            String str3 = str.split("_")[1];
            jdbcConnection.execute(new String[]{String.format("UPDATE %s SET COUNTRY = 'CHINA_%s' where id = 416874195632735147", str2, Integer.valueOf(i))});
            jdbcConnection.execute(new String[]{String.format("INSERT INTO %s VALUES(%d, 'China','%s','%s West Town address 4')", str2, Long.valueOf(417022095255614380L + i), str3, str3)});
            jdbcConnection.commit();
            jdbcConnection.close();
        } catch (Throwable th) {
            jdbcConnection.close();
            throw th;
        }
    }

    private static void waitForSinkSize(String str, int i) throws InterruptedException {
        while (sinkSize(str) < i) {
            Thread.sleep(100L);
        }
    }

    private static int sinkSize(String str) {
        int size;
        synchronized (TestValuesTableFactory.class) {
            try {
                size = TestValuesTableFactory.getRawResults(str).size();
            } catch (IllegalArgumentException e) {
                return 0;
            }
        }
        return size;
    }
}
