package com.ververica.cdc.connectors.mongodb.table;

import com.ververica.cdc.connectors.base.options.SourceOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import com.ververica.cdc.connectors.utils.AssertUtils;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.class */
/**
 * Tests that the {@code mongodb-cdc} table factory turns a map of table options into the
 * expected {@link MongoDBTableSource}.
 *
 * <p>Each test builds the source through {@link FactoryUtil} from a plain option map and compares
 * it with a hand-constructed {@link MongoDBTableSource}.
 */
public class MongoDBTableFactoryTest {
    private static final String MY_HOSTS = "localhost:27017,localhost:27018";
    private static final String USER = "flinkuser";
    private static final String PASSWORD = "flinkpw";
    private static final String MY_DATABASE = "myDB";
    private static final String MY_TABLE = "myTable";

    /** Physical-only schema with {@code _id} as primary key. */
    private static final ResolvedSchema SCHEMA =
            new ResolvedSchema(
                    Arrays.asList(
                            Column.physical("_id", DataTypes.STRING().notNull()),
                            Column.physical("bbb", DataTypes.STRING().notNull()),
                            Column.physical("ccc", DataTypes.DOUBLE()),
                            Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
                            Column.physical("eee", DataTypes.TIMESTAMP(3))),
                    Collections.emptyList(),
                    UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));

    /** Same physical columns as {@link #SCHEMA} plus two virtual metadata columns. */
    private static final ResolvedSchema SCHEMA_WITH_METADATA =
            new ResolvedSchema(
                    Arrays.asList(
                            Column.physical("_id", DataTypes.STRING().notNull()),
                            Column.physical("bbb", DataTypes.STRING().notNull()),
                            Column.physical("ccc", DataTypes.DOUBLE()),
                            Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
                            Column.physical("eee", DataTypes.TIMESTAMP(3)),
                            Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
                            Column.metadata(
                                    "_database_name", DataTypes.STRING(), "database_name", true)),
                    Collections.emptyList(),
                    UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));

    private static final ZoneId LOCAL_TIME_ZONE = ZoneId.systemDefault();

    // Defaults are read from the option definitions so the expectations below follow the
    // connector if a default value ever changes.
    private static final int BATCH_SIZE_DEFAULT = MongoDBSourceOptions.BATCH_SIZE.defaultValue();
    private static final int POLL_MAX_BATCH_SIZE_DEFAULT =
            MongoDBSourceOptions.POLL_MAX_BATCH_SIZE.defaultValue();
    private static final int POLL_AWAIT_TIME_MILLIS_DEFAULT =
            MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS.defaultValue();
    private static final int HEARTBEAT_INTERVAL_MILLIS_DEFAULT =
            MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS.defaultValue();
    private static final boolean SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT =
            MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue();
    private static final int SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT =
            MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
    private static final int SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SAMPLES_DEFAULT =
            MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SAMPLES.defaultValue();
    private static final int CHUNK_META_GROUP_SIZE_DEFAULT =
            SourceOptions.CHUNK_META_GROUP_SIZE.defaultValue();
    private static final boolean SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT =
            SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue();
    private static final boolean FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT =
            MongoDBSourceOptions.FULL_DOCUMENT_PRE_POST_IMAGE.defaultValue();
    private static final boolean SCAN_NO_CURSOR_TIMEOUT_DEFAULT =
            MongoDBSourceOptions.SCAN_NO_CURSOR_TIMEOUT.defaultValue();
    private static final boolean SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT =
            SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue();

    /** With only the required options set, the factory must fall back to every default. */
    @Test
    public void testCommonProperties() {
        MongoDBTableSource expectedSource = newDefaultSource(SCHEMA);
        DynamicTableSource actualSource = createTableSource(SCHEMA, getAllOptions());

        Assert.assertEquals(expectedSource, actualSource);
    }

    /** Every optional setting given in the option map must reach the created source. */
    @Test
    public void testOptionalProperties() {
        Map<String, String> options = getAllOptions();
        options.put("scheme", "mongodb+srv");
        options.put("connection.options", "replicaSet=test&connectTimeoutMS=300000");
        options.put("scan.startup.mode", "timestamp");
        options.put("scan.startup.timestamp-millis", "1667232000000");
        options.put("initial.snapshotting.queue.size", "100");
        options.put("batch.size", "101");
        options.put("poll.max.batch.size", "102");
        options.put("poll.await.time.ms", "103");
        options.put("heartbeat.interval.ms", "104");
        options.put("scan.incremental.snapshot.enabled", "true");
        options.put("chunk-meta.group.size", "1001");
        options.put("scan.incremental.snapshot.chunk.size.mb", "10");
        options.put("scan.incremental.snapshot.chunk.samples", "10");
        options.put("scan.incremental.close-idle-reader.enabled", "true");
        options.put("scan.incremental.snapshot.backfill.skip", "true");
        options.put("scan.full-changelog", "true");
        options.put("scan.cursor.no-timeout", "false");

        // Constructor arguments mirror, in order, the option values configured above.
        MongoDBTableSource expectedSource =
                new MongoDBTableSource(
                        SCHEMA,
                        "mongodb+srv",
                        MY_HOSTS,
                        USER,
                        PASSWORD,
                        MY_DATABASE,
                        MY_TABLE,
                        "replicaSet=test&connectTimeoutMS=300000",
                        StartupOptions.timestamp(1667232000000L),
                        100,
                        101,
                        102,
                        103,
                        104,
                        LOCAL_TIME_ZONE,
                        true,
                        1001,
                        10,
                        10,
                        true,
                        true,
                        false,
                        true);

        Assert.assertEquals(expectedSource, createTableSource(SCHEMA, options));
    }

    /**
     * Applying readable metadata must be preserved by {@code copy()} and must be reflected in the
     * produced type of the runtime source function.
     */
    @Test
    public void testMetadataColumns() {
        // The factory hands back the generic interface; the metadata API lives on the
        // concrete MongoDB source, hence the downcast.
        MongoDBTableSource actualSource =
                (MongoDBTableSource) createTableSource(SCHEMA_WITH_METADATA, getAllOptions());
        actualSource.applyReadableMetadata(
                Arrays.asList("op_ts", "database_name"),
                SCHEMA_WITH_METADATA.toSourceRowDataType());
        DynamicTableSource actualCopy = actualSource.copy();

        MongoDBTableSource expectedSource =
                newDefaultSource(ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA));
        expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
        expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");

        Assert.assertEquals(expectedSource, actualCopy);

        // ScanRuntimeProvider itself does not expose createSourceFunction(); the provider has to
        // be narrowed to SourceFunctionProvider first.
        // NOTE(review): assumes the default configuration yields a SourceFunctionProvider backed
        // by a DebeziumSourceFunction; a ClassCastException here means that assumption broke.
        SourceFunctionProvider provider =
                (SourceFunctionProvider)
                        actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
        DebeziumSourceFunction<RowData> sourceFunction =
                (DebeziumSourceFunction<RowData>) provider.createSourceFunction();
        AssertUtils.assertProducedTypeOfSourceFunction(
                sourceFunction, expectedSource.producedDataType);
    }

    /** An option key the factory does not know must be rejected with a descriptive error. */
    @Test
    public void testValidation() {
        Map<String, String> options = getAllOptions();
        options.put("unknown", "abc");

        // Capture the failure outside the try block so that a missing exception is reported as
        // "exception expected" rather than being swallowed by the catch clause.
        Exception thrown = null;
        try {
            createTableSource(SCHEMA, options);
        } catch (Exception e) {
            thrown = e;
        }

        Assert.assertNotNull("exception expected", thrown);
        Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(thrown, "Unsupported options:\n\nunknown")
                        .isPresent());
    }

    /**
     * Builds the source expected when only the options from {@link #getAllOptions()} are set.
     *
     * @param physicalSchema schema passed as the first constructor argument
     * @return a source configured with the connection constants of this test and option defaults
     */
    private static MongoDBTableSource newDefaultSource(ResolvedSchema physicalSchema) {
        return new MongoDBTableSource(
                physicalSchema,
                MongoDBSourceOptions.SCHEME.defaultValue(),
                MY_HOSTS,
                USER,
                PASSWORD,
                MY_DATABASE,
                MY_TABLE,
                null, // slot filled by "connection.options" in testOptionalProperties
                StartupOptions.initial(),
                null, // slot filled by "initial.snapshotting.queue.size" in testOptionalProperties
                BATCH_SIZE_DEFAULT,
                POLL_MAX_BATCH_SIZE_DEFAULT,
                POLL_AWAIT_TIME_MILLIS_DEFAULT,
                HEARTBEAT_INTERVAL_MILLIS_DEFAULT,
                LOCAL_TIME_ZONE,
                SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT,
                CHUNK_META_GROUP_SIZE_DEFAULT,
                SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT,
                SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SAMPLES_DEFAULT,
                SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
                FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT,
                SCAN_NO_CURSOR_TIMEOUT_DEFAULT,
                SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT);
    }

    /**
     * Returns a fresh, mutable option map holding the connector identifier and the connection
     * settings shared by all tests; callers add further options to it as needed.
     */
    private static Map<String, String> getAllOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "mongodb-cdc");
        options.put("hosts", MY_HOSTS);
        options.put("username", USER);
        options.put("password", PASSWORD);
        options.put("database", MY_DATABASE);
        options.put("collection", MY_TABLE);
        return options;
    }

    /**
     * Creates a table source through {@link FactoryUtil} for a mock catalog table.
     *
     * @param resolvedSchema schema of the mock table
     * @param map table options handed to the factory
     * @return the source produced by the factory selected via the {@code connector} option
     */
    private static DynamicTableSource createTableSource(
            ResolvedSchema resolvedSchema, Map<String, String> map) {
        ResolvedCatalogTable catalogTable =
                new ResolvedCatalogTable(
                        CatalogTable.of(
                                Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                                "mock source",
                                new ArrayList<>(),
                                map),
                        resolvedSchema);

        return FactoryUtil.createTableSource(
                (Catalog) null,
                ObjectIdentifier.of("default", "default", "t1"),
                catalogTable,
                new Configuration(),
                MongoDBTableFactoryTest.class.getClassLoader(),
                false);
    }
}
