package com.ververica.cdc.connectors.mysql.table;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;
import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.class */
public class MySqlTableSourceFactoryTest {
    // Connection settings shared by every test case.
    private static final String MY_LOCALHOST = "localhost";
    private static final String MY_USERNAME = "flinkuser";
    private static final String MY_PASSWORD = "flinkpw";
    private static final String MY_DATABASE = "myDB";
    private static final String MY_TABLE = "myTable";

    /** Purely physical schema with a composite primary key on (bbb, aaa). */
    private static final ResolvedSchema SCHEMA =
            new ResolvedSchema(
                    Arrays.asList(
                            Column.physical("aaa", DataTypes.INT().notNull()),
                            Column.physical("bbb", DataTypes.STRING().notNull()),
                            Column.physical("ccc", DataTypes.DOUBLE()),
                            Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
                            Column.physical("eee", DataTypes.TIMESTAMP(3))),
                    new ArrayList<>(),
                    UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));

    /** Schema mixing physical columns with the "op_ts" and "database_name" metadata columns. */
    private static final ResolvedSchema SCHEMA_WITH_METADATA =
            new ResolvedSchema(
                    Arrays.asList(
                            Column.physical("id", DataTypes.BIGINT().notNull()),
                            Column.physical("name", DataTypes.STRING()),
                            Column.physical("count", DataTypes.DECIMAL(38, 18)),
                            Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
                            Column.metadata(
                                    "database_name", DataTypes.STRING(), "database_name", true)),
                    Collections.emptyList(),
                    UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));

    // Empty properties object passed as an argument of the expected sources; tests never add
    // entries to it.
    private static final Properties PROPERTIES = new Properties();

    /** A source created from only the baseline options must carry the documented defaults. */
    @Test
    public void testCommonProperties() {
        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3306,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        null, // no server-id configured
                        false, // incremental snapshot is switched off by getAllOptions()
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
                        MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.initial(),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        null, // no chunk key column configured
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());

        DynamicTableSource actual = createTableSource(getAllOptions());

        Assert.assertEquals(expected, actual);
    }

    /** Every explicitly configured incremental-snapshot option must reach the created source. */
    @Test
    public void testEnableParallelReadSource() {
        Map<String, String> options = getAllOptions();
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("server-id", "123-126");
        options.put("scan.incremental.snapshot.chunk.size", "8000");
        options.put("chunk-meta.group.size", "3000");
        options.put("chunk-key.even-distribution.factor.upper-bound", "40.5");
        options.put("chunk-key.even-distribution.factor.lower-bound", "0.01");
        options.put("scan.snapshot.fetch.size", "100");
        options.put("connect.timeout", "45s");
        options.put("scan.incremental.snapshot.chunk.key-column", "testCol");

        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3306,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        "123-126",
                        true,
                        8000,
                        3000,
                        100,
                        Duration.ofSeconds(45L),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        40.5d,
                        0.01d,
                        StartupOptions.initial(),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        "testCol",
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());

        DynamicTableSource actual = createTableSource(options);

        Assert.assertEquals(expected, actual);
    }

    /** Incremental snapshot reading must also accept a single server id instead of a range. */
    @Test
    public void testEnableParallelReadSourceWithSingleServerId() {
        Map<String, String> options = getAllOptions();
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("server-id", "123");
        options.put("scan.incremental.snapshot.chunk.size", "8000");
        options.put("scan.snapshot.fetch.size", "100");
        options.put("connect.timeout", "45s");

        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3306,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        "123",
                        true,
                        8000,
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        100,
                        Duration.ofSeconds(45L),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.initial(),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        null, // no chunk key column configured
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());

        DynamicTableSource actual = createTableSource(options);

        Assert.assertEquals(expected, actual);
    }

    /** Incremental snapshot reading combined with the "latest-offset" startup mode. */
    @Test
    public void testEnableParallelReadSourceLatestOffset() {
        Map<String, String> options = getAllOptions();
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("server-id", "123-126");
        options.put("scan.startup.mode", "latest-offset");

        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3306,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        "123-126",
                        // NOTE(review): the option is set to "true" above, yet the expectation
                        // reads the option's default — presumably the default is true; confirm.
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue(),
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
                        MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.latest(),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        null, // no chunk key column configured
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());

        DynamicTableSource actual = createTableSource(options);

        Assert.assertEquals(expected, actual);
    }

    /**
     * Optional settings (port, time zone, "debezium."- and "jdbc.properties."-prefixed entries,
     * heartbeat, ...) must all be reflected in the created source.
     */
    @Test
    public void testOptionalProperties() {
        Map<String, String> options = getAllOptions();
        options.put("port", "3307");
        options.put("server-id", "4321");
        options.put("server-time-zone", "Asia/Shanghai");
        options.put("scan.newly-added-table.enabled", "true");
        options.put("debezium.snapshot.mode", "never");
        options.put("jdbc.properties.useSSL", "false");
        options.put("heartbeat.interval", "15213ms");
        options.put("scan.incremental.snapshot.chunk.key-column", "testCol");
        options.put("scan.incremental.close-idle-reader.enabled", "true");
        options.put("scan.incremental.snapshot.backfill.skip", "true");
        DynamicTableSource actual = createTableSource(options);

        // The expected properties carry the keys without their "debezium." and
        // "jdbc.properties." prefixes.
        Properties debeziumProps = new Properties();
        debeziumProps.put("snapshot.mode", "never");
        Properties jdbcProps = new Properties();
        jdbcProps.setProperty("useSSL", "false");

        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3307,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.of("Asia/Shanghai"),
                        debeziumProps,
                        "4321",
                        false, // incremental snapshot is switched off by getAllOptions()
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
                        MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.initial(),
                        true,
                        true,
                        jdbcProps,
                        Duration.ofMillis(15213L),
                        "testCol",
                        true);

        Assert.assertEquals(expected, actual);
    }

    /** The "specific-offset" startup mode must carry the configured binlog file and position. */
    @Test
    public void testStartupFromSpecificOffset() {
        final String offsetFile = "mysql-bin.000003";
        final int offsetPos = 100203;

        Map<String, String> options = getAllOptions();
        options.put("port", "3307");
        options.put("server-id", "4321");
        options.put("scan.startup.mode", "specific-offset");
        options.put("scan.startup.specific-offset.file", offsetFile);
        options.put("scan.startup.specific-offset.pos", String.valueOf(offsetPos));

        // The source is created exactly once; the earlier extra call whose result was thrown
        // away has been removed.
        DynamicTableSource actual = createTableSource(options);

        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3307,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        "4321",
                        false, // incremental snapshot is switched off by getAllOptions()
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
                        MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.specificOffset(offsetFile, (long) offsetPos),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        null, // no chunk key column configured
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());

        Assert.assertEquals(expected, actual);
    }

    /** Explicitly selecting the "initial" startup mode yields {@code StartupOptions.initial()}. */
    @Test
    public void testStartupFromInitial() {
        Map<String, String> options = getAllOptions();
        options.put("scan.startup.mode", "initial");

        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3306,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        null, // no server-id configured
                        false, // incremental snapshot is switched off by getAllOptions()
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
                        MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.initial(),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        null, // no chunk key column configured
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());

        DynamicTableSource actual = createTableSource(options);

        Assert.assertEquals(expected, actual);
    }

    /** The "earliest-offset" startup mode yields {@code StartupOptions.earliest()}. */
    @Test
    public void testStartupFromEarliestOffset() {
        Map<String, String> options = getAllOptions();
        options.put("scan.startup.mode", "earliest-offset");

        // The source is created exactly once; the two earlier calls whose results were thrown
        // away have been removed.
        DynamicTableSource actual = createTableSource(options);

        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3306,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        null, // no server-id configured
                        false, // incremental snapshot is switched off by getAllOptions()
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
                        MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.earliest(),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        null, // no chunk key column configured
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());

        Assert.assertEquals(expected, actual);
    }

    /** The "timestamp" startup mode must carry the configured epoch-millis value. */
    @Test
    public void testStartupFromSpecificTimestamp() {
        Map<String, String> options = getAllOptions();
        options.put("scan.startup.mode", "timestamp");
        options.put("scan.startup.timestamp-millis", "0");

        // The source is created exactly once; the earlier extra call whose result was thrown
        // away has been removed.
        DynamicTableSource actual = createTableSource(options);

        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3306,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        null, // no server-id configured
                        false, // incremental snapshot is switched off by getAllOptions()
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
                        MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.timestamp(0L),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        null, // no chunk key column configured
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());

        Assert.assertEquals(expected, actual);
    }

    /** The "latest-offset" startup mode yields {@code StartupOptions.latest()}. */
    @Test
    public void testStartupFromLatestOffset() {
        Map<String, String> options = getAllOptions();
        options.put("scan.startup.mode", "latest-offset");

        MySqlTableSource expected =
                new MySqlTableSource(
                        SCHEMA,
                        3306,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        null, // no server-id configured
                        false, // incremental snapshot is switched off by getAllOptions()
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
                        MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.latest(),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        null, // no chunk key column configured
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());

        DynamicTableSource actual = createTableSource(options);

        Assert.assertEquals(expected, actual);
    }

    /**
     * Applying readable metadata and copying the source must preserve the produced data type
     * and the requested metadata keys.
     */
    @Test
    public void testMetadataColumns() {
        // The factory helper returns the DynamicTableSource interface; applyReadableMetadata is
        // invoked on the concrete type, so an explicit downcast is required.
        MySqlTableSource source =
                (MySqlTableSource) createTableSource(SCHEMA_WITH_METADATA, getAllOptions());
        source.applyReadableMetadata(
                Arrays.asList("op_ts", "database_name"),
                SCHEMA_WITH_METADATA.toSourceRowDataType());
        DynamicTableSource actual = source.copy();

        // The expected source is built on the physical part of the schema only; the metadata
        // state is then set directly on its fields.
        MySqlTableSource expected =
                new MySqlTableSource(
                        ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA),
                        3306,
                        MY_LOCALHOST,
                        MY_DATABASE,
                        MY_TABLE,
                        MY_USERNAME,
                        MY_PASSWORD,
                        ZoneId.systemDefault(),
                        PROPERTIES,
                        null, // no server-id configured
                        false, // incremental snapshot is switched off by getAllOptions()
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(),
                        MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
                        MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(),
                        MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
                        MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                        MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                        StartupOptions.initial(),
                        false,
                        false,
                        new Properties(),
                        MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(),
                        null, // no chunk key column configured
                        MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
        expected.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
        expected.metadataKeys = Arrays.asList("op_ts", "database_name");

        Assert.assertEquals(expected, actual);
    }

    /**
     * Verifies that the factory rejects malformed, out-of-range, missing and unknown options
     * and that the failure's cause chain contains the expected message.
     */
    @Test
    public void testValidation() {
        Map<String, String> options;
        Throwable failure;

        // Non-numeric port.
        options = getAllOptions();
        options.put("port", "123b");
        failure = expectCreationFailure(options);
        Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(
                                failure, "Could not parse value '123b' for key 'port'.")
                        .isPresent());

        // Non-numeric server id: both the option-level and the detail message must be present.
        options = getAllOptions();
        options.put("server-id", "123b");
        failure = expectCreationFailure(options);
        Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(
                                failure, "The value of option 'server-id' is invalid: '123b'")
                        .isPresent());
        Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(
                                failure, "The server id 123b is not a valid numeric.")
                        .isPresent());

        // Chunk size too small.
        options = getAllOptions();
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("scan.incremental.snapshot.chunk.size", "1");
        failure = expectCreationFailure(options);
        Assert.assertThat(
                failure,
                FlinkMatchers.containsMessage(
                        "The value of option 'scan.incremental.snapshot.chunk.size' must larger than 1, but is 1"));

        // Fetch size too small.
        options = getAllOptions();
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("scan.snapshot.fetch.size", "1");
        failure = expectCreationFailure(options);
        Assert.assertThat(
                failure,
                FlinkMatchers.containsMessage(
                        "The value of option 'scan.snapshot.fetch.size' must larger than 1, but is 1"));

        // Chunk meta group size too small.
        options = getAllOptions();
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("chunk-meta.group.size", "1");
        failure = expectCreationFailure(options);
        Assert.assertThat(
                failure,
                FlinkMatchers.containsMessage(
                        "The value of option 'chunk-meta.group.size' must larger than 1, but is 1"));

        // Distribution factor upper bound below 1.0.
        // NOTE(review): the option is set through the 'split-key.*' spelling while the expected
        // message names 'chunk-key.*' — presumably a fallback key of the option; confirm.
        options = getAllOptions();
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("split-key.even-distribution.factor.upper-bound", "0.8");
        failure = expectCreationFailure(options);
        Assert.assertThat(
                failure,
                FlinkMatchers.containsMessage(
                        "The value of option 'chunk-key.even-distribution.factor.upper-bound' must larger than or equals 1.0, but is 0.8"));

        // Connection pool size too small.
        options = getAllOptions();
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("connection.pool.size", "1");
        failure = expectCreationFailure(options);
        Assert.assertThat(
                failure,
                FlinkMatchers.containsMessage(
                        "The value of option 'connection.pool.size' must larger than 1, but is 1"));

        // Zero connect retries.
        options = getAllOptions();
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("connect.max-retries", "0");
        failure = expectCreationFailure(options);
        Assert.assertThat(
                failure,
                FlinkMatchers.containsMessage(
                        "The value of option 'connect.max-retries' must larger than 0, but is 0"));

        // Each required option, when removed, must be reported as missing.
        for (ConfigOption<?> requiredOption : new MySqlTableSourceFactory().requiredOptions()) {
            options = getAllOptions();
            options.remove(requiredOption.key());
            failure = expectCreationFailure(options);
            Assert.assertTrue(
                    ExceptionUtils.findThrowableWithMessage(
                                    failure,
                                    "Missing required options are:\n\n" + requiredOption.key())
                            .isPresent());
        }

        // Unknown option key.
        options = getAllOptions();
        options.put("unknown", "abc");
        failure = expectCreationFailure(options);
        Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(failure, "Unsupported options:\n\nunknown")
                        .isPresent());

        // Unsupported startup mode.
        options = getAllOptions();
        options.put("scan.startup.mode", "abc");
        failure = expectCreationFailure(options);
        Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(
                                failure,
                                "Invalid value for option 'scan.startup.mode'. Supported values are [initial, latest-offset, earliest-offset, specific-offset, timestamp], but was: abc")
                        .isPresent());

        // Invalid database-name regex. Previously this case only mutated a map and never
        // invoked the factory, so it could not fail.
        // NOTE(review): assumes the factory validates database-name as a regex — confirm.
        options = getAllOptions();
        options.put("database-name", "*_invalid_db");
        failure = expectCreationFailure(options);
        Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(
                                failure,
                                String.format(
                                        "The database-name '%s' is not a valid regular expression",
                                        "*_invalid_db"))
                        .isPresent());

        // Invalid table-name regex (same fix and same assumption as for database-name).
        options = getAllOptions();
        options.put("table-name", "*_invalid_table");
        failure = expectCreationFailure(options);
        Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(
                                failure,
                                String.format(
                                        "The table-name '%s' is not a valid regular expression",
                                        "*_invalid_table"))
                        .isPresent());
    }

    /**
     * Creates a table source from {@code options} and returns the throwable it fails with.
     *
     * @param options connector options expected to be rejected
     * @return the throwable raised while creating the source
     * @throws AssertionError if the source is created without any failure
     */
    private static Throwable expectCreationFailure(Map<String, String> options) {
        try {
            createTableSource(options);
        } catch (Throwable t) {
            return t;
        }
        // Raised outside the try block so the catch above cannot swallow it.
        throw new AssertionError("exception expected");
    }

    /**
     * Builds a fresh, mutable option map holding the connector identifier, the connection
     * settings used across the tests, and incremental snapshot reading switched off.
     *
     * @return a new map on every call, so callers may modify it freely
     */
    private Map<String, String> getAllOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "mysql-cdc");
        options.put("hostname", MY_LOCALHOST);
        options.put("database-name", MY_DATABASE);
        options.put("table-name", MY_TABLE);
        options.put("username", MY_USERNAME);
        options.put("password", MY_PASSWORD);
        options.put("scan.incremental.snapshot.enabled", String.valueOf(false));
        return options;
    }

    /**
     * Creates a table source through {@link FactoryUtil} for a mock catalog table.
     *
     * @param resolvedSchema schema of the mock table
     * @param map connector options of the mock table
     * @return the source produced by the factory lookup
     */
    private static DynamicTableSource createTableSource(ResolvedSchema resolvedSchema, Map<String, String> map) {
        Schema unresolvedSchema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build();
        // Fourth-to-last argument of CatalogTable.of: an empty list (no partition keys
        // presumably — confirm against the CatalogTable API).
        CatalogTable catalogTable =
                CatalogTable.of(unresolvedSchema, "mock source", new ArrayList<>(), map);
        return FactoryUtil.createTableSource(
                null,
                ObjectIdentifier.of("default", "default", "t1"),
                new ResolvedCatalogTable(catalogTable, resolvedSchema),
                new Configuration(),
                MySqlTableSourceFactoryTest.class.getClassLoader(),
                false);
    }

    /**
     * Creates a table source for the default {@code SCHEMA} with the given options.
     *
     * @param map connector options of the mock table
     * @return the source produced by the factory lookup
     */
    private static DynamicTableSource createTableSource(Map<String, String> map) {
        return createTableSource(SCHEMA, map);
    }
}
