/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator;

import io.confluent.connect.replicator.BasicAuthCredentialsType;
import io.confluent.connect.replicator.KafkaConfigs;
import io.confluent.connect.replicator.schemas.DefaultSubjectTranslator;
import io.confluent.connect.replicator.schemas.SubjectTranslator;
import io.confluent.connect.replicator.util.ByteArrayConverter;
import io.confluent.connect.replicator.util.EnumRecommender;
import io.confluent.connect.replicator.util.RegexValidator;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;

public class ReplicatorSourceConnectorConfig
extends AbstractConfig {
    public static final String LICENSE_CONFIG = "confluent.license";
    public static final String SRC_KEY_CONVERTER_CONFIG = "src.key.converter";
    public static final String SRC_VALUE_CONVERTER_CONFIG = "src.value.converter";
    public static final String SRC_HEADER_CONVERTER_CONFIG = "src.header.converter";
    public static final String DEST_KEY_CONVERTER_CONFIG = "key.converter";
    public static final String DEST_VALUE_CONVERTER_CONFIG = "value.converter";
    public static final String DEST_HEADER_CONVERTER_CONFIG = "header.converter";
    public static final String TOPIC_WHITELIST_CONFIG = "topic.whitelist";
    public static final String TOPIC_BLACKLIST_CONFIG = "topic.blacklist";
    public static final String TOPIC_REGEX_CONFIG = "topic.regex";
    public static final String TOPIC_RENAME_FORMAT_CONFIG = "topic.rename.format";
    public static final String TOPIC_POLL_INTERVAL_MS_CONFIG = "topic.poll.interval.ms";
    public static final String CONSUMER_POLL_TIMEOUT_INTERVAL_MS_CONFIG = "consumer.poll.timeout.interval.ms";
    public static final String CONSUMER_POLL_TIMEOUT_INTERVAL_MS_DOC = "This config is used to avoid indefinite poll() to the source cluster. The poll() returns null records if there is no new data.";
    public static final int CONSUMER_POLL_TIMEOUT_INTERVAL_MS_DEFAULT = (int)TimeUnit.SECONDS.toMillis(5L);
    public static final String DST_TOPIC_REPLICATION_FACTOR = "dest.topic.replication.factor";
    public static final String TOPIC_CREATE_BACKOFF_MS_CONFIG = "topic.create.backoff.ms";
    public static final String TOPIC_TIMESTAMP_TYPE_CONFIG = "topic.timestamp.type";
    public static final String TOPIC_AUTO_CREATE_CONFIG = "topic.auto.create";
    public static final String TOPIC_CONFIG_SYNC_CONFIG = "topic.config.sync";
    public static final String TOPIC_CONFIG_SYNC_INTERVAL_MS_CONFIG = "topic.config.sync.interval.ms";
    public static final String TOPIC_PRESERVE_PARTITIONS_CONFIG = "topic.preserve.partitions";
    public static final String CONFLUENT_TOPIC_CONFIG = "confluent.topic";
    public static final String OFFSET_START = "offset.start";
    public static final String OFFSET_TOPIC_COMMIT_CONFIG = "offset.topic.commit";
    public static final String OFFSET_TOPIC_COMMIT_BATCH_PERIOD_MS_CONFIG = "offset.topic.commit.batch.period.ms";
    public static final String OFFSET_TRANSLATOR_TASKS_MAX_CONFIG = "offset.translator.tasks.max";
    public static final String OFFSET_TRANSLATOR_TASKS_SEPARATE_CONFIG = "offset.translator.tasks.separate";
    public static final String OFFSET_TRANSLATOR_BATCH_PERIOD_MS_CONFIG = "offset.translator.batch.period.ms";
    public static final String OFFSET_TRANSLATOR_BATCH_SIZE_CONFIG = "offset.translator.batch.size";
    public static final String OFFSET_TIMESTAMPS_COMMIT_CONFIG = "offset.timestamps.commit";
    public static final String PROVENANCE_HEADER_ENABLE_CONFIG = "provenance.header.enable";
    public static final String PROVENANCE_HEADER_FILTER_OVERRIDES_CONFIG = "provenance.header.filter.overrides";
    public static final String SCHEMA_REGISTRY_TOPIC_CONFIG = "schema.registry.topic";
    public static final String SCHEMA_REGISTRY_TOPIC_DOC = "The topic that acts as the durable log for the schema registry.";
    public static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
    public static final String SCHEMA_REGISTRY_URL_DOC = "Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas.";
    public static final String MAX_SCHEMAS_PER_SUBJECT_CONFIG = "schema.registry.max.schemas.per.subject";
    public static final int MAX_SCHEMAS_PER_SUBJECT_DEFAULT = 1000;
    public static final String MAX_SCHEMAS_PER_SUBJECT_DOC = "Maximum number of schemas to cache locally.";
    public static final String BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG = "schema.registry.client.basic.auth.credentials.source";
    public static final String BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT = "URL";
    public static final String BASIC_AUTH_CREDENTIALS_SOURCE_DOC = "Specify how to pick the credentials for Basic Auth header. The supported values are URL, USER_INFO and SASL_INHERIT";
    public static final String SCHEMA_REGISTRY_USER_INFO_CONFIG = "schema.registry.client.basic.auth.user.info";
    public static final String SCHEMA_REGISTRY_USER_INFO_DEFAULT = "";
    public static final String SCHEMA_REGISTRY_USER_INFO_DOC = "Specify the user info for Basic Auth in the form of {username}:{password}";
    public static final String SCHEMA_SUBJECT_TRANSLATOR_PREFIX = "schema.subject.translator.";
    public static final String SCHEMA_SUBJECT_TRANSLATOR_CLASS_CONFIG = "schema.subject.translator.class";
    public static final String SCHEMA_SUBJECT_TRANSLATOR_CLASS_DOC = "Translator for the schema subject, or null if no translation is to be performed. Properties with the prefix 'schema.subject.translator.' will be passed to the configure() method of the translator.";
    public static final String FILTER_OVERRIDE_PATTERN = "([^,]+),([^,]+),([^-]*)-([^;]*);?";
    public static final Pattern FILTER_OVERRIDE_PATTERNS = Pattern.compile("(([^,]+),([^,]+),([^-]*)-([^;]*);?)*");
    public static final ConfigDef CONFIG_DEF = ReplicatorSourceConnectorConfig.baseConfigDef();
    private final String name;
    private static final String REPLICATOR_METRIC_GROUP = "confluent-replicator";
    private static final String REPLICATOR_METRIC_TYPE = "replicator";
    private static final String REPLICATOR_METRIC_NAME = "Replicator";

    protected static ConfigDef baseConfigDef() {
        ConfigDef configDef = new ConfigDef();
        String group = "Confluent Platform";
        int orderInGroup = 0;
        configDef.define(LICENSE_CONFIG, ConfigDef.Type.STRING, (Object)SCHEMA_REGISTRY_USER_INFO_DEFAULT, ConfigDef.Importance.HIGH, "Confluent will issue a license key to each subscriber. The license key will be a short snippet of text that you can copy and paste. Without the license key, you can use the Replicator for a 30-day trial period. If you are a subscriber, please contact Confluent Support for more information.", "Confluent Platform", ++orderInGroup, ConfigDef.Width.NONE, "License");
        group = "Source Topics";
        orderInGroup = 0;
        configDef.define(TOPIC_REGEX_CONFIG, ConfigDef.Type.STRING, null, (ConfigDef.Validator)new RegexValidator(), ConfigDef.Importance.HIGH, "Regex of topics to replicate to the destination cluster.", "Source Topics", ++orderInGroup, ConfigDef.Width.NONE, "Regex").define(TOPIC_WHITELIST_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.HIGH, "Whitelist of topics to be replicated.", "Source Topics", ++orderInGroup, ConfigDef.Width.LONG, "Whitelist").define(TOPIC_BLACKLIST_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.HIGH, "Topics to exclude from replication.", "Source Topics", ++orderInGroup, ConfigDef.Width.LONG, "Blacklist").define(TOPIC_POLL_INTERVAL_MS_CONFIG, ConfigDef.Type.INT, (Object)120000, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "How often to poll the source cluster for new topics matching `topic.whitelist` or `topic.regex`.", "Source Topics", ++orderInGroup, ConfigDef.Width.NONE, "Topic Config Sync Interval (ms)").define(CONSUMER_POLL_TIMEOUT_INTERVAL_MS_CONFIG, ConfigDef.Type.INT, (Object)CONSUMER_POLL_TIMEOUT_INTERVAL_MS_DEFAULT, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)1000), ConfigDef.Importance.LOW, CONSUMER_POLL_TIMEOUT_INTERVAL_MS_DOC, "Source Topics", ++orderInGroup, ConfigDef.Width.NONE, "Consumer poll timeout Interval (ms)");
        group = "Source Data Conversion";
        orderInGroup = 0;
        configDef.define(SRC_KEY_CONVERTER_CONFIG, ConfigDef.Type.CLASS, (Object)ByteArrayConverter.class.getName(), ConfigDef.Importance.LOW, "Converter for the key field of messages retrieved from the source cluster.", "Source Data Conversion", ++orderInGroup, ConfigDef.Width.LONG, "Source Key Converter").define(SRC_VALUE_CONVERTER_CONFIG, ConfigDef.Type.CLASS, (Object)ByteArrayConverter.class.getName(), ConfigDef.Importance.LOW, "Converter for the value field of messages retrieved from the source cluster.", "Source Data Conversion", ++orderInGroup, ConfigDef.Width.LONG, "Source Value Converter").define(SRC_HEADER_CONVERTER_CONFIG, ConfigDef.Type.CLASS, (Object)ByteArrayConverter.class.getName(), ConfigDef.Importance.LOW, "HeaderConverter class used to convert serialized Kafka headers to Kafka Connect headers. Default value is ByteArrayConverter, which simply passes the input bytes into the destination record.", "Source Data Conversion", ++orderInGroup, ConfigDef.Width.LONG, "Source Header Converter");
        group = "Destination Data Conversion";
        orderInGroup = 0;
        configDef.define(DEST_KEY_CONVERTER_CONFIG, ConfigDef.Type.CLASS, (Object)ByteArrayConverter.class.getName(), ConfigDef.Importance.LOW, "Converter for the key field of messages going to the destination cluster.", "Destination Data Conversion", ++orderInGroup, ConfigDef.Width.LONG, "Destination Key Converter").define(DEST_VALUE_CONVERTER_CONFIG, ConfigDef.Type.CLASS, (Object)ByteArrayConverter.class.getName(), ConfigDef.Importance.LOW, "Converter for the value field of messages going to the destination cluster.", "Destination Data Conversion", ++orderInGroup, ConfigDef.Width.LONG, "Destination Value Converter").define(DEST_HEADER_CONVERTER_CONFIG, ConfigDef.Type.CLASS, (Object)ByteArrayConverter.class.getName(), ConfigDef.Importance.LOW, "HeaderConverter class used to convert serialized Kafka headers to Kafka Connect headers. Default value is ByteArrayConverter, which simply passes the input bytes into the destination record.", "Destination Data Conversion", ++orderInGroup, ConfigDef.Width.LONG, "Destination Header Converter");
        KafkaConfigs.addDefinitions(configDef);
        group = "Destination Topics";
        orderInGroup = 0;
        configDef.define(TOPIC_RENAME_FORMAT_CONFIG, ConfigDef.Type.STRING, (Object)"${topic}", ConfigDef.Importance.HIGH, "A format string for the topic name in the destination cluster, which may contain '${topic}' as a placeholder for the originating topic name. For example, ``dc_${topic}`` for the topic 'orders' will map to the destination topic name 'dc_orders'.\nBe careful of the potential for topic name collisions when configuring replicators from multiple source clusters. We typically recommend that each cluster be given a distinct prefix or suffix (as in the example above).", "Destination Topics", ++orderInGroup, ConfigDef.Width.NONE, "Rename Format").define(TOPIC_AUTO_CREATE_CONFIG, ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.LOW, "Whether to automatically create topics in the destination cluster if required.", "Destination Topics", ++orderInGroup, ConfigDef.Width.LONG, "Auto-create").define(TOPIC_PRESERVE_PARTITIONS_CONFIG, ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.LOW, "Whether to automatically increase the number of partitions in the destination cluster to match the source cluster and ensure that messages replicated from the source cluster use the same partition in the destination cluster.", "Destination Topics", ++orderInGroup, ConfigDef.Width.LONG, "Preserve Partitions").define(TOPIC_CREATE_BACKOFF_MS_CONFIG, ConfigDef.Type.INT, (Object)120000, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "Time to wait before retrying auto topic creation or expansion.", "Destination Topics", ++orderInGroup, ConfigDef.Width.LONG, "Creation Backoff (ms)", (ConfigDef.Recommender)new BooleanParentRecommender(TOPIC_AUTO_CREATE_CONFIG)).define(TOPIC_CONFIG_SYNC_CONFIG, ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.LOW, "Whether to periodically sync topic configuration to the destination cluster.", "Destination Topics", ++orderInGroup, ConfigDef.Width.LONG, "Enable Topic Config Sync").define(TOPIC_CONFIG_SYNC_INTERVAL_MS_CONFIG, ConfigDef.Type.INT, (Object)120000, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "How often to check for configuration changes when ``topic.config.sync`` is enabled.", "Destination Topics", ++orderInGroup, ConfigDef.Width.LONG, "Topic Config Sync Interval (ms)", (ConfigDef.Recommender)new BooleanParentRecommender(TOPIC_CONFIG_SYNC_CONFIG)).define(TOPIC_TIMESTAMP_TYPE_CONFIG, ConfigDef.Type.STRING, (Object)TimestampType.CREATE_TIME.toString(), (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{TimestampType.CREATE_TIME.toString(), TimestampType.LOG_APPEND_TIME.toString()}), ConfigDef.Importance.LOW, "The timestamp type for the topics in the destination cluster.", "Destination Topics", ++orderInGroup, ConfigDef.Width.LONG, "Timestamp Type", (ConfigDef.Recommender)new BooleanParentRecommender(TOPIC_AUTO_CREATE_CONFIG, TOPIC_CONFIG_SYNC_CONFIG)).define(CONFLUENT_TOPIC_CONFIG, ConfigDef.Type.STRING, (Object)"_confluent-command", ConfigDef.Importance.LOW, "Topic used for Confluent Platform configuration, including licensing information.", "Destination Topics", ++orderInGroup, ConfigDef.Width.LONG, "Confluent Topic").define(PROVENANCE_HEADER_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, (Object)false, ConfigDef.Importance.MEDIUM, "Whether to enable the use of provenance headers during replication. If true, Replicator will not replicate messages with a provenance header and will add a provenance header to messages that it does replicate.", "Destination Topics", ++orderInGroup, ConfigDef.Width.NONE, "Enable Provenance Headers").define(PROVENANCE_HEADER_FILTER_OVERRIDES_CONFIG, ConfigDef.Type.STRING, (Object)SCHEMA_REGISTRY_USER_INFO_DEFAULT, (ConfigDef.Validator)new FIlterOverrideValidator(), ConfigDef.Importance.LOW, "Filter overrides of the form <clusterId>,<topic>,<startTimeMs>-<endTimeMs>. These overrides are matched against the cluster ID, topic, and timestamp in the provenance header of records that would normally be filtered out during replication. If a match occurs, the filter is ignored and the record is replicated. The start time is inclusive and the end time is exclusive. Multiple overrides must be separated by semicolons.", "Destination Topics", ++orderInGroup, ConfigDef.Width.NONE, "Provenance Header Filter Overrides").define(DST_TOPIC_REPLICATION_FACTOR, ConfigDef.Type.INT, (Object)0, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "Sets the replication factor for the topic at the destination cluster", "Destination Topics", ++orderInGroup, ConfigDef.Width.LONG, "Destination Topic Replication Factor");
        group = "Offset Management";
        orderInGroup = 0;
        configDef.define(OFFSET_START, ConfigDef.Type.STRING, (Object)OffsetStart.CONNECT.name().toLowerCase(Locale.ROOT), (ConfigDef.Validator)EnumRecommender.in(OffsetStart.values()), ConfigDef.Importance.MEDIUM, "Specify the preference for determining where to start replication. Use ``connect`` to prefer the Connect offset if it exists, and the consumer offset otherwise. Use ``consumer`` to prefer the consumer offset if it exists, and the Connect offset otherwise. If neither the Connect offset nor the consumer offset exist, the starting offset will be the beginning of the topic.", "Offset Management", ++orderInGroup, ConfigDef.Width.NONE, "Starting Offset", (ConfigDef.Recommender)EnumRecommender.in(OffsetStart.values())).define(OFFSET_TOPIC_COMMIT_CONFIG, ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.MEDIUM, "Whether to commit Replicator's consumer offsets to the source Kafka cluster after the messages have been written to the destination cluster.  These consumer offsets can be used to easily track the lag of Replicator.", "Offset Management", ++orderInGroup, ConfigDef.Width.NONE, "Enable Offset Topic Commit").define(OFFSET_TOPIC_COMMIT_BATCH_PERIOD_MS_CONFIG, ConfigDef.Type.INT, (Object)60000, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)-1), ConfigDef.Importance.MEDIUM, "The period in milliseconds during which Replicator flushes consumer offsets to source Kafka cluster after the messages have been written to the destination cluster. If -1 is provided, Replicator uses the Connect framework commit offset flush period.", "Offset Management", ++orderInGroup, ConfigDef.Width.NONE, "Batch Period for Offset Topic Commit");
        group = "Offset Translation";
        orderInGroup = 0;
        configDef.define(OFFSET_TRANSLATOR_TASKS_MAX_CONFIG, ConfigDef.Type.INT, (Object)-1, ConfigDef.Importance.MEDIUM, "The maximum number of Replicator tasks that will perform offset translation.  If -1 (the default), all tasks will perform offset translation.", "Offset Translation", ++orderInGroup, ConfigDef.Width.NONE, "Offset Translator Maximum Tasks").define(OFFSET_TRANSLATOR_TASKS_SEPARATE_CONFIG, ConfigDef.Type.BOOLEAN, (Object)false, ConfigDef.Importance.MEDIUM, "Whether to translate offsets in separate tasks from those performing topic replication.", "Offset Translation", ++orderInGroup, ConfigDef.Width.NONE, "Separate Tasks for Offset Translator").define(OFFSET_TRANSLATOR_BATCH_PERIOD_MS_CONFIG, ConfigDef.Type.INT, (Object)60000, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.MEDIUM, "The period in milliseconds during which offset translation requests will be batched.", "Offset Translation", ++orderInGroup, ConfigDef.Width.NONE, "Batch Period for Offset Translator").define(OFFSET_TRANSLATOR_BATCH_SIZE_CONFIG, ConfigDef.Type.INT, (Object)Integer.MAX_VALUE, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.MEDIUM, "The maximum size of a batch for offset translation requests.", "Offset Translation", ++orderInGroup, ConfigDef.Width.NONE, "Batch Size for Offset Translator").define(OFFSET_TIMESTAMPS_COMMIT_CONFIG, ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.MEDIUM, "Whether to commit timestamps for Replicator's own consumer group. These are used in active-passive scenarios after a failover, when another instance of Replicator is configured with the same group ID in order to replicate messages written to the secondary data center back to the primary data center.", "Offset Translation", ++orderInGroup, ConfigDef.Width.NONE, "Enable Offset Timestamps Commit");
        group = "Schema Translation";
        orderInGroup = 0;
        configDef.define(SCHEMA_REGISTRY_TOPIC_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SCHEMA_REGISTRY_TOPIC_DOC, "Schema Translation", ++orderInGroup, ConfigDef.Width.NONE, "Schema Registry Topic").define(SCHEMA_REGISTRY_URL_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.HIGH, SCHEMA_REGISTRY_URL_DOC, "Schema Translation", ++orderInGroup, ConfigDef.Width.NONE, "Schema Registry URL").define(MAX_SCHEMAS_PER_SUBJECT_CONFIG, ConfigDef.Type.INT, (Object)1000, ConfigDef.Importance.LOW, MAX_SCHEMAS_PER_SUBJECT_DOC, "Schema Translation", ++orderInGroup, ConfigDef.Width.NONE, "Max Schemas Cached Per Subject").define(BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG, ConfigDef.Type.STRING, (Object)BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT, (ConfigDef.Validator)EnumRecommender.in(BasicAuthCredentialsType.values()), ConfigDef.Importance.MEDIUM, BASIC_AUTH_CREDENTIALS_SOURCE_DOC, "Schema Translation", ++orderInGroup, ConfigDef.Width.NONE, "Basic Auth Credentials Source", (ConfigDef.Recommender)EnumRecommender.in(BasicAuthCredentialsType.values())).define(SCHEMA_REGISTRY_USER_INFO_CONFIG, ConfigDef.Type.PASSWORD, (Object)SCHEMA_REGISTRY_USER_INFO_DEFAULT, ConfigDef.Importance.MEDIUM, SCHEMA_REGISTRY_USER_INFO_DOC, "Schema Translation", ++orderInGroup, ConfigDef.Width.NONE, "Basic Auth User Info").define(SCHEMA_SUBJECT_TRANSLATOR_CLASS_CONFIG, ConfigDef.Type.CLASS, null, (ConfigDef.Validator)new SubjectTranslatorValidator(), ConfigDef.Importance.LOW, SCHEMA_SUBJECT_TRANSLATOR_CLASS_DOC, "Schema Translation", ++orderInGroup, ConfigDef.Width.NONE, "Schema Subject Translator");
        return configDef;
    }

    public ReplicatorSourceConnectorConfig(Map<String, String> parsedConfig) {
        super(CONFIG_DEF, parsedConfig);
        this.name = ReplicatorSourceConnectorConfig.parseName(parsedConfig);
    }

    protected ReplicatorSourceConnectorConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
        super(subclassConfigDef, props);
        this.name = ReplicatorSourceConnectorConfig.parseName(props);
    }

    private <T> T getInstance(String key, Class<T> t) {
        Class c = this.getClass(key);
        if (c == null) {
            return null;
        }
        Object o = Utils.newInstance((Class)c);
        if (!t.isInstance(o)) {
            throw new ConnectException(c.getName() + " is not an instance of " + t.getName());
        }
        return t.cast(o);
    }

    private Converter getDataConverter(String config, boolean isKey) {
        Converter converter = this.getInstance(config, Converter.class);
        Map originals = this.originalsWithPrefix(config + ".");
        converter.configure(originals, isKey);
        return converter;
    }

    public Converter getSourceKeyConverter() {
        return this.getDataConverter(SRC_KEY_CONVERTER_CONFIG, true);
    }

    public Converter getSourceValueConverter() {
        return this.getDataConverter(SRC_VALUE_CONVERTER_CONFIG, false);
    }

    public Converter getDestinationKeyConverter() {
        return this.getDataConverter(DEST_KEY_CONVERTER_CONFIG, true);
    }

    public Converter getDestinationValueConverter() {
        return this.getDataConverter(DEST_VALUE_CONVERTER_CONFIG, false);
    }

    private HeaderConverter getHeaderConverter(String config) {
        HeaderConverter converter = this.getInstance(config, HeaderConverter.class);
        Map originals = this.originalsWithPrefix(config + ".");
        converter.configure(originals);
        return converter;
    }

    public HeaderConverter getSourceHeaderConverter() {
        return this.getHeaderConverter(SRC_HEADER_CONVERTER_CONFIG);
    }

    public HeaderConverter getDestinationHeaderConverter() {
        return this.getHeaderConverter(DEST_HEADER_CONVERTER_CONFIG);
    }

    public String getName() {
        return this.name;
    }

    public Map<String, Object> dstAdminClientConfig() {
        return this.originalsWithPrefix(KafkaConfigs.KafkaCluster.DESTINATION.prefix());
    }

    public Map<String, Object> srcAdminClientConfig() {
        return this.originalsWithPrefix(KafkaConfigs.KafkaCluster.SOURCE.prefix());
    }

    public Pattern getTopicPattern() {
        String regex = this.getString(TOPIC_REGEX_CONFIG);
        return regex == null ? null : Pattern.compile(regex);
    }

    public Set<String> getTopics() {
        return new HashSet<String>(this.getList(TOPIC_WHITELIST_CONFIG));
    }

    public Set<String> getBlacklistTopics() {
        return new HashSet<String>(this.getList(TOPIC_BLACKLIST_CONFIG));
    }

    public String getTopicRenameFormat() {
        return this.getString(TOPIC_RENAME_FORMAT_CONFIG);
    }

    public int getTopicPollIntervalMs() {
        return this.getInt(TOPIC_POLL_INTERVAL_MS_CONFIG);
    }

    public int getTopicConfigSyncIntervalMs() {
        return this.getInt(TOPIC_CONFIG_SYNC_INTERVAL_MS_CONFIG);
    }

    public int getConsumerPollTimeoutIntervalMs() {
        return this.getInt(CONSUMER_POLL_TIMEOUT_INTERVAL_MS_CONFIG);
    }

    public Map<String, ?> getSourceConsumerConfigs() {
        Map configs = this.originalsWithPrefix(KafkaConfigs.KafkaCluster.SOURCE.prefix());
        configs.putAll(this.originalsWithPrefix("src.consumer."));
        return configs;
    }

    public int getTopicCreateBackoffMs() {
        return this.getInt(TOPIC_CREATE_BACKOFF_MS_CONFIG);
    }

    public int getDestTopicReplicationFactor() {
        return this.getInt(DST_TOPIC_REPLICATION_FACTOR);
    }

    public boolean getTopicAutoCreate() {
        return this.getBoolean(TOPIC_AUTO_CREATE_CONFIG);
    }

    public boolean getTopicPreservePartitions() {
        return this.getBoolean(TOPIC_PRESERVE_PARTITIONS_CONFIG);
    }

    public boolean getTopicConfigSync() {
        return this.getBoolean(TOPIC_CONFIG_SYNC_CONFIG);
    }

    public String getTopicTimestampType() {
        return this.getString(TOPIC_TIMESTAMP_TYPE_CONFIG);
    }

    public OffsetStart getOffsetStart() {
        return OffsetStart.get(this.getString(OFFSET_START));
    }

    public boolean isOffsetTopicCommitEnabled() {
        return this.getBoolean(OFFSET_TOPIC_COMMIT_CONFIG);
    }

    public int getOffsetTopicCommitBatchPeriodMs() {
        return this.getInt(OFFSET_TOPIC_COMMIT_BATCH_PERIOD_MS_CONFIG);
    }

    public int getOffsetTranslatorTasksMax() {
        return this.getInt(OFFSET_TRANSLATOR_TASKS_MAX_CONFIG);
    }

    public boolean areOffsetTranslatorTasksSeparate() {
        return this.getBoolean(OFFSET_TRANSLATOR_TASKS_SEPARATE_CONFIG);
    }

    public int getOffsetTranslatorBatchPeriodMs() {
        return this.getInt(OFFSET_TRANSLATOR_BATCH_PERIOD_MS_CONFIG);
    }

    public int getOffsetTranslatorBatchSize() {
        return this.getInt(OFFSET_TRANSLATOR_BATCH_SIZE_CONFIG);
    }

    public boolean isOffsetTimestampsCommitEnabled() {
        return this.getBoolean(OFFSET_TIMESTAMPS_COMMIT_CONFIG);
    }

    public boolean isProvenanceHeaderEnabled() {
        return this.getBoolean(PROVENANCE_HEADER_ENABLE_CONFIG);
    }

    public String getSchemaRegistryTopic() {
        return this.getString(SCHEMA_REGISTRY_TOPIC_CONFIG);
    }

    public List<String> getSchemaRegistryUrls() {
        return this.getList(SCHEMA_REGISTRY_URL_CONFIG);
    }

    public int getMaxSchemasPerSubject() {
        return this.getInt(MAX_SCHEMAS_PER_SUBJECT_CONFIG);
    }

    public SubjectTranslator getSubjectTranslator() {
        SubjectTranslator subjectTranslator = this.getInstance(SCHEMA_SUBJECT_TRANSLATOR_CLASS_CONFIG, SubjectTranslator.class);
        Map originals = this.originalsWithPrefix(SCHEMA_SUBJECT_TRANSLATOR_PREFIX);
        if (subjectTranslator == null) {
            return subject -> subject;
        }
        if (subjectTranslator instanceof DefaultSubjectTranslator) {
            originals.putAll(this.originalsWithPrefix("topic.", false));
        }
        subjectTranslator.configure(originals);
        return subjectTranslator;
    }

    public boolean hasAlignedTopicPreservePartitionsAndOffsetTopicCommit() {
        if (!this.getTopicPreservePartitions()) {
            return !this.isOffsetTopicCommitEnabled();
        }
        return true;
    }

    protected static String parseName(Map<String, String> props) {
        String nameProp = props.get("name");
        return nameProp != null ? nameProp : "replicator-source";
    }

    public static void main(String[] args) {
        System.out.println(CONFIG_DEF.toRst());
    }

    public static enum OffsetStart {
        CONNECT,
        CONSUMER;

        private static final Map<String, OffsetStart> lookup;

        public static OffsetStart get(String name) {
            return lookup.get(name.toLowerCase(Locale.ROOT));
        }

        public String toString() {
            return this.name().toLowerCase(Locale.ROOT);
        }

        static {
            lookup = new HashMap<String, OffsetStart>();
            for (OffsetStart m : EnumSet.allOf(OffsetStart.class)) {
                lookup.put(m.toString(), m);
            }
        }
    }

    private static class SubjectTranslatorValidator
    implements ConfigDef.Validator {
        private SubjectTranslatorValidator() {
        }

        public void ensureValid(String name, Object provider) {
            if (provider == null || provider instanceof Class && SubjectTranslator.class.isAssignableFrom((Class)provider)) {
                return;
            }
            throw new ConfigException(name, provider, "Class must extend: " + SubjectTranslator.class);
        }

        public String toString() {
            return "Any class implementing: " + SubjectTranslator.class;
        }
    }

    private static class FIlterOverrideValidator
    implements ConfigDef.Validator {
        private FIlterOverrideValidator() {
        }

        public void ensureValid(String name, Object value) {
            if (value == null) {
                return;
            }
            Matcher matcher = FILTER_OVERRIDE_PATTERNS.matcher((String)value);
            if (!matcher.matches()) {
                throw new ConfigException(name, value, "Filter override must be of the form <clusterId>,<topic>,<startTimeMs>-<endTimeMs>. Multiple overrides must be separated by semicolons.");
            }
        }
    }

    private static class BooleanParentRecommender
    implements ConfigDef.Recommender {
        private final String[] parentConfigNames;

        BooleanParentRecommender(String ... parentConfigNames) {
            this.parentConfigNames = parentConfigNames;
        }

        public List<Object> validValues(String name, Map<String, Object> connectorConfigs) {
            return Collections.emptyList();
        }

        public boolean visible(String name, Map<String, Object> connectorConfigs) {
            for (String parentConfigName : this.parentConfigNames) {
                if (!((Boolean)connectorConfigs.get(parentConfigName)).booleanValue()) continue;
                return true;
            }
            return false;
        }
    }
}

