package com.ververica.cdc.connectors.mongodb.table;

import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.class */
/**
 * Table source factory registered under the identifier {@code mongodb-cdc}.
 *
 * <p>It reads the connector options declared below from the table definition, verifies that the
 * table declares a primary key made of the single column {@code _id}, and hands everything to a
 * new {@link MongoDBTableSource}.
 */
public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
    /** Identifier returned by {@link #factoryIdentifier()}. */
    private static final String IDENTIFIER = "mongodb-cdc";

    /** Name of the only column accepted as the table's primary key. */
    private static final String DOCUMENT_ID_FIELD = "_id";

    // ------------------------------------------------------------------------------------------
    // Connection options
    // ------------------------------------------------------------------------------------------

    private static final ConfigOption<String> HOSTS =
            ConfigOptions.key("hosts")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The comma-separated list of hostname and port pairs of the MongoDB servers. eg. localhost:27017,localhost:27018");

    private static final ConfigOption<String> USERNAME =
            ConfigOptions.key("username")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.");

    private static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("password")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.");

    private static final ConfigOption<String> DATABASE =
            ConfigOptions.key("database")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Name of the database to watch for changes.The database also supports regular expression to monitor multiple databases matches the regular expression.e.g. db[0-9] .");

    private static final ConfigOption<String> COLLECTION =
            ConfigOptions.key("collection")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Name of the collection in the database to watch for changes.The collection also supports regular expression to monitor multiple collections matches fully-qualified collection identifiers.e.g. db0\\.coll[0-9] .");

    private static final ConfigOption<String> CONNECTION_OPTIONS =
            ConfigOptions.key("connection.options")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The ampersand-separated MongoDB connection options. eg. replicaSet=test&connectTimeoutMS=300000");

    // ------------------------------------------------------------------------------------------
    // Error handling options
    // ------------------------------------------------------------------------------------------

    private static final ConfigOption<String> ERRORS_TOLERANCE =
            ConfigOptions.key("errors.tolerance")
                    .stringType()
                    .defaultValue(MongoDBSource.ERROR_TOLERANCE_NONE)
                    .withDescription("Whether to continue processing messages if an error is encountered. When set to none, the connector reports an error and blocks further processing of the rest of the records when it encounters an error. When set to all, the connector silently ignores any bad messages.Accepted Values: 'none' or 'all'. Default 'none'.");

    private static final ConfigOption<Boolean> ERRORS_LOG_ENABLE =
            ConfigOptions.key("errors.log.enable")
                    .booleanType()
                    .defaultValue(Boolean.TRUE)
                    .withDescription("Whether details of failed operations should be written to the log file. When set to true, both errors that are tolerated (determined by the errors.tolerance setting) and not tolerated are written. When set to false, errors that are tolerated are omitted.");

    // ------------------------------------------------------------------------------------------
    // Copy-existing (snapshot) options
    // ------------------------------------------------------------------------------------------

    private static final ConfigOption<Boolean> COPY_EXISTING =
            ConfigOptions.key(MongoSourceConfig.COPY_EXISTING_CONFIG)
                    .booleanType()
                    .defaultValue(Boolean.TRUE)
                    .withDescription("Copy existing data from source collections and convert them to Change Stream events on their respective topics. Any changes to the data that occur during the copy process are applied once the copy is completed.");

    private static final ConfigOption<String> COPY_EXISTING_PIPELINE =
            ConfigOptions.key(MongoSourceConfig.COPY_EXISTING_PIPELINE_CONFIG)
                    .stringType()
                    .noDefaultValue()
                    .withDescription("An array of JSON objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient.");

    private static final ConfigOption<Integer> COPY_EXISTING_MAX_THREADS =
            ConfigOptions.key(MongoSourceConfig.COPY_EXISTING_MAX_THREADS_CONFIG)
                    .intType()
                    .noDefaultValue()
                    .withDescription("The number of threads to use when performing the data copy. Defaults to the number of processors.");

    private static final ConfigOption<Integer> COPY_EXISTING_QUEUE_SIZE =
            ConfigOptions.key(MongoSourceConfig.COPY_EXISTING_QUEUE_SIZE_CONFIG)
                    .intType()
                    .noDefaultValue()
                    .withDescription("The max size of the queue to use when copying data. Defaults to 16000.");

    // ------------------------------------------------------------------------------------------
    // Polling / heartbeat options
    // ------------------------------------------------------------------------------------------

    private static final ConfigOption<Integer> POLL_MAX_BATCH_SIZE =
            ConfigOptions.key(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG)
                    .intType()
                    .defaultValue(1000)
                    .withDescription("Maximum number of change stream documents to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector. Defaults to 1000.");

    private static final ConfigOption<Integer> POLL_AWAIT_TIME_MILLIS =
            ConfigOptions.key(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG)
                    .intType()
                    .defaultValue(Integer.valueOf(MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT))
                    .withDescription("The amount of time to wait before checking for new results on the change stream.Defaults: 1500.");

    private static final ConfigOption<Integer> HEARTBEAT_INTERVAL_MILLIS =
            ConfigOptions.key("heartbeat.interval.ms")
                    .intType()
                    .noDefaultValue()
                    .withDescription("The length of time in milliseconds between sending heartbeat messages.Heartbeat messages contain the post batch resume token and are sent when no source records have been published in the specified interval. This improves the resumability of the connector for low volume namespaces. Use 0 to disable. Defaults to 0.");

    /**
     * Builds a {@link MongoDBTableSource} from the options and schema carried by {@code context}.
     *
     * <p>Options without a default value are passed on as {@code null} when they are not set.
     *
     * @param context factory context providing the table options, the session configuration and
     *     the resolved schema of the catalog table
     * @return the configured table source
     * @throws IllegalArgumentException if the schema has no primary key, or the primary key is
     *     not exactly the single column {@code _id}
     */
    @Override
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        final ReadableConfig options = helper.getOptions();

        final String hosts = options.get(HOSTS);
        final String connectionOptions = options.getOptional(CONNECTION_OPTIONS).orElse(null);
        final String username = options.getOptional(USERNAME).orElse(null);
        final String password = options.getOptional(PASSWORD).orElse(null);
        final String database = options.getOptional(DATABASE).orElse(null);
        final String collection = options.getOptional(COLLECTION).orElse(null);

        final String errorsTolerance = options.get(ERRORS_TOLERANCE);
        final Boolean errorsLogEnable = options.get(ERRORS_LOG_ENABLE);

        final Integer pollMaxBatchSize = options.get(POLL_MAX_BATCH_SIZE);
        final Integer pollAwaitTimeMillis = options.get(POLL_AWAIT_TIME_MILLIS);
        final Integer heartbeatIntervalMillis =
                options.getOptional(HEARTBEAT_INTERVAL_MILLIS).orElse(null);

        final Boolean copyExisting = options.get(COPY_EXISTING);
        final String copyExistingPipeline =
                options.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
        final Integer copyExistingMaxThreads =
                options.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
        final Integer copyExistingQueueSize =
                options.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);

        // When the session time zone is left at the option's default value, fall back to the
        // JVM's system zone; otherwise parse the configured zone id.
        final String configuredZone =
                context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
        final ZoneId localTimeZone =
                TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(configuredZone)
                        ? ZoneId.systemDefault()
                        : ZoneId.of(configuredZone);

        final ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
        final UniqueConstraint primaryKey =
                physicalSchema
                        .getPrimaryKey()
                        .orElseThrow(
                                () -> new IllegalArgumentException("Primary key must be present"));
        checkPrimaryKey(primaryKey, "Primary key must be _id field");

        return new MongoDBTableSource(
                physicalSchema,
                hosts,
                username,
                password,
                database,
                collection,
                connectionOptions,
                errorsTolerance,
                errorsLogEnable,
                copyExisting,
                copyExistingPipeline,
                copyExistingMaxThreads,
                copyExistingQueueSize,
                pollMaxBatchSize,
                pollAwaitTimeMillis,
                heartbeatIntervalMillis,
                localTimeZone);
    }

    /**
     * Verifies that {@code uniqueConstraint} consists of exactly one column named {@code _id}.
     *
     * @param uniqueConstraint the primary key constraint to validate
     * @param str message of the exception raised when the check fails
     * @throws IllegalArgumentException if the constraint has more than one column or its column
     *     is not {@code _id}
     */
    private void checkPrimaryKey(UniqueConstraint uniqueConstraint, String str) {
        final boolean isDocumentIdOnly =
                uniqueConstraint.getColumns().size() == 1
                        && uniqueConstraint.getColumns().contains(DOCUMENT_ID_FIELD);
        Preconditions.checkArgument(isDocumentIdOnly, str);
    }

    /** Returns the connector identifier, {@code mongodb-cdc}. */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /** Returns the options that must be set; only {@code hosts} is mandatory. */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> required = new HashSet<>();
        required.add(HOSTS);
        return required;
    }

    /** Returns every option this factory accepts in addition to the required ones. */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> optional = new HashSet<>();
        optional.add(USERNAME);
        optional.add(PASSWORD);
        optional.add(CONNECTION_OPTIONS);
        optional.add(DATABASE);
        optional.add(COLLECTION);
        optional.add(ERRORS_TOLERANCE);
        optional.add(ERRORS_LOG_ENABLE);
        optional.add(COPY_EXISTING);
        optional.add(COPY_EXISTING_PIPELINE);
        optional.add(COPY_EXISTING_MAX_THREADS);
        optional.add(COPY_EXISTING_QUEUE_SIZE);
        optional.add(POLL_MAX_BATCH_SIZE);
        optional.add(POLL_AWAIT_TIME_MILLIS);
        optional.add(HEARTBEAT_INTERVAL_MILLIS);
        return optional;
    }
}
