package org.apache.kafka.connect.runtime;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;

/* loaded from: input_file:org/apache/kafka/connect/runtime/AbstractHerder.class */
/**
 * Shared base for {@link Herder} implementations.
 *
 * <p>This class provides the pieces that do not depend on how work is distributed:
 * <ul>
 *   <li>it listens to connector and task lifecycle events and records each transition in the
 *       {@link StatusBackingStore};</li>
 *   <li>it answers read-only queries (connector info, status, active topics) from the
 *       {@link ConfigBackingStore} and {@link StatusBackingStore};</li>
 *   <li>it validates connector configurations, including client override prefixes, on a
 *       dedicated cached thread pool.</li>
 * </ul>
 *
 * <p>Subclasses supply {@link #generation()} and {@link #rawConfig(String)}.
 */
public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
    private final String workerId;
    protected final Worker worker;
    private final String kafkaClusterId;
    protected final StatusBackingStore statusBackingStore;
    protected final ConfigBackingStore configBackingStore;
    private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
    /** Runs the asynchronous connector config validations submitted by this herder. */
    private final ExecutorService connectorExecutor;
    protected volatile boolean running = false;
    /** Connector instances keyed by connector class name/alias, reused across validations and type lookups. */
    private final Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();

    /**
     * Creates the herder and registers it on the given worker.
     *
     * @param worker the worker this herder drives; its {@code herder} field is set to this instance
     * @param str the id of this worker, recorded in every status written by this herder
     * @param str2 the Kafka cluster id returned by {@link #kafkaClusterId()}
     * @param statusBackingStore store that receives connector and task status updates
     * @param configBackingStore store holding connector configs and target states
     * @param connectorClientConfigOverridePolicy policy used to validate client config overrides
     */
    public AbstractHerder(Worker worker, String str, String str2, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        this.worker = worker;
        this.worker.herder = this;
        this.workerId = str;
        this.kafkaClusterId = str2;
        this.statusBackingStore = statusBackingStore;
        this.configBackingStore = configBackingStore;
        this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
        this.connectorExecutor = Executors.newCachedThreadPool();
    }

    /** @return the Kafka cluster id supplied at construction time */
    @Override
    public String kafkaClusterId() {
        return this.kafkaClusterId;
    }

    /** @return the generation value stamped on every status this herder writes */
    protected abstract int generation();

    /* JADX INFO: Access modifiers changed from: protected */
    /** Starts the worker, then the status store, then the config store. */
    public void startServices() {
        this.worker.start();
        this.statusBackingStore.start();
        this.configBackingStore.start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Stops the status store, the config store and the worker, then shuts down the validation executor. */
    public void stopServices() {
        this.statusBackingStore.stop();
        this.configBackingStore.stop();
        this.worker.stop();
        this.connectorExecutor.shutdown();
    }

    /** @return the current value of the {@code running} flag */
    @Override
    public boolean isRunning() {
        return this.running;
    }

    /** Records the connector {@code str} as RUNNING. */
    @Override
    public void onStartup(String str) {
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.RUNNING, this.workerId, generation()));
    }

    /** Records the connector {@code str} as PAUSED. */
    @Override
    public void onPause(String str) {
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.PAUSED, this.workerId, generation()));
    }

    /** Records the connector {@code str} as RUNNING after a resume. */
    @Override
    public void onResume(String str) {
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.RUNNING, this.workerId, generation()));
    }

    /** Records the connector {@code str} as UNASSIGNED, using the store's {@code putSafe} variant. */
    @Override
    public void onShutdown(String str) {
        this.statusBackingStore.putSafe(new ConnectorStatus(str, AbstractStatus.State.UNASSIGNED, this.workerId, generation()));
    }

    /** Records the connector {@code str} as FAILED together with the stack trace of {@code th}. */
    @Override
    public void onFailure(String str, Throwable th) {
        this.statusBackingStore.putSafe(new ConnectorStatus(str, AbstractStatus.State.FAILED, trace(th), this.workerId, generation()));
    }

    /** Records the task as RUNNING. */
    @Override
    public void onStartup(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.put(new TaskStatus(connectorTaskId, AbstractStatus.State.RUNNING, this.workerId, generation()));
    }

    /** Records the task as FAILED together with the stack trace of {@code th}. */
    @Override
    public void onFailure(ConnectorTaskId connectorTaskId, Throwable th) {
        this.statusBackingStore.putSafe(new TaskStatus(connectorTaskId, AbstractStatus.State.FAILED, this.workerId, generation(), trace(th)));
    }

    /** Records the task as UNASSIGNED, using the store's {@code putSafe} variant. */
    @Override
    public void onShutdown(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.putSafe(new TaskStatus(connectorTaskId, AbstractStatus.State.UNASSIGNED, this.workerId, generation()));
    }

    /** Records the task as RUNNING after a resume. */
    @Override
    public void onResume(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.put(new TaskStatus(connectorTaskId, AbstractStatus.State.RUNNING, this.workerId, generation()));
    }

    /** Records the task as PAUSED. */
    @Override
    public void onPause(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.put(new TaskStatus(connectorTaskId, AbstractStatus.State.PAUSED, this.workerId, generation()));
    }

    /**
     * Marks every task status currently stored for connector {@code str} as DESTROYED, then marks
     * the connector itself as DESTROYED.
     */
    @Override
    public void onDeletion(String str) {
        for (TaskStatus taskStatus : this.statusBackingStore.getAll(str)) {
            onDeletion(taskStatus.id());
        }
        this.statusBackingStore.put(new ConnectorStatus(str, AbstractStatus.State.DESTROYED, this.workerId, generation()));
    }

    /** Records the task as DESTROYED. */
    @Override
    public void onDeletion(ConnectorTaskId connectorTaskId) {
        this.statusBackingStore.put(new TaskStatus(connectorTaskId, AbstractStatus.State.DESTROYED, this.workerId, generation()));
    }

    /**
     * Sets the target state of connector {@code str} to PAUSED.
     *
     * @throws NotFoundException if the config store does not contain the connector
     */
    @Override
    public void pauseConnector(String str) {
        if (!this.configBackingStore.contains(str)) {
            throw new NotFoundException("Unknown connector " + str);
        }
        this.configBackingStore.putTargetState(str, TargetState.PAUSED);
    }

    /**
     * Sets the target state of connector {@code str} to STARTED.
     *
     * @throws NotFoundException if the config store does not contain the connector
     */
    @Override
    public void resumeConnector(String str) {
        if (!this.configBackingStore.contains(str)) {
            throw new NotFoundException("Unknown connector " + str);
        }
        this.configBackingStore.putTargetState(str, TargetState.STARTED);
    }

    /** @return the plugins instance owned by the worker */
    @Override
    public Plugins plugins() {
        return this.worker.getPlugins();
    }

    /**
     * @param str connector name
     * @return the connector's raw configuration; {@link #connectorStatus(String)} treats a
     *         {@code null} result as "type unknown"
     */
    protected abstract Map<String, String> rawConfig(String str);

    /**
     * Looks up the connector info asynchronously and forwards either the error or the
     * connector's config map to {@code callback}.
     */
    @Override
    public void connectorConfig(String str, Callback<Map<String, String>> callback) {
        connectorInfo(str, (error, info) -> {
            if (error == null) {
                callback.onCompletion(null, info.config());
            } else {
                callback.onCompletion(error, null);
            }
        });
    }

    /** @return the connector names present in a fresh snapshot of the config store */
    @Override
    public Collection<String> connectors() {
        return this.configBackingStore.snapshot().connectors();
    }

    /**
     * Builds the info for connector {@code str} from a fresh config snapshot.
     *
     * @return the connector info, or {@code null} if the snapshot does not contain the connector
     */
    @Override
    public ConnectorInfo connectorInfo(String str) {
        ClusterConfigState configState = this.configBackingStore.snapshot();
        if (!configState.contains(str)) {
            return null;
        }
        Map<String, String> config = configState.rawConnectorConfig(str);
        ConnectorType type = connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
        return new ConnectorInfo(str, config, configState.tasks(str), type);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Collects the configuration of every task of connector {@code str} from a fresh snapshot.
     *
     * @return task id to task config; an empty map if the snapshot does not contain the connector
     */
    public Map<ConnectorTaskId, Map<String, String>> buildTasksConfig(String str) {
        ClusterConfigState configState = this.configBackingStore.snapshot();
        if (!configState.contains(str)) {
            return Collections.emptyMap();
        }
        Map<ConnectorTaskId, Map<String, String>> configs = new HashMap<>();
        for (ConnectorTaskId taskId : configState.tasks(str)) {
            configs.put(taskId, configState.taskConfig(taskId));
        }
        return configs;
    }

    /**
     * Assembles the state of connector {@code str} and of all its tasks (sorted) from the status store.
     * The connector type is {@code UNKNOWN} when {@link #rawConfig(String)} returns {@code null}.
     *
     * @throws NotFoundException if no connector status is stored for {@code str}
     */
    @Override
    public ConnectorStateInfo connectorStatus(String str) {
        ConnectorStatus connector = this.statusBackingStore.get(str);
        if (connector == null) {
            throw new NotFoundException("No status found for connector " + str);
        }
        Collection<TaskStatus> tasks = this.statusBackingStore.getAll(str);
        ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
                connector.state().toString(), connector.workerId(), connector.trace());
        List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
        for (TaskStatus status : tasks) {
            taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(), status.state().toString(), status.workerId(), status.trace()));
        }
        Collections.sort(taskStates);
        Map<String, String> config = rawConfig(str);
        ConnectorType type = config == null
                ? ConnectorType.UNKNOWN
                : connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
        return new ConnectorStateInfo(str, connectorState, taskStates, type);
    }

    /** @return the names of all topics the status store currently lists for connector {@code str} */
    @Override
    public ActiveTopicsInfo connectorActiveTopics(String str) {
        Collection<String> topics = this.statusBackingStore.getAllTopics(str).stream()
                .map(topicStatus -> topicStatus.topic())
                .collect(Collectors.toList());
        return new ActiveTopicsInfo(str, topics);
    }

    /** Deletes every topic status the status store currently lists for connector {@code str}. */
    @Override
    public void resetConnectorActiveTopics(String str) {
        this.statusBackingStore.getAllTopics(str).stream().forEach(topicStatus -> {
            this.statusBackingStore.deleteTopic(topicStatus.connector(), topicStatus.topic());
        });
    }

    /** @return the status store used by this herder */
    @Override
    public StatusBackingStore statusBackingStore() {
        return this.statusBackingStore;
    }

    /**
     * @return the stored state of the given task
     * @throws NotFoundException if no status is stored for the task
     */
    @Override
    public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId connectorTaskId) {
        TaskStatus status = this.statusBackingStore.get(connectorTaskId);
        if (status == null) {
            throw new NotFoundException("No status found for task " + connectorTaskId);
        }
        return new ConnectorStateInfo.TaskState(connectorTaskId.task(), status.state().toString(), status.workerId(), status.trace());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Validates {@code map} against the framework-level {@code configDef}. The {@code connector}
     * argument is not used by this implementation.
     */
    public Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector, ConfigDef configDef, Map<String, String> map) {
        return configDef.validateAll(map);
    }

    /** Same as {@link #validateConnectorConfig(Map, Callback, boolean)} with the flag set to {@code true}. */
    @Override
    public void validateConnectorConfig(Map<String, String> map, Callback<ConfigInfos> callback) {
        validateConnectorConfig(map, callback, true);
    }

    /**
     * Validates the connector configuration on the validation executor and reports the outcome,
     * or any {@link Throwable} raised during validation, to {@code callback}.
     *
     * @param z forwarded to {@link #validateConnectorConfig(Map, boolean)}
     */
    @Override
    public void validateConnectorConfig(Map<String, String> map, Callback<ConfigInfos> callback, boolean z) {
        this.connectorExecutor.submit(() -> {
            try {
                callback.onCompletion(null, validateConnectorConfig(map, z));
            } catch (Throwable error) {
                callback.onCompletion(error, null);
            }
        });
    }

    /**
     * Validates a connector configuration synchronously.
     *
     * <p>The config is first passed through the worker's config transformer (when one is set).
     * It is then validated against the enriched framework config definition, the connector's own
     * {@code validate}, and the client override policy for the client types that apply: producer
     * for source connectors; consumer, plus admin when a DLQ topic is configured, for sinks.
     * The connector's class loader is swapped in for the duration and always restored.
     *
     * @param map the connector properties
     * @param z passed as the third argument of the {@link AbstractConfig} constructor that wraps
     *          the properties for override extraction
     * @return the merged validation result
     * @throws BadRequestException if the config has no connector class, or the connector returns
     *         {@code null} from {@code config()} or {@code validate()}
     */
    ConfigInfos validateConnectorConfig(Map<String, String> map, boolean z) {
        if (this.worker.configTransformer() != null) {
            map = this.worker.configTransformer().transform(map);
        }
        String connType = map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
        if (connType == null) {
            throw new BadRequestException("Connector config " + map + " contains no connector type");
        }
        Connector connector = getConnector(connType);
        ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
        try {
            ConfigDef baseConfigDef;
            org.apache.kafka.connect.health.ConnectorType connectorType;
            if (connector instanceof SourceConnector) {
                baseConfigDef = SourceConnectorConfig.configDef();
                connectorType = org.apache.kafka.connect.health.ConnectorType.SOURCE;
            } else {
                baseConfigDef = SinkConnectorConfig.configDef();
                SinkConnectorConfig.validate(map);
                connectorType = org.apache.kafka.connect.health.ConnectorType.SINK;
            }

            // Framework-level keys first, then the connector's own keys on top of them.
            ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, map, false);
            List<ConfigValue> configValues = new ArrayList<>(validateBasicConnectorConfig(connector, enrichedConfigDef, map).values());
            Map<String, ConfigDef.ConfigKey> configKeys = new LinkedHashMap<>(enrichedConfigDef.configKeys());
            Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());

            ConfigDef connectorConfigDef = connector.config();
            if (connectorConfigDef == null) {
                throw new BadRequestException(String.format("%s.config() must return a ConfigDef that is not null.", connector.getClass().getName()));
            }
            Config connectorResult = connector.validate(map);
            if (connectorResult == null) {
                throw new BadRequestException(String.format("%s.validate() must return a Config that is not null.", connector.getClass().getName()));
            }
            configKeys.putAll(connectorConfigDef.configKeys());
            allGroups.addAll(connectorConfigDef.groups());
            configValues.addAll(connectorResult.configValues());
            ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));

            AbstractConfig connectorConfig = new AbstractConfig(new ConfigDef(), map, z);
            String connName = map.get("name");

            if (connectorType.equals(org.apache.kafka.connect.health.ConnectorType.SOURCE)) {
                ConfigInfos producerInfos = validateClientOverrides(connName, ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX, connectorConfig, ProducerConfig.configDef(), connector.getClass(), connectorType, ConnectorClientConfigRequest.ClientType.PRODUCER, this.connectorClientConfigOverridePolicy);
                return mergeConfigInfos(connType, configInfos, producerInfos);
            }

            // NOTE(review): the consumer and admin override validations below are given the
            // *producer* ConfigDef, so consumer-/admin-only keys are not type-parsed or described.
            // This looks unintended; confirm and switch to the matching client ConfigDefs.
            ConfigInfos consumerInfos = validateClientOverrides(connName, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, connectorConfig, ProducerConfig.configDef(), connector.getClass(), connectorType, ConnectorClientConfigRequest.ClientType.CONSUMER, this.connectorClientConfigOverridePolicy);
            ConfigInfos adminInfos = null;
            String dlqTopic = map.get(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG);
            if (dlqTopic != null && !dlqTopic.isEmpty()) {
                adminInfos = validateClientOverrides(connName, ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX, connectorConfig, ProducerConfig.configDef(), connector.getClass(), connectorType, ConnectorClientConfigRequest.ClientType.ADMIN, this.connectorClientConfigOverridePolicy);
            }
            return mergeConfigInfos(connType, configInfos, consumerInfos, adminInfos);
        } finally {
            Plugins.compareAndSwapLoaders(savedLoader);
        }
    }

    /**
     * Concatenates several validation results into one named {@code str}: error counts are
     * summed, values are appended in argument order and groups are de-duplicated in first-seen
     * order. {@code null} elements are skipped.
     */
    private static ConfigInfos mergeConfigInfos(String str, ConfigInfos... configInfosArr) {
        int errorCount = 0;
        List<ConfigInfo> mergedValues = new ArrayList<>();
        Set<String> mergedGroups = new LinkedHashSet<>();
        for (ConfigInfos infos : configInfosArr) {
            if (infos == null) {
                continue;
            }
            errorCount += infos.errorCount();
            mergedValues.addAll(infos.values());
            mergedGroups.addAll(infos.groups());
        }
        return new ConfigInfos(str, errorCount, new ArrayList<>(mergedGroups), mergedValues);
    }

    /**
     * Runs the override policy against the client properties found under prefix {@code str2}.
     *
     * <p>Each prefixed property is parsed to the type declared in {@code configDef} when the key
     * is known there, otherwise passed through unchanged. Every value returned by the policy is
     * reported under its prefixed name. The error count is the number of values that carry at
     * least one error message.
     *
     * @param str connector name placed in the policy request
     * @param str2 the override prefix
     * @param abstractConfig the connector config the prefixed originals are read from
     * @param configDef definition used to type and describe the un-prefixed client keys
     * @return the validation result, named after {@code cls.toString()}
     */
    private static ConfigInfos validateClientOverrides(String str, String str2, AbstractConfig abstractConfig, ConfigDef configDef, Class<? extends Connector> cls, org.apache.kafka.connect.health.ConnectorType connectorType, ConnectorClientConfigRequest.ClientType clientType, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        Map<String, ConfigDef.ConfigKey> configKeys = configDef.configKeys();

        Map<String, Object> clientConfigs = new HashMap<>();
        for (Map.Entry<String, Object> rawOverride : abstractConfig.originalsWithPrefix(str2).entrySet()) {
            String name = rawOverride.getKey();
            Object rawValue = rawOverride.getValue();
            ConfigDef.ConfigKey key = configKeys.get(name);
            clientConfigs.put(name, key != null ? ConfigDef.parseType(name, rawValue, key.type) : rawValue);
        }

        int errorCount = 0;
        List<ConfigInfo> configInfoList = new ArrayList<>();
        Set<String> groups = new LinkedHashSet<>();
        List<ConfigValue> policyValues = connectorClientConfigOverridePolicy.validate(
                new ConnectorClientConfigRequest(str, connectorType, cls, clientConfigs, clientType));
        if (policyValues != null) {
            for (ConfigValue policyValue : policyValues) {
                ConfigDef.ConfigKey key = configKeys.get(policyValue.name());
                ConfigKeyInfo keyInfo = null;
                if (key != null) {
                    if (key.group != null) {
                        groups.add(key.group);
                    }
                    keyInfo = convertConfigKey(key, str2);
                }
                ConfigValue prefixedValue = new ConfigValue(str2 + policyValue.name(), policyValue.value(), policyValue.recommendedValues(), policyValue.errorMessages());
                if (!prefixedValue.errorMessages().isEmpty()) {
                    errorCount++;
                }
                configInfoList.add(new ConfigInfo(keyInfo, convertConfigValue(prefixedValue, key != null ? key.type : null)));
            }
        }
        return new ConfigInfos(cls.toString(), errorCount, new ArrayList<>(groups), configInfoList);
    }

    /**
     * Pairs config keys with their validated values.
     *
     * <p>Values whose name has no key in {@code map} are emitted first, with a {@code null} key.
     * Every key in {@code map} then follows in map order, with a {@code null} value when
     * {@code list} has no entry for it. The error count is the total number of error messages
     * across all values.
     *
     * @param str name given to the result
     * @param map config keys by name
     * @param list validated values
     * @param list2 groups to attach to the result
     */
    public static ConfigInfos generateResult(String str, Map<String, ConfigDef.ConfigKey> map, List<ConfigValue> list, List<String> list2) {
        int errorCount = 0;
        List<ConfigInfo> configInfoList = new ArrayList<>();
        Map<String, ConfigValue> valuesByName = new HashMap<>();
        for (ConfigValue value : list) {
            String name = value.name();
            valuesByName.put(name, value);
            if (!map.containsKey(name)) {
                configInfoList.add(new ConfigInfo(null, convertConfigValue(value, null)));
                errorCount += value.errorMessages().size();
            }
        }
        for (Map.Entry<String, ConfigDef.ConfigKey> keyEntry : map.entrySet()) {
            ConfigDef.ConfigKey key = keyEntry.getValue();
            ConfigKeyInfo keyInfo = convertConfigKey(key);
            ConfigValueInfo valueInfo = null;
            ConfigValue value = valuesByName.get(keyEntry.getKey());
            if (value != null) {
                valueInfo = convertConfigValue(value, key.type);
                errorCount += value.errorMessages().size();
            }
            configInfoList.add(new ConfigInfo(keyInfo, valueInfo));
        }
        return new ConfigInfos(str, errorCount, list2, configInfoList);
    }

    /** Converts a config key to its REST entity without any name prefix. */
    private static ConfigKeyInfo convertConfigKey(ConfigDef.ConfigKey configKey) {
        return convertConfigKey(configKey, "");
    }

    /**
     * Converts a config key to its REST entity, prepending {@code str} to the key name. A key
     * whose default equals {@code ConfigDef.NO_DEFAULT_VALUE} is marked required and gets a
     * {@code null} default.
     */
    private static ConfigKeyInfo convertConfigKey(ConfigDef.ConfigKey configKey, String str) {
        boolean required = ConfigDef.NO_DEFAULT_VALUE.equals(configKey.defaultValue);
        String defaultValue = required ? null : ConfigDef.convertToString(configKey.defaultValue, configKey.type);
        return new ConfigKeyInfo(str + configKey.name, configKey.type.name(), required, defaultValue, configKey.importance.name(), configKey.documentation, configKey.group, configKey.orderInGroup, configKey.width.name(), configKey.displayName, configKey.dependents);
    }

    /**
     * Converts a validated value to its REST entity. Recommended values of a {@code LIST}-typed
     * key are rendered as {@code STRING}; all others are rendered with the key's own type.
     *
     * @param type the declared type of the key, or {@code null} when the key is unknown
     */
    private static ConfigValueInfo convertConfigValue(ConfigValue configValue, ConfigDef.Type type) {
        String value = ConfigDef.convertToString(configValue.value(), type);
        ConfigDef.Type recommendedType = type == ConfigDef.Type.LIST ? ConfigDef.Type.STRING : type;
        List<String> recommendedValues = new ArrayList<>();
        for (Object recommended : configValue.recommendedValues()) {
            recommendedValues.add(ConfigDef.convertToString(recommended, recommendedType));
        }
        return new ConfigValueInfo(configValue.name(), value, recommendedValues, configValue.errorMessages(), configValue.visible());
    }

    /**
     * Returns the cached connector instance for {@code str}, creating it through the plugins on
     * first use. The lookup-or-create step is atomic, so concurrent callers asking for the same
     * class share a single instance.
     */
    protected Connector getConnector(String str) {
        return this.tempConnectors.computeIfAbsent(str, connType -> plugins().newConnector(connType));
    }

    /**
     * Resolves the connector type for a connector class name.
     *
     * @param str connector class name or alias; may be {@code null}
     * @return the type derived from the connector class, or {@code UNKNOWN} when {@code str} is
     *         {@code null} (the connector cache cannot be queried with a null key)
     */
    public ConnectorType connectorTypeForClass(String str) {
        if (str == null) {
            return ConnectorType.UNKNOWN;
        }
        return ConnectorType.from(getConnector(str).getClass());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Fails {@code callback} with a {@link BadRequestException} listing every error message when
     * {@code configInfos} reports at least one error.
     *
     * @return {@code true} if errors were present and the callback was completed
     */
    public final boolean maybeAddConfigErrors(ConfigInfos configInfos, Callback<Herder.Created<ConnectorInfo>> callback) {
        int errorCount = configInfos.errorCount();
        if (errorCount <= 0) {
            return false;
        }
        StringBuilder message = new StringBuilder();
        message.append("Connector configuration is invalid and contains the following ").append(errorCount).append(" error(s):");
        for (ConfigInfo info : configInfos.values()) {
            ConfigValueInfo valueInfo = info.configValue();
            // Keys without a validated value are reported with a null value and carry no errors.
            if (valueInfo == null) {
                continue;
            }
            for (String error : valueInfo.errors()) {
                message.append('\n').append(error);
            }
        }
        message.append("\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`");
        callback.onCompletion(new BadRequestException(message.toString()), null);
        return true;
    }

    /**
     * Renders the stack trace of {@code th} as a UTF-8 string.
     *
     * @return the stack trace, or {@code null} if the charset name is rejected
     */
    private String trace(Throwable th) {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        try {
            th.printStackTrace(new PrintStream(output, false, StandardCharsets.UTF_8.name()));
            return output.toString(StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            return null;
        }
    }

    /**
     * Restores variable references in task configs. For every key of the connector's raw config
     * whose value matches {@code ConfigTransformer.DEFAULT_PATTERN}, any task config containing
     * that key gets the raw connector value back in place of its current value.
     *
     * @param str connector name
     * @param clusterConfigState state the raw connector config is read from
     * @param list task configs; not modified
     * @return copies of the task configs with the raw values substituted
     */
    public static List<Map<String, String>> reverseTransform(String str, ClusterConfigState clusterConfigState, List<Map<String, String>> list) {
        Map<String, String> rawConnectorConfig = clusterConfigState.rawConnectorConfig(str);
        Set<String> variableKeys = keysWithVariableValues(rawConnectorConfig, ConfigTransformer.DEFAULT_PATTERN);
        List<Map<String, String>> restored = new ArrayList<>();
        for (Map<String, String> taskConfig : list) {
            Map<String, String> copy = new HashMap<>(taskConfig);
            for (String key : variableKeys) {
                if (copy.containsKey(key)) {
                    copy.put(key, rawConnectorConfig.get(key));
                }
            }
            restored.add(copy);
        }
        return restored;
    }

    /** @return the keys of {@code map} whose non-null value contains a match of {@code pattern} */
    static Set<String> keysWithVariableValues(Map<String, String> map, Pattern pattern) {
        Set<String> keys = new HashSet<>();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String value = entry.getValue();
            if (value != null && pattern.matcher(value).find()) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }
}
