package com.ververica.cdc.connectors.mysql.table;

import com.ververica.cdc.connectors.mysql.MySqlValidatorTest;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import java.io.File;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

/**
 * Integration test that reads temporal columns through the {@code mysql-cdc} table connector
 * from a MySQL container whose {@code default-time_zone} is set to a given zone, with the
 * connector's {@code server-time-zone} option set to the same zone.
 *
 * <p>The test is parameterized on {@link #incrementalSnapshot}, which is forwarded to the
 * connector's {@code scan.incremental.snapshot.enabled} option.
 */
@RunWith(Parameterized.class)
public class MySqlTimezoneITCase {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlTimezoneITCase.class);

    // Both are (re)assigned in setup() before every test.
    private static TemporaryFolder tempFolder;
    private static File resourceFolder;

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().inStreamingMode().build());

    /** Value used for the connector option {@code scan.incremental.snapshot.enabled}. */
    @Parameterized.Parameter
    public Boolean incrementalSnapshot;

    /** Runs every test once with incremental snapshot enabled and once with it disabled. */
    @Parameterized.Parameters(name = "incrementalSnapshot: {0}")
    public static List<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    /**
     * Resolves the test resource root, creates a temporary folder beneath it for generated
     * MySQL configuration files, and configures the execution environment for the current
     * parameter.
     */
    @Before
    public void setup() throws Exception {
        resourceFolder =
                Paths.get(
                                Objects.requireNonNull(
                                                MySqlValidatorTest.class
                                                        .getClassLoader()
                                                        .getResource("."))
                                        .toURI())
                        .toFile();
        tempFolder = new TemporaryFolder(resourceFolder);
        tempFolder.create();

        if (incrementalSnapshot) {
            env.setParallelism(4);
            env.enableCheckpointing(200L);
        } else {
            env.setParallelism(1);
        }
    }

    @Test
    public void testMySqlServerInBerlin() throws Exception {
        testTemporalTypesWithMySqlServerTimezone("Europe/Berlin");
    }

    @Test
    public void testMySqlServerInShanghai() throws Exception {
        testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai");
    }

    /**
     * Starts a MySQL container configured with the given time zone, reads the temporal columns
     * of {@code full_types} through the CDC connector, and checks both the initial row and the
     * change records produced by a subsequent UPDATE.
     *
     * @param timezone zone id written to {@code default-time_zone} in the generated
     *     {@code my.cnf} and passed as the connector's {@code server-time-zone}
     */
    private void testTemporalTypesWithMySqlServerTimezone(String timezone) throws Exception {
        // The explicit cast keeps the chain compilable even if the last fluent call returns an
        // erased container type rather than MySqlContainer.
        MySqlContainer mySqlContainer =
                (MySqlContainer)
                        new MySqlContainer()
                                .withConfigurationOverride(buildMySqlConfigWithTimezone(timezone))
                                .withSetupSQL("docker/setup.sql")
                                .withDatabaseName("flink-test")
                                .withUsername("flinkuser")
                                .withPassword("flinkpw")
                                .withLogConsumer(new Slf4jLogConsumer(LOG));

        try {
            LOG.info("Starting containers...");
            Startables.deepStart(Stream.of(mySqlContainer)).join();
            LOG.info("Containers are started.");

            UniqueDatabase fullTypesDatabase =
                    new UniqueDatabase(mySqlContainer, "column_type_test", "mysqluser", "mysqlpw");
            fullTypesDatabase.createAndInitialize();

            String sourceDdl =
                    String.format(
                            "CREATE TABLE full_types (\n"
                                    + "    `id` INT NOT NULL,\n"
                                    + "    tiny_c TINYINT,\n"
                                    + "    tiny_un_c SMALLINT ,\n"
                                    + "    small_c SMALLINT,\n"
                                    + "    small_un_c INT,\n"
                                    + "    int_c INT ,\n"
                                    + "    int_un_c BIGINT,\n"
                                    + "    int11_c BIGINT,\n"
                                    + "    big_c BIGINT,\n"
                                    + "    varchar_c STRING,\n"
                                    + "    char_c STRING,\n"
                                    + "    float_c FLOAT,\n"
                                    + "    double_c DOUBLE,\n"
                                    + "    decimal_c DECIMAL(8, 4),\n"
                                    + "    numeric_c DECIMAL(6, 0),\n"
                                    + "    boolean_c BOOLEAN,\n"
                                    + "    date_c DATE,\n"
                                    + "    time_c TIME(0),\n"
                                    + "    datetime3_c TIMESTAMP(3),\n"
                                    + "    datetime6_c TIMESTAMP(6),\n"
                                    + "    timestamp_c TIMESTAMP(0),\n"
                                    + "    file_uuid BYTES,\n"
                                    + "    primary key (`id`) not enforced"
                                    + ") WITH ("
                                    + " 'connector' = 'mysql-cdc',"
                                    + " 'hostname' = '%s',"
                                    + " 'port' = '%s',"
                                    + " 'username' = '%s',"
                                    + " 'password' = '%s',"
                                    + " 'database-name' = '%s',"
                                    + " 'table-name' = '%s',"
                                    + " 'scan.incremental.snapshot.enabled' = '%s',"
                                    + " 'server-id' = '%s',"
                                    + " 'scan.incremental.snapshot.chunk.size' = '%s',"
                                    + " 'server-time-zone'='%s'"
                                    + ")",
                            mySqlContainer.getHost(),
                            mySqlContainer.getDatabasePort(),
                            fullTypesDatabase.getUsername(),
                            fullTypesDatabase.getPassword(),
                            fullTypesDatabase.getDatabaseName(),
                            "full_types",
                            incrementalSnapshot,
                            getServerId(),
                            getSplitSize(),
                            timezone);
            tEnv.executeSql(sourceDdl);

            TableResult result =
                    tEnv.executeSql(
                            "SELECT date_c, time_c, datetime3_c, datetime6_c, timestamp_c FROM full_types");
            try {
                CloseableIterator<Row> rows = result.collect();

                // Row emitted for the data already present in the table.
                List<String> expectedSnapshot =
                        Arrays.asList(
                                "+I[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]");
                MySqlSourceTestBase.assertEqualsInAnyOrder(
                        expectedSnapshot, fetchRows(rows, expectedSnapshot.size()));

                // Change a temporal column so the source emits an update pair.
                try (Connection connection = fullTypesDatabase.getJdbcConnection();
                        Statement statement = connection.createStatement()) {
                    statement.execute(
                            "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
                }

                List<String> expectedUpdates =
                        Arrays.asList(
                                "-U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]",
                                "+U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22]");
                MySqlSourceTestBase.assertEqualsInOrder(
                        expectedUpdates, fetchRows(rows, expectedUpdates.size()));
            } finally {
                // Cancel the streaming job even when an assertion fails, so it does not keep
                // running after the test.
                result.getJobClient().get().cancel().get();
            }
        } finally {
            // Always stop the container; otherwise a failed test leaves it running.
            mySqlContainer.stop();
        }
    }

    /**
     * Builds the {@code server-id} option value from a random base in {@code [5400, 5500)}.
     *
     * @return {@code "<base>-<base + parallelism>"} when incremental snapshot is enabled,
     *     otherwise just {@code "<base>"}
     */
    private String getServerId() {
        int serverId = new Random().nextInt(100) + 5400;
        if (incrementalSnapshot) {
            return serverId + "-" + (serverId + env.getParallelism());
        }
        return String.valueOf(serverId);
    }

    /**
     * Returns the value for {@code scan.incremental.snapshot.chunk.size}: 4 when incremental
     * snapshot is enabled, 0 otherwise.
     */
    private int getSplitSize() {
        return incrementalSnapshot ? 4 : 0;
    }

    /**
     * Pulls at most {@code i} rows from the iterator and returns their string forms.
     *
     * @param it source of rows; {@code hasNext()} is only consulted while more rows are wanted
     * @param i maximum number of rows to fetch
     * @return the {@code toString()} of each fetched row, in iteration order
     */
    private static List<String> fetchRows(Iterator<Row> it, int i) {
        List<String> rows = new ArrayList<>(i);
        int remaining = i;
        while (remaining > 0 && it.hasNext()) {
            rows.add(it.next().toString());
            remaining--;
        }
        return rows;
    }

    /**
     * Writes a {@code my.cnf} into a fresh sub-folder of the temporary folder, with
     * {@code default-time_zone} set to the given zone.
     *
     * @param timezone value for {@code default-time_zone}
     * @return path of the generated file relative to the test resource folder
     * @throws RuntimeException if the file cannot be created or written
     */
    private String buildMySqlConfigWithTimezone(String timezone) {
        try {
            File configDir = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));
            Path cnf = Files.createFile(Paths.get(configDir.getPath(), "my.cnf"));
            String mysqldConf =
                    "[mysqld]\n"
                            + "binlog_format = row\n"
                            + "log_bin = mysql-bin\n"
                            + "server-id = 223344\n"
                            + "binlog_row_image = FULL\n";
            String timezoneConf = "default-time_zone = '" + timezone + "'\n";
            Files.write(
                    cnf,
                    Collections.singleton(mysqldConf + timezoneConf),
                    StandardCharsets.UTF_8,
                    StandardOpenOption.APPEND);
            return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create my.cnf file.", e);
        }
    }
}
