/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Loggers;
import org.apache.kafka.connect.runtime.RestartPlan;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.Stage;
import org.apache.kafka.connect.util.TemporaryStage;
import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHerder
implements Herder,
TaskStatus.Listener,
ConnectorStatus.Listener {
    private static final Logger log = LoggerFactory.getLogger(AbstractHerder.class);
    private final String workerId;
    protected final Worker worker;
    private final String kafkaClusterId;
    protected final StatusBackingStore statusBackingStore;
    protected final ConfigBackingStore configBackingStore;
    private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
    protected volatile boolean running = false;
    private final ExecutorService connectorExecutor;
    private final Time time;
    protected final Loggers loggers;
    private final ConcurrentMap<String, Connector> tempConnectors = new ConcurrentHashMap<String, Connector>();

    public AbstractHerder(Worker worker, String workerId, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, Time time) {
        this.worker = worker;
        this.worker.herder = this;
        this.workerId = workerId;
        this.kafkaClusterId = kafkaClusterId;
        this.statusBackingStore = statusBackingStore;
        this.configBackingStore = configBackingStore;
        this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
        this.connectorExecutor = Executors.newCachedThreadPool();
        this.time = time;
        this.loggers = new Loggers(time);
    }

    @Override
    public String kafkaClusterId() {
        return this.kafkaClusterId;
    }

    protected abstract int generation();

    protected void startServices() {
        this.worker.start();
        this.statusBackingStore.start();
        this.configBackingStore.start();
    }

    protected void stopServices() {
        this.statusBackingStore.stop();
        this.configBackingStore.stop();
        this.worker.stop();
        this.connectorExecutor.shutdown();
        Utils.closeQuietly((AutoCloseable)this.connectorClientConfigOverridePolicy, (String)"connector client config override policy");
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public void onStartup(String connector) {
        this.statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, this.workerId, this.generation()));
    }

    @Override
    public void onStop(String connector) {
        this.statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.STOPPED, this.workerId, this.generation()));
    }

    @Override
    public void onPause(String connector) {
        this.statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.PAUSED, this.workerId, this.generation()));
    }

    @Override
    public void onResume(String connector) {
        this.statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, this.workerId, this.generation()));
    }

    @Override
    public void onShutdown(String connector) {
        this.statusBackingStore.putSafe(new ConnectorStatus(connector, AbstractStatus.State.UNASSIGNED, this.workerId, this.generation()));
    }

    @Override
    public void onFailure(String connector, Throwable cause) {
        this.statusBackingStore.putSafe(new ConnectorStatus(connector, AbstractStatus.State.FAILED, this.trace(cause), this.workerId, this.generation()));
    }

    @Override
    public void onStartup(ConnectorTaskId id) {
        this.statusBackingStore.put(new TaskStatus(id, AbstractStatus.State.RUNNING, this.workerId, this.generation()));
    }

    @Override
    public void onFailure(ConnectorTaskId id, Throwable cause) {
        this.statusBackingStore.putSafe(new TaskStatus(id, AbstractStatus.State.FAILED, this.workerId, this.generation(), this.trace(cause)));
    }

    @Override
    public void onShutdown(ConnectorTaskId id) {
        this.statusBackingStore.putSafe(new TaskStatus(id, AbstractStatus.State.UNASSIGNED, this.workerId, this.generation()));
    }

    @Override
    public void onResume(ConnectorTaskId id) {
        this.statusBackingStore.put(new TaskStatus(id, AbstractStatus.State.RUNNING, this.workerId, this.generation()));
    }

    @Override
    public void onPause(ConnectorTaskId id) {
        this.statusBackingStore.put(new TaskStatus(id, AbstractStatus.State.PAUSED, this.workerId, this.generation()));
    }

    @Override
    public void onDeletion(String connector) {
        for (TaskStatus status : this.statusBackingStore.getAll(connector)) {
            this.onDeletion((ConnectorTaskId)status.id());
        }
        this.statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.DESTROYED, this.workerId, this.generation()));
    }

    @Override
    public void onDeletion(ConnectorTaskId id) {
        this.statusBackingStore.put(new TaskStatus(id, AbstractStatus.State.DESTROYED, this.workerId, this.generation()));
    }

    @Override
    public void onRestart(String connector) {
        this.statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.RESTARTING, this.workerId, this.generation()));
    }

    @Override
    public void onRestart(ConnectorTaskId id) {
        this.statusBackingStore.put(new TaskStatus(id, AbstractStatus.State.RESTARTING, this.workerId, this.generation()));
    }

    @Override
    public void pauseConnector(String connector) {
        if (!this.configBackingStore.contains(connector)) {
            throw new NotFoundException("Unknown connector " + connector);
        }
        this.configBackingStore.putTargetState(connector, TargetState.PAUSED);
    }

    @Override
    public void resumeConnector(String connector) {
        if (!this.configBackingStore.contains(connector)) {
            throw new NotFoundException("Unknown connector " + connector);
        }
        this.configBackingStore.putTargetState(connector, TargetState.STARTED);
    }

    @Override
    public Plugins plugins() {
        return this.worker.getPlugins();
    }

    protected abstract Map<String, String> rawConfig(String var1);

    @Override
    public void connectorConfig(String connName, Callback<Map<String, String>> callback) {
        this.connectorInfo(connName, (error, result) -> {
            if (error != null) {
                callback.onCompletion(error, null);
            } else {
                callback.onCompletion(null, result.config());
            }
        });
    }

    @Override
    public Collection<String> connectors() {
        return this.configBackingStore.snapshot().connectors();
    }

    @Override
    public ConnectorInfo connectorInfo(String connector) {
        ClusterConfigState configState = this.configBackingStore.snapshot();
        if (!configState.contains(connector)) {
            return null;
        }
        Map<String, String> config = configState.rawConnectorConfig(connector);
        return new ConnectorInfo(connector, config, configState.tasks(connector), this.connectorType(config));
    }

    protected Map<ConnectorTaskId, Map<String, String>> buildTasksConfig(String connector) {
        ClusterConfigState configState = this.configBackingStore.snapshot();
        if (!configState.contains(connector)) {
            return Collections.emptyMap();
        }
        HashMap<ConnectorTaskId, Map<String, String>> configs = new HashMap<ConnectorTaskId, Map<String, String>>();
        for (ConnectorTaskId cti : configState.tasks(connector)) {
            configs.put(cti, configState.rawTaskConfig(cti));
        }
        return configs;
    }

    @Override
    public ConnectorStateInfo connectorStatus(String connName) {
        ConnectorStatus connector = this.statusBackingStore.get(connName);
        if (connector == null) {
            throw new NotFoundException("No status found for connector " + connName);
        }
        Collection<TaskStatus> tasks = this.statusBackingStore.getAll(connName);
        ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(connector.state().toString(), connector.workerId(), connector.trace());
        ArrayList<ConnectorStateInfo.TaskState> taskStates = new ArrayList<ConnectorStateInfo.TaskState>();
        for (TaskStatus status : tasks) {
            taskStates.add(new ConnectorStateInfo.TaskState(((ConnectorTaskId)status.id()).task(), status.state().toString(), status.workerId(), status.trace()));
        }
        Collections.sort(taskStates);
        Map<String, String> conf = this.rawConfig(connName);
        return new ConnectorStateInfo(connName, connectorState, taskStates, this.connectorType(conf));
    }

    @Override
    public ActiveTopicsInfo connectorActiveTopics(String connName) {
        Collection topics = this.statusBackingStore.getAllTopics(connName).stream().map(TopicStatus::topic).collect(Collectors.toList());
        return new ActiveTopicsInfo(connName, topics);
    }

    @Override
    public void resetConnectorActiveTopics(String connName) {
        this.statusBackingStore.getAllTopics(connName).stream().forEach(status -> this.statusBackingStore.deleteTopic(status.connector(), status.topic()));
    }

    @Override
    public StatusBackingStore statusBackingStore() {
        return this.statusBackingStore;
    }

    @Override
    public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
        TaskStatus status = this.statusBackingStore.get(id);
        if (status == null) {
            throw new NotFoundException("No status found for task " + id);
        }
        return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(), status.workerId(), status.trace());
    }

    protected Map<String, ConfigValue> validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map<String, String> config) {
        Map result = configDef.validateAll(config);
        SinkConnectorConfig.validate(config, result);
        return result;
    }

    protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, Map<String, String> config) {
        return configDef.validateAll(config);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> ConfigInfos validateConverterConfig(Map<String, String> connectorConfig, ConfigValue pluginConfigValue, Class<T> pluginInterface, Function<T, ConfigDef> configDefAccessor, String pluginName, String pluginProperty, Map<String, String> defaultProperties, Function<String, TemporaryStage> reportStage) {
        Object pluginInstance;
        Objects.requireNonNull(connectorConfig);
        Objects.requireNonNull(pluginInterface);
        Objects.requireNonNull(configDefAccessor);
        Objects.requireNonNull(pluginName);
        Objects.requireNonNull(pluginProperty);
        String pluginClass = connectorConfig.get(pluginProperty);
        if (pluginClass == null || pluginConfigValue == null || !pluginConfigValue.errorMessages().isEmpty()) {
            return null;
        }
        String stageDescription = "instantiating the connector's " + pluginName + " for validation";
        try (TemporaryStage stage = reportStage.apply(stageDescription);){
            pluginInstance = Utils.newInstance((String)pluginClass, pluginInterface);
        }
        catch (ClassNotFoundException | RuntimeException e) {
            log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", new Object[]{pluginName, pluginClass, e});
            pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
            return null;
        }
        try {
            List configValues;
            ConfigDef configDef;
            stageDescription = "retrieving the configuration definition from the connector's " + pluginName;
            try (TemporaryStage stage = reportStage.apply(stageDescription);){
                configDef = configDefAccessor.apply(pluginInstance);
            }
            catch (RuntimeException e) {
                log.error("Failed to load ConfigDef from {} of type {}", new Object[]{pluginName, pluginClass, e});
                pluginConfigValue.addErrorMessage("Failed to load ConfigDef from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
                ConfigInfos configInfos = null;
                Utils.maybeCloseQuietly((Object)pluginInstance, (String)(pluginName + " " + pluginClass));
                return configInfos;
            }
            if (configDef == null) {
                log.warn("{}.config() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", (Object)pluginClass);
                ConfigInfos e = null;
                return e;
            }
            String pluginPrefix = pluginProperty + ".";
            Map pluginConfig = Utils.entriesWithPrefix(connectorConfig, (String)pluginPrefix);
            if (defaultProperties != null) {
                defaultProperties.forEach(pluginConfig::putIfAbsent);
            }
            stageDescription = "performing config validation for the connector's " + pluginName;
            try (TemporaryStage stage = reportStage.apply(stageDescription);){
                configValues = configDef.validate(pluginConfig);
            }
            catch (RuntimeException e) {
                log.error("Failed to perform config validation for {} of type {}", new Object[]{pluginName, pluginClass, e});
                pluginConfigValue.addErrorMessage("Failed to perform config validation for " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
                ConfigInfos configInfos = null;
                Utils.maybeCloseQuietly((Object)pluginInstance, (String)(pluginName + " " + pluginClass));
                return configInfos;
            }
            ConfigInfos configInfos = AbstractHerder.prefixedConfigInfos(configDef.configKeys(), configValues, pluginPrefix);
            return configInfos;
        }
        finally {
            Utils.maybeCloseQuietly((Object)pluginInstance, (String)(pluginName + " " + pluginClass));
        }
    }

    private ConfigInfos validateHeaderConverterConfig(Map<String, String> connectorConfig, ConfigValue headerConverterConfigValue, Function<String, TemporaryStage> reportStage) {
        return this.validateConverterConfig(connectorConfig, headerConverterConfigValue, HeaderConverter.class, HeaderConverter::config, "header converter", "header.converter", Collections.singletonMap("converter.type", ConverterType.HEADER.getName()), reportStage);
    }

    private ConfigInfos validateKeyConverterConfig(Map<String, String> connectorConfig, ConfigValue keyConverterConfigValue, Function<String, TemporaryStage> reportStage) {
        return this.validateConverterConfig(connectorConfig, keyConverterConfigValue, Converter.class, Converter::config, "key converter", "key.converter", Collections.singletonMap("converter.type", ConverterType.KEY.getName()), reportStage);
    }

    private ConfigInfos validateValueConverterConfig(Map<String, String> connectorConfig, ConfigValue valueConverterConfigValue, Function<String, TemporaryStage> reportStage) {
        return this.validateConverterConfig(connectorConfig, valueConverterConfigValue, Converter.class, Converter::config, "value converter", "value.converter", Collections.singletonMap("converter.type", ConverterType.VALUE.getName()), reportStage);
    }

    @Override
    public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback) {
        this.validateConnectorConfig(connectorProps, callback, true);
    }

    @Override
    public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback, boolean doLog) {
        Stage waitingForThread = new Stage("waiting for a new thread to become available for connector validation", this.time.milliseconds());
        callback.recordStage(waitingForThread);
        this.connectorExecutor.submit(() -> {
            waitingForThread.complete(this.time.milliseconds());
            try {
                Function<String, TemporaryStage> reportStage = description -> new TemporaryStage((String)description, callback, this.time);
                ConfigInfos result = this.validateConnectorConfig(connectorProps, reportStage, doLog);
                callback.onCompletion(null, result);
            }
            catch (Throwable t) {
                callback.onCompletion(t, null);
            }
        });
    }

    public Optional<RestartPlan> buildRestartPlan(RestartRequest request) {
        String connectorName = request.connectorName();
        ConnectorStatus connectorStatus = this.statusBackingStore.get(connectorName);
        if (connectorStatus == null) {
            return Optional.empty();
        }
        AbstractStatus.State connectorState = request.shouldRestartConnector(connectorStatus) ? AbstractStatus.State.RESTARTING : connectorStatus.state();
        ConnectorStateInfo.ConnectorState connectorInfoState = new ConnectorStateInfo.ConnectorState(connectorState.toString(), connectorStatus.workerId(), connectorStatus.trace());
        List<ConnectorStateInfo.TaskState> taskStates = this.statusBackingStore.getAll(connectorName).stream().map(taskStatus -> {
            AbstractStatus.State taskState = request.shouldRestartTask((TaskStatus)taskStatus) ? AbstractStatus.State.RESTARTING : taskStatus.state();
            return new ConnectorStateInfo.TaskState(((ConnectorTaskId)taskStatus.id()).task(), taskState.toString(), taskStatus.workerId(), taskStatus.trace());
        }).collect(Collectors.toList());
        Map<String, String> conf = this.rawConfig(connectorName);
        ConnectorStateInfo stateInfo = new ConnectorStateInfo(connectorName, connectorInfoState, taskStates, this.connectorType(conf));
        return Optional.of(new RestartPlan(request, stateInfo));
    }

    protected boolean connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType connectorType, Map<String, String> connProps) {
        return connectorType == org.apache.kafka.connect.health.ConnectorType.SINK;
    }

    protected boolean connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType, Map<String, String> connProps) {
        if (connectorType == org.apache.kafka.connect.health.ConnectorType.SOURCE) {
            return SourceConnectorConfig.usesTopicCreation(connProps);
        }
        return SinkConnectorConfig.hasDlqTopicConfig(connProps);
    }

    protected boolean connectorUsesProducer(org.apache.kafka.connect.health.ConnectorType connectorType, Map<String, String> connProps) {
        return connectorType == org.apache.kafka.connect.health.ConnectorType.SOURCE || SinkConnectorConfig.hasDlqTopicConfig(connProps);
    }

    ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, Function<String, TemporaryStage> reportStage, boolean doLog) {
        String connType;
        String stageDescription;
        if (this.worker.configTransformer() != null) {
            stageDescription = "resolving transformed configuration properties for the connector";
            try (TemporaryStage stage = reportStage.apply(stageDescription);){
                connectorProps = this.worker.configTransformer().transform(connectorProps);
            }
        }
        if ((connType = connectorProps.get("connector.class")) == null) {
            throw new BadRequestException("Connector config " + connectorProps + " contains no connector type");
        }
        Connector connector = this.getConnector(connType);
        ClassLoader connectorLoader = this.plugins().connectorLoader(connType);
        try (LoaderSwap loaderSwap = this.plugins().withClassLoader(connectorLoader);){
            Throwable throwable;
            TemporaryStage stage;
            Config config;
            ConfigDef configDef;
            Map<String, ConfigValue> validatedConnectorConfig;
            Throwable throwable2;
            TemporaryStage stage2;
            ConfigDef enrichedConfigDef;
            org.apache.kafka.connect.health.ConnectorType connectorType;
            if (connector instanceof SourceConnector) {
                connectorType = org.apache.kafka.connect.health.ConnectorType.SOURCE;
                enrichedConfigDef = ConnectorConfig.enrich(this.plugins(), SourceConnectorConfig.configDef(), connectorProps, false);
                stageDescription = "validating source connector-specific properties for the connector";
                stage2 = reportStage.apply(stageDescription);
                throwable2 = null;
                try {
                    validatedConnectorConfig = this.validateSourceConnectorConfig((SourceConnector)connector, enrichedConfigDef, connectorProps);
                }
                catch (Throwable throwable3) {
                    throwable2 = throwable3;
                    throw throwable3;
                }
                finally {
                    if (stage2 != null) {
                        if (throwable2 != null) {
                            try {
                                stage2.close();
                            }
                            catch (Throwable throwable4) {
                                throwable2.addSuppressed(throwable4);
                            }
                        } else {
                            stage2.close();
                        }
                    }
                }
            }
            connectorType = org.apache.kafka.connect.health.ConnectorType.SINK;
            enrichedConfigDef = ConnectorConfig.enrich(this.plugins(), SinkConnectorConfig.configDef(), connectorProps, false);
            stageDescription = "validating sink connector-specific properties for the connector";
            stage2 = reportStage.apply(stageDescription);
            throwable2 = null;
            try {
                validatedConnectorConfig = this.validateSinkConnectorConfig((SinkConnector)connector, enrichedConfigDef, connectorProps);
            }
            catch (Throwable throwable5) {
                throwable2 = throwable5;
                throw throwable5;
            }
            finally {
                if (stage2 != null) {
                    if (throwable2 != null) {
                        try {
                            stage2.close();
                        }
                        catch (Throwable throwable6) {
                            throwable2.addSuppressed(throwable6);
                        }
                    } else {
                        stage2.close();
                    }
                }
            }
            connectorProps.entrySet().stream().filter(e -> e.getValue() == null).map(Map.Entry::getKey).forEach(prop -> validatedConnectorConfig.computeIfAbsent((String)prop, ConfigValue::new).addErrorMessage("Null value can not be supplied as the configuration value."));
            ArrayList<ConfigValue> configValues = new ArrayList<ConfigValue>(validatedConnectorConfig.values());
            LinkedHashMap<String, ConfigDef.ConfigKey> configKeys = new LinkedHashMap<String, ConfigDef.ConfigKey>(enrichedConfigDef.configKeys());
            LinkedHashSet allGroups = new LinkedHashSet(enrichedConfigDef.groups());
            stageDescription = "retrieving the configuration definition from the connector";
            try (TemporaryStage stage3 = reportStage.apply(stageDescription);){
                configDef = connector.config();
            }
            if (null == configDef) {
                throw new BadRequestException(String.format("%s.config() must return a ConfigDef that is not null.", connector.getClass().getName()));
            }
            stageDescription = "performing multi-property validation for the connector";
            try (TemporaryStage stage4 = reportStage.apply(stageDescription);){
                config = connector.validate(connectorProps);
            }
            if (null == config) {
                throw new BadRequestException(String.format("%s.validate() must return a Config that is not null.", connector.getClass().getName()));
            }
            configKeys.putAll(configDef.configKeys());
            allGroups.addAll(configDef.groups());
            configValues.addAll(config.configValues());
            ConfigInfos headerConverterConfigInfos = this.validateHeaderConverterConfig(connectorProps, validatedConnectorConfig.get("header.converter"), reportStage);
            ConfigInfos keyConverterConfigInfos = this.validateKeyConverterConfig(connectorProps, validatedConnectorConfig.get("key.converter"), reportStage);
            ConfigInfos valueConverterConfigInfos = this.validateValueConverterConfig(connectorProps, validatedConnectorConfig.get("value.converter"), reportStage);
            ConfigInfos configInfos = AbstractHerder.generateResult(connType, configKeys, configValues, new ArrayList<String>(allGroups));
            AbstractConfig connectorConfig = new AbstractConfig(new ConfigDef(), connectorProps, doLog);
            String connName = connectorProps.get("name");
            ConfigInfos producerConfigInfos = null;
            ConfigInfos consumerConfigInfos = null;
            ConfigInfos adminConfigInfos = null;
            if (this.connectorUsesProducer(connectorType, connectorProps)) {
                stageDescription = "validating producer config overrides for the connector";
                stage = reportStage.apply(stageDescription);
                throwable = null;
                try {
                    producerConfigInfos = AbstractHerder.validateClientOverrides(connName, "producer.override.", connectorConfig, ProducerConfig.configDef(), connector.getClass(), connectorType, ConnectorClientConfigRequest.ClientType.PRODUCER, this.connectorClientConfigOverridePolicy);
                }
                catch (Throwable throwable7) {
                    throwable = throwable7;
                    throw throwable7;
                }
                finally {
                    if (stage != null) {
                        if (throwable != null) {
                            try {
                                stage.close();
                            }
                            catch (Throwable throwable8) {
                                throwable.addSuppressed(throwable8);
                            }
                        } else {
                            stage.close();
                        }
                    }
                }
            }
            if (this.connectorUsesAdmin(connectorType, connectorProps)) {
                stageDescription = "validating admin config overrides for the connector";
                stage = reportStage.apply(stageDescription);
                throwable = null;
                try {
                    adminConfigInfos = AbstractHerder.validateClientOverrides(connName, "admin.override.", connectorConfig, AdminClientConfig.configDef(), connector.getClass(), connectorType, ConnectorClientConfigRequest.ClientType.ADMIN, this.connectorClientConfigOverridePolicy);
                }
                catch (Throwable throwable9) {
                    throwable = throwable9;
                    throw throwable9;
                }
                finally {
                    if (stage != null) {
                        if (throwable != null) {
                            try {
                                stage.close();
                            }
                            catch (Throwable throwable10) {
                                throwable.addSuppressed(throwable10);
                            }
                        } else {
                            stage.close();
                        }
                    }
                }
            }
            if (this.connectorUsesConsumer(connectorType, connectorProps)) {
                stageDescription = "validating consumer config overrides for the connector";
                stage = reportStage.apply(stageDescription);
                throwable = null;
                try {
                    consumerConfigInfos = AbstractHerder.validateClientOverrides(connName, "consumer.override.", connectorConfig, ConsumerConfig.configDef(), connector.getClass(), connectorType, ConnectorClientConfigRequest.ClientType.CONSUMER, this.connectorClientConfigOverridePolicy);
                }
                catch (Throwable throwable11) {
                    throwable = throwable11;
                    throw throwable11;
                }
                finally {
                    if (stage != null) {
                        if (throwable != null) {
                            try {
                                stage.close();
                            }
                            catch (Throwable throwable12) {
                                throwable.addSuppressed(throwable12);
                            }
                        } else {
                            stage.close();
                        }
                    }
                }
            }
            ConfigInfos configInfos2 = AbstractHerder.mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos, headerConverterConfigInfos, keyConverterConfigInfos, valueConverterConfigInfos);
            return configInfos2;
        }
    }

    private static ConfigInfos mergeConfigInfos(String connType, ConfigInfos ... configInfosList) {
        int errorCount = 0;
        LinkedList<ConfigInfo> configInfoList = new LinkedList<ConfigInfo>();
        LinkedHashSet<String> groups = new LinkedHashSet<String>();
        for (ConfigInfos configInfos : configInfosList) {
            if (configInfos == null) continue;
            errorCount += configInfos.errorCount();
            configInfoList.addAll(configInfos.values());
            groups.addAll(configInfos.groups());
        }
        return new ConfigInfos(connType, errorCount, new ArrayList<String>(groups), configInfoList);
    }

    private static ConfigInfos validateClientOverrides(String connName, String prefix, AbstractConfig connectorConfig, ConfigDef configDef, Class<? extends Connector> connectorClass, org.apache.kafka.connect.health.ConnectorType connectorType, ConnectorClientConfigRequest.ClientType clientType, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        HashMap clientConfigs = new HashMap();
        for (Map.Entry rawClientConfig : connectorConfig.originalsWithPrefix(prefix).entrySet()) {
            String configName = (String)rawClientConfig.getKey();
            Object rawConfigValue = rawClientConfig.getValue();
            ConfigDef.ConfigKey configKey = (ConfigDef.ConfigKey)configDef.configKeys().get(configName);
            Object parsedConfigValue = configKey != null ? ConfigDef.parseType((String)configName, rawConfigValue, (ConfigDef.Type)configKey.type) : rawConfigValue;
            clientConfigs.put(configName, parsedConfigValue);
        }
        ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(connName, connectorType, connectorClass, clientConfigs, clientType);
        List configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
        return AbstractHerder.prefixedConfigInfos(configDef.configKeys(), configValues, prefix);
    }

    private static ConfigInfos prefixedConfigInfos(Map<String, ConfigDef.ConfigKey> configKeys, List<ConfigValue> configValues, String prefix) {
        int errorCount = 0;
        LinkedHashSet<String> groups = new LinkedHashSet<String>();
        ArrayList<ConfigInfo> configInfos = new ArrayList<ConfigInfo>();
        if (configValues == null) {
            return new ConfigInfos("", 0, new ArrayList<String>(groups), configInfos);
        }
        for (ConfigValue validatedConfigValue : configValues) {
            ConfigValue configValue;
            ConfigDef.ConfigKey configKey = configKeys.get(validatedConfigValue.name());
            ConfigKeyInfo configKeyInfo = null;
            if (configKey != null) {
                if (configKey.group != null) {
                    groups.add(configKey.group);
                }
                configKeyInfo = AbstractHerder.convertConfigKey(configKey, prefix);
            }
            if ((configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(), validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages())).errorMessages().size() > 0) {
                ++errorCount;
            }
            ConfigValueInfo configValueInfo = AbstractHerder.convertConfigValue(configValue, configKey != null ? configKey.type : null);
            configInfos.add(new ConfigInfo(configKeyInfo, configValueInfo));
        }
        return new ConfigInfos("", errorCount, new ArrayList<String>(groups), configInfos);
    }

    public static ConfigInfos generateResult(String connType, Map<String, ConfigDef.ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) {
        String configName;
        int errorCount = 0;
        LinkedList<ConfigInfo> configInfoList = new LinkedList<ConfigInfo>();
        HashMap<String, ConfigValue> configValueMap = new HashMap<String, ConfigValue>();
        for (ConfigValue configValue : configValues) {
            configName = configValue.name();
            configValueMap.put(configName, configValue);
            if (configKeys.containsKey(configName)) continue;
            configInfoList.add(new ConfigInfo(null, AbstractHerder.convertConfigValue(configValue, null)));
            errorCount += configValue.errorMessages().size();
        }
        for (Map.Entry entry : configKeys.entrySet()) {
            configName = (String)entry.getKey();
            ConfigKeyInfo configKeyInfo = AbstractHerder.convertConfigKey((ConfigDef.ConfigKey)entry.getValue());
            ConfigDef.Type type = ((ConfigDef.ConfigKey)entry.getValue()).type;
            ConfigValueInfo configValueInfo = null;
            if (configValueMap.containsKey(configName)) {
                ConfigValue configValue = (ConfigValue)configValueMap.get(configName);
                configValueInfo = AbstractHerder.convertConfigValue(configValue, type);
                errorCount += configValue.errorMessages().size();
            }
            configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
        }
        return new ConfigInfos(connType, errorCount, groups, configInfoList);
    }

    public static ConfigKeyInfo convertConfigKey(ConfigDef.ConfigKey configKey) {
        return AbstractHerder.convertConfigKey(configKey, "");
    }

    private static ConfigKeyInfo convertConfigKey(ConfigDef.ConfigKey configKey, String prefix) {
        String defaultValue;
        String name = prefix + configKey.name;
        ConfigDef.Type type = configKey.type;
        String typeName = configKey.type.name();
        boolean required = false;
        if (ConfigDef.NO_DEFAULT_VALUE.equals(configKey.defaultValue)) {
            defaultValue = null;
            required = true;
        } else {
            defaultValue = ConfigDef.convertToString((Object)configKey.defaultValue, (ConfigDef.Type)type);
        }
        String importance = configKey.importance.name();
        String documentation = configKey.documentation;
        String group = configKey.group;
        int orderInGroup = configKey.orderInGroup;
        String width = configKey.width.name();
        String displayName = configKey.displayName;
        List dependents = configKey.dependents;
        return new ConfigKeyInfo(name, typeName, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
    }

    private static ConfigValueInfo convertConfigValue(ConfigValue configValue, ConfigDef.Type type) {
        String value = ConfigDef.convertToString((Object)configValue.value(), (ConfigDef.Type)type);
        LinkedList<String> recommendedValues = new LinkedList<String>();
        if (type == ConfigDef.Type.LIST) {
            for (Object object : configValue.recommendedValues()) {
                recommendedValues.add(ConfigDef.convertToString(object, (ConfigDef.Type)ConfigDef.Type.STRING));
            }
        } else {
            for (Object object : configValue.recommendedValues()) {
                recommendedValues.add(ConfigDef.convertToString(object, (ConfigDef.Type)type));
            }
        }
        return new ConfigValueInfo(configValue.name(), value, recommendedValues, configValue.errorMessages(), configValue.visible());
    }

    protected Connector getConnector(String connType) {
        return this.tempConnectors.computeIfAbsent(connType, k -> this.plugins().newConnector((String)k));
    }

    public ConnectorType connectorType(Map<String, String> connConfig) {
        if (connConfig == null) {
            return ConnectorType.UNKNOWN;
        }
        String connClass = connConfig.get("connector.class");
        if (connClass == null) {
            return ConnectorType.UNKNOWN;
        }
        try {
            return ConnectorType.from(this.getConnector(connClass).getClass());
        }
        catch (ConnectException e) {
            log.warn("Unable to retrieve connector type", (Throwable)e);
            return ConnectorType.UNKNOWN;
        }
    }

    protected final boolean maybeAddConfigErrors(ConfigInfos configInfos, Callback<Herder.Created<ConnectorInfo>> callback) {
        boolean hasErrors;
        int errors = configInfos.errorCount();
        boolean bl = hasErrors = errors > 0;
        if (hasErrors) {
            StringBuilder messages = new StringBuilder();
            messages.append("Connector configuration is invalid and contains the following ").append(errors).append(" error(s):");
            for (ConfigInfo configInfo : configInfos.values()) {
                for (String msg : configInfo.configValue().errors()) {
                    messages.append('\n').append(msg);
                }
            }
            callback.onCompletion((Throwable)((Object)new BadRequestException(messages.append("\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`").toString())), null);
        }
        return hasErrors;
    }

    private String trace(Throwable t) {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        try {
            t.printStackTrace(new PrintStream((OutputStream)output, false, StandardCharsets.UTF_8.name()));
            return output.toString(StandardCharsets.UTF_8.name());
        }
        catch (UnsupportedEncodingException e) {
            return null;
        }
    }

    public static List<Map<String, String>> reverseTransform(String connName, ClusterConfigState configState, List<Map<String, String>> configs) {
        Map<String, String> rawConnConfig = configState.rawConnectorConfig(connName);
        Set<String> connKeysWithVariableValues = AbstractHerder.keysWithVariableValues(rawConnConfig, ConfigTransformer.DEFAULT_PATTERN);
        ArrayList<Map<String, String>> result = new ArrayList<Map<String, String>>();
        for (Map<String, String> config : configs) {
            HashMap<String, String> newConfig = new HashMap<String, String>(config);
            for (String key : connKeysWithVariableValues) {
                if (!newConfig.containsKey(key)) continue;
                newConfig.put(key, rawConnConfig.get(key));
            }
            result.add(newConfig);
        }
        return result;
    }

    public static boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> rawTaskProps) {
        Map<String, String> currentConnectorConfig;
        Map<String, String> appliedConnectorConfig;
        int currentNumTasks = configState.taskCount(connName);
        boolean result = false;
        if (rawTaskProps.size() != currentNumTasks) {
            log.debug("Connector {} task count changed from {} to {}", new Object[]{connName, currentNumTasks, rawTaskProps.size()});
            result = true;
        }
        if (!result) {
            for (int index = 0; index < currentNumTasks; ++index) {
                ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                if (rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) continue;
                log.debug("Connector {} has change in configuration for task {}-{}", new Object[]{connName, connName, index});
                result = true;
            }
        }
        if (!result && !Objects.equals(appliedConnectorConfig = configState.appliedConnectorConfig(connName), currentConnectorConfig = configState.connectorConfig(connName))) {
            log.debug("Forcing task restart for connector {} as its configuration appears to be updated", (Object)connName);
            result = true;
        }
        if (result) {
            log.debug("Reconfiguring connector {}: writing new updated configurations for tasks", (Object)connName);
        } else {
            log.debug("Skipping reconfiguration of connector {} as generated configs appear unchanged", (Object)connName);
        }
        return result;
    }

    static Set<String> keysWithVariableValues(Map<String, String> rawConfig, Pattern pattern) {
        HashSet<String> keys = new HashSet<String>();
        for (Map.Entry<String, String> config : rawConfig.entrySet()) {
            Matcher matcher;
            if (config.getValue() == null || !(matcher = pattern.matcher(config.getValue())).find()) continue;
            keys.add(config.getKey());
        }
        return keys;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
        Class<?> pluginClass;
        Plugins p = this.plugins();
        try {
            pluginClass = p.pluginClass(pluginName);
        }
        catch (ClassNotFoundException cnfe) {
            throw new NotFoundException("Unknown plugin " + pluginName + ".");
        }
        try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader());){
            ConfigDef pluginConfigDefs;
            Object plugin = p.newPlugin(pluginName);
            ConfigDef baseConfigDefs = null;
            if (plugin instanceof SinkConnector) {
                baseConfigDefs = SinkConnectorConfig.configDef();
                pluginConfigDefs = ((SinkConnector)plugin).config();
            } else if (plugin instanceof SourceConnector) {
                baseConfigDefs = SourceConnectorConfig.configDef();
                pluginConfigDefs = ((SourceConnector)plugin).config();
            } else if (plugin instanceof Converter) {
                pluginConfigDefs = ((Converter)plugin).config();
            } else if (plugin instanceof HeaderConverter) {
                pluginConfigDefs = ((HeaderConverter)plugin).config();
            } else if (plugin instanceof Transformation) {
                pluginConfigDefs = ((Transformation)plugin).config();
            } else {
                if (!(plugin instanceof Predicate)) throw new BadRequestException("Invalid plugin class " + pluginName + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
                pluginConfigDefs = ((Predicate)plugin).config();
            }
            LinkedHashMap configsMap = new LinkedHashMap(pluginConfigDefs.configKeys());
            if (baseConfigDefs != null) {
                baseConfigDefs.configKeys().forEach(configsMap::putIfAbsent);
            }
            ArrayList<ConfigKeyInfo> results = new ArrayList<ConfigKeyInfo>();
            for (ConfigDef.ConfigKey configKey : configsMap.values()) {
                results.add(AbstractHerder.convertConfigKey(configKey));
            }
            ArrayList<ConfigKeyInfo> arrayList = results;
            return arrayList;
        }
        catch (ClassNotFoundException e) {
            throw new ConnectException("Failed to load plugin class or one of its dependencies", (Throwable)e);
        }
    }

    @Override
    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
        ClusterConfigState configSnapshot = this.configBackingStore.snapshot();
        try {
            if (!configSnapshot.contains(connName)) {
                cb.onCompletion((Throwable)new NotFoundException("Connector " + connName + " not found"), null);
                return;
            }
            this.worker.connectorOffsets(connName, configSnapshot.connectorConfig(connName), cb);
        }
        catch (Throwable t) {
            cb.onCompletion(t, null);
        }
    }

    @Override
    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
        if (offsets == null || offsets.isEmpty()) {
            callback.onCompletion(new ConnectException("The offsets to be altered may not be null or empty"), null);
            return;
        }
        this.modifyConnectorOffsets(connName, offsets, callback);
    }

    @Override
    public void resetConnectorOffsets(String connName, Callback<Message> callback) {
        this.modifyConnectorOffsets(connName, null, callback);
    }

    protected abstract void modifyConnectorOffsets(String var1, Map<Map<String, ?>, Map<String, ?>> var2, Callback<Message> var3);

    @Override
    public LoggerLevel loggerLevel(String logger) {
        return this.loggers.level(logger);
    }

    @Override
    public Map<String, LoggerLevel> allLoggerLevels() {
        return this.loggers.allLevels();
    }

    @Override
    public List<String> setWorkerLoggerLevel(String namespace, String desiredLevelStr) {
        Level level = Level.toLevel((String)desiredLevelStr.toUpperCase(Locale.ROOT), null);
        if (level == null) {
            log.warn("Ignoring request to set invalid level '{}' for namespace {}", (Object)desiredLevelStr, (Object)namespace);
            return Collections.emptyList();
        }
        return this.loggers.setLevel(namespace, level);
    }
}

