package com.ververica.cdc.connectors.tidb.table;

import com.ververica.cdc.connectors.tidb.TiDBTestBase;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.class */
/**
 * Integration test that streams a TiDB table through the {@code tidb-cdc} connector into a
 * {@code values} sink while the table is being grown by bulk inserts, then checks that the sink
 * eventually holds at least as many entries as the table has rows.
 *
 * <p>The region layout reported by {@code SHOW TABLE t1 REGIONS} is logged for diagnosis only; the
 * test does not assert on it.
 */
public class TiDBConnectorRegionITCase extends TiDBTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TiDBConnectorRegionITCase.class);

    /** Number of bulk {@code INSERT ... SELECT} statements issued against the source table. */
    private static final int INSERT_BATCHES = 15;

    /** Pause between two polls of the sink, in milliseconds. */
    private static final long SINK_POLL_INTERVAL_MS = 100L;

    @ClassRule
    public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    this.env, EnvironmentSettings.newInstance().inStreamingMode().build());

    /** Clears data recorded by previous tests in the values factory and pins parallelism to 1. */
    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        this.env.setParallelism(1);
    }

    /**
     * Starts the CDC pipeline, grows the source table, and waits until the sink has caught up
     * with the row count reported by TiDB before cancelling the job.
     *
     * @throws Exception if SQL execution, the sink wait, or job cancellation fails
     */
    @Test
    public void testRegionChange() throws Exception {
        final String databaseName = "region_switch_test";
        final String tableName = "t1";
        initializeTidbTable(databaseName);

        String sourceDdl =
                String.format(
                        "CREATE TABLE tidb_source ("
                                + " `id` INT NOT NULL,"
                                + " b INT,"
                                + " PRIMARY KEY (`id`) NOT ENFORCED"
                                + ") WITH ("
                                + " 'connector' = 'tidb-cdc',"
                                + " 'tikv.grpc.timeout_in_ms' = '20000',"
                                + " 'pd-addresses' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'table-name' = '%s'"
                                + ")",
                        PD.getContainerIpAddress()
                                + ":"
                                + PD.getMappedPort(TiDBTestBase.PD_PORT_ORIGIN),
                        databaseName,
                        tableName);
        String sinkDdl =
                "CREATE TABLE sink ("
                        + " `id` INT NOT NULL,"
                        + " b INT,"
                        + " PRIMARY KEY (`id`) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ")";
        this.tEnv.executeSql(sourceDdl);
        this.tEnv.executeSql(sinkDdl);

        // The INSERT job is submitted here and keeps running while the table is modified below.
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");

        // Wait for the first entry to reach the sink before growing the table.
        waitForSinkSize("sink", 1);

        // All JDBC resources are released inside the helper, before the (unbounded) sink wait.
        int rowCount = growTableAndCountRows(databaseName);

        waitForSinkSize("sink", rowCount);

        JobClient jobClient =
                result.getJobClient()
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "No JobClient available for the submitted INSERT job"));
        jobClient.cancel().get();
    }

    /**
     * Bulk-inserts rows into {@code t1}, logs the table's region layout, and returns the table's
     * row count.
     *
     * <p>The connection, statement, and both result sets are closed on every exit path.
     *
     * @param databaseName database to open the JDBC connection against
     * @return the value of {@code select count(*) from t1} after the inserts
     * @throws Exception if any JDBC call fails or the count query yields no row
     */
    private int growTableAndCountRows(String databaseName) throws Exception {
        try (Connection connection = getJdbcConnection(databaseName);
                Statement statement = connection.createStatement()) {
            for (int batch = 0; batch < INSERT_BATCHES; batch++) {
                statement.execute(
                        "INSERT INTO t1 SELECT NULL, FLOOR(RAND()*1000), RANDOM_BYTES(1024), RANDOM_BYTES(1024), RANDOM_BYTES(1024) FROM t1 a JOIN t1 b JOIN t1 c LIMIT 10000;");
            }

            try (ResultSet regions = statement.executeQuery("SHOW TABLE t1 REGIONS;")) {
                while (regions.next()) {
                    LOG.info(
                            "regionId: {}, leaderStoreId: {}, peerStoreIds: {}, regionState: {}, regionRows: {}, regionSize: {}, regionKeys: {}",
                            regions.getString(1),
                            regions.getString(2),
                            regions.getString(3),
                            regions.getString(4),
                            regions.getString(5),
                            regions.getString(6),
                            regions.getString(7));
                }
            }

            try (ResultSet countResult = statement.executeQuery("select count(*) from t1;")) {
                if (!countResult.next()) {
                    throw new IllegalStateException("Row count query on t1 returned no result");
                }
                int rowCount = countResult.getInt(1);
                LOG.info("count: {}", rowCount);
                return rowCount;
            }
        }
    }

    /**
     * Blocks until the named sink holds at least {@code i} raw results, polling every
     * {@value #SINK_POLL_INTERVAL_MS} ms. There is no timeout.
     *
     * @param str name of the sink table
     * @param i minimum number of raw results to wait for
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    private static void waitForSinkSize(String str, int i) throws InterruptedException {
        while (sinkSize(str) < i) {
            Thread.sleep(SINK_POLL_INTERVAL_MS);
        }
    }

    /**
     * Returns the number of raw results currently recorded for the named sink, or 0 when the
     * lookup is rejected with an {@link IllegalArgumentException}.
     *
     * @param str name of the sink table
     * @return current raw result count, or 0 if the lookup fails
     */
    private static int sinkSize(String str) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(str).size();
            } catch (IllegalArgumentException ignored) {
                // Presumably the sink has not been registered yet because the job has not
                // started writing — TODO confirm against TestValuesTableFactory.
                return 0;
            }
        }
    }
}
