/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator;

import io.confluent.connect.replicator.ReplicatorSourceConnector;
import io.confluent.connect.replicator.exec.EmbeddedHerder;
import io.confluent.connect.replicator.exec.ExecutableConfigProvider;
import io.confluent.connect.replicator.exec.ReplicatorCli;
import java.net.URI;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicatorApp {
    private static final Logger log = LoggerFactory.getLogger(ReplicatorApp.class);
    private final ReplicatorCli cli;
    private ExecutableConfigProvider executableConfigProvider = new ExecutableConfigProvider();
    private final Time time;
    private Plugins plugins;
    private DistributedConfig workerConfig;
    private EmbeddedHerder herder;
    private Connect app;
    private RestServer rest;
    private String appInstanceName;

    public ReplicatorApp() {
        this(Time.SYSTEM);
    }

    public ReplicatorApp(Time time) {
        this.time = time;
        this.cli = new ReplicatorCli();
    }

    public ReplicatorApp(Time time, Connect app, RestServer rest, EmbeddedHerder herder) {
        this(time);
        this.app = app;
        this.rest = rest;
        this.herder = herder;
    }

    public static void main(String[] args) {
        ReplicatorApp replicator = new ReplicatorApp();
        replicator.validate(args);
        replicator.registerReplicatorPlugin();
        replicator.config();
        replicator.start();
    }

    public static String name() {
        return "Replicator";
    }

    public void validate(String[] commandLineArgs) {
        log.info("Validating input arguments");
        long start = this.time.hiResClockMs();
        this.executableConfigProvider.setAllProps(this.cli.parse(commandLineArgs));
        log.info("{} input argument validation took {}ms", (Object)ReplicatorApp.name(), (Object)(this.time.hiResClockMs() - start));
    }

    public void registerReplicatorPlugin() {
        log.info("Scanning for {} plugin classes. This might take a moment ...", (Object)ReplicatorApp.name());
        long start = this.time.hiResClockMs();
        log.info("Setting '{}' to '{}' using the supplied configuration.", (Object)"plugin.path", (Object)this.executableConfigProvider.getWorkerConfig().get("plugin.path"));
        this.plugins = new Plugins(this.executableConfigProvider.getWorkerConfig());
        int matches = 0;
        for (PluginDesc pluginDesc : this.plugins.connectors()) {
            matches += pluginDesc.className().equals(ReplicatorSourceConnector.class.getName()) ? 1 : 0;
        }
        if (matches > 1) {
            throw new ConnectException("More than one classes of plugin '" + ReplicatorSourceConnector.class.getName() + "' were found in the plugin.path: " + this.workerConfig.getString("plugin.path") + " and the CLASSPATH");
        }
        log.info("{} plugin scanning took {}ms", (Object)ReplicatorApp.name(), (Object)(this.time.hiResClockMs() - start));
    }

    public void config() {
        long start = this.time.hiResClockMs();
        this.appInstanceName = this.executableConfigProvider.getConnectorConfig().get("name");
        this.workerConfig = new DistributedConfig(this.executableConfigProvider.getWorkerConfig());
        this.rest = new RestServer((WorkerConfig)this.workerConfig);
        this.rest.initializeServer();
        URI advertisedUrl = this.rest.advertisedUrl();
        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
        offsetBackingStore.configure((WorkerConfig)this.workerConfig);
        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = (ConnectorClientConfigOverridePolicy)this.plugins.newPlugin(this.workerConfig.getString("connector.client.config.override.policy"), (AbstractConfig)this.workerConfig, ConnectorClientConfigOverridePolicy.class);
        Worker worker = new Worker(workerId, this.time, this.plugins, (WorkerConfig)this.workerConfig, (OffsetBackingStore)offsetBackingStore, connectorClientConfigOverridePolicy);
        WorkerConfigTransformer configTransformer = worker.configTransformer();
        KafkaStatusBackingStore statusBackingStore = new KafkaStatusBackingStore(this.time, worker.getInternalValueConverter());
        statusBackingStore.configure((WorkerConfig)this.workerConfig);
        KafkaConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), (WorkerConfig)this.workerConfig, configTransformer);
        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId((WorkerConfig)this.workerConfig);
        this.herder = new EmbeddedHerder(this.appInstanceName, this.workerConfig, worker, kafkaClusterId, (StatusBackingStore)statusBackingStore, (ConfigBackingStore)configBackingStore, advertisedUrl.toString(), this.time, connectorClientConfigOverridePolicy);
        this.app = new Connect((Herder)this.herder, this.rest);
        log.info("{} initialization took {}ms", (Object)ReplicatorApp.name(), (Object)(this.time.hiResClockMs() - start));
    }

    public void start() {
        try {
            this.app.start();
        }
        catch (Exception e) {
            log.error("Failed to start {} worker", (Object)ReplicatorApp.name(), (Object)e);
            this.app.stop();
        }
        try {
            FutureCallback cb = new FutureCallback((Callback)new Callback<Herder.Created<ConnectorInfo>>(){

                public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
                    if (error == null) {
                        log.info("Created {}", (Object)((ConnectorInfo)info.result()).name());
                    }
                }
            });
            this.herder.putConnectorConfig(this.appInstanceName, this.executableConfigProvider.getConnectorConfig(), true, (Callback<Herder.Created<ConnectorInfo>>)cb);
            cb.get();
        }
        catch (Throwable t) {
            Throwable cause = t.getCause();
            if (cause instanceof NotLeaderException) {
                log.info("This instance of Replicator is not cluster leader. Will not submit a configuration");
            }
            log.error("Stopping after {} error", (Object)ReplicatorApp.name(), (Object)t);
            this.app.stop();
        }
        this.app.awaitStop();
    }

    public void setExecutableConfigProvider(ExecutableConfigProvider executableConfigProvider) {
        this.executableConfigProvider = executableConfigProvider;
    }

    public void setHerder(EmbeddedHerder herder) {
        this.herder = herder;
    }

    public void setApp(Connect app) {
        this.app = app;
    }
}

