/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.tidb.table;

import com.ververica.cdc.connectors.tidb.TiDBTestBase;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for the {@code tidb-cdc} table source.
 *
 * <p>The test streams a TiDB table into a {@code values} sink, bulk-inserts a large number of
 * rows into the source table, logs the table's region layout, and then waits until the sink has
 * received at least as many records as the table has rows.
 */
public class TiDBConnectorRegionITCase extends TiDBTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TiDBConnectorRegionITCase.class);

    /** Database passed to {@code initializeTidbTable} and used for the JDBC connection. */
    private static final String DATABASE_NAME = "region_switch_test";

    /** Table captured by the CDC source. */
    private static final String TABLE_NAME = "t1";

    /** Name of the {@code values} sink table the results are collected in. */
    private static final String SINK_NAME = "sink";

    /** Number of bulk {@code INSERT ... SELECT} statements issued against the source table. */
    private static final int BULK_INSERT_ROUNDS = 15;

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().inStreamingMode().build());

    @ClassRule
    public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

    /** Clears data left in the values table factory by earlier tests and pins parallelism to 1. */
    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        env.setParallelism(1);
    }

    /**
     * Streams {@code t1} into the sink while the table is grown by repeated bulk inserts, then
     * waits for the sink to catch up with the table's row count.
     *
     * @throws Exception if table setup, any SQL statement, or job cancellation fails
     */
    @Test
    public void testRegionChange() throws Exception {
        initializeTidbTable(DATABASE_NAME);

        String sourceDDL =
                String.format(
                        "CREATE TABLE tidb_source ("
                                + " `id` INT NOT NULL,"
                                + " b INT,"
                                + " PRIMARY KEY (`id`) NOT ENFORCED"
                                + ") WITH ("
                                + " 'connector' = 'tidb-cdc',"
                                + " 'tikv.grpc.timeout_in_ms' = '20000',"
                                + " 'pd-addresses' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'table-name' = '%s'"
                                + ")",
                        PD.getContainerIpAddress() + ":" + PD.getMappedPort(2379),
                        DATABASE_NAME,
                        TABLE_NAME);
        String sinkDDL =
                "CREATE TABLE sink ("
                        + " `id` INT NOT NULL,"
                        + " b INT,"
                        + " PRIMARY KEY (`id`) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ")";
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
        JobClient jobClient =
                result.getJobClient()
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "INSERT job did not expose a JobClient"));
        try {
            // Wait for the first record before generating load.
            // NOTE(review): presumably initializeTidbTable seeds t1 with at least one row - confirm.
            waitForSinkSize(SINK_NAME, 1);

            int count;
            try (Connection connection = getJdbcConnection(DATABASE_NAME);
                    Statement statement = connection.createStatement()) {
                for (int i = 0; i < BULK_INSERT_ROUNDS; ++i) {
                    statement.execute(
                            "INSERT INTO t1 SELECT NULL, FLOOR(RAND()*1000), RANDOM_BYTES(1024), RANDOM_BYTES(1024), RANDOM_BYTES(1024) FROM t1 a JOIN t1 b JOIN t1 c LIMIT 10000;");
                }
                logTableRegions(statement);
                count = countRows(statement);
                LOG.info("count: {}", count);
            }

            waitForSinkSize(SINK_NAME, count);
        } finally {
            // Cancel on every path so a failed run does not leave the streaming job behind.
            jobClient.cancel().get();
        }
    }

    /** Logs the first seven columns of every row returned by {@code SHOW TABLE t1 REGIONS}. */
    private static void logTableRegions(Statement statement) throws SQLException {
        try (ResultSet regions = statement.executeQuery("SHOW TABLE t1 REGIONS;")) {
            while (regions.next()) {
                String regionId = regions.getString(1);
                String leaderStoreId = regions.getString(2);
                String peerStoreIds = regions.getString(3);
                String regionState = regions.getString(4);
                String regionRows = regions.getString(5);
                String regionSize = regions.getString(6);
                String regionKeys = regions.getString(7);
                LOG.info(
                        "regionId: {}, leaderStoreId: {}, peerStoreIds: {}, regionState: {}, regionRows: {}, regionSize: {}, regionKeys: {}",
                        regionId,
                        leaderStoreId,
                        peerStoreIds,
                        regionState,
                        regionRows,
                        regionSize,
                        regionKeys);
            }
        }
    }

    /** Returns the result of {@code select count(*) from t1}. */
    private static int countRows(Statement statement) throws SQLException {
        try (ResultSet rows = statement.executeQuery("select count(*) from t1;")) {
            if (!rows.next()) {
                throw new IllegalStateException("COUNT(*) query on t1 returned no row");
            }
            return rows.getInt(1);
        }
    }

    /**
     * Polls every 100 ms until the sink holds at least {@code expectedSize} raw results.
     *
     * <p>There is no timeout: the loop only ends when the size is reached or the thread is
     * interrupted.
     *
     * @param sinkName name of the values sink table
     * @param expectedSize minimum number of raw results to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitForSinkSize(String sinkName, int expectedSize)
            throws InterruptedException {
        while (sinkSize(sinkName) < expectedSize) {
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the number of raw results currently recorded for the given sink, or 0 when the
     * lookup throws {@link IllegalArgumentException}.
     *
     * @param sinkName name of the values sink table
     * @return the current raw result count, or 0 if it cannot be looked up
     */
    private static int sinkSize(String sinkName) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(sinkName).size();
            } catch (IllegalArgumentException ignored) {
                // NOTE(review): presumably thrown while the job has not yet registered any
                // results for this sink - confirm against TestValuesTableFactory.
                return 0;
            }
        }
    }
}

