/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.cli;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.cli.AbstractConnectCli;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.SharedTopicAdmin;

/**
 * Command-line launcher that wires together the components of a Connect worker
 * configured through {@link DistributedConfig} and driven by a {@link DistributedHerder}.
 *
 * <p>The argument handling and the {@code run()} lifecycle live in
 * {@link AbstractConnectCli}; this class only supplies the usage string, the
 * configuration factory, and the herder factory.
 */
public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {

    /**
     * Creates the launcher.
     *
     * @param args command-line arguments, forwarded unchanged to the superclass
     */
    public ConnectDistributed(String... args) {
        super(args);
    }

    /**
     * Returns the usage string for this command.
     *
     * @return the fixed usage text
     */
    @Override
    protected String usage() {
        return "ConnectDistributed worker.properties";
    }

    /**
     * Builds the {@link DistributedHerder} together with the worker and the
     * Kafka-backed offset, status and config stores it is constructed from.
     *
     * @param config worker configuration; also the source of the admin client properties
     * @param workerId identifier handed to the {@link Worker}
     * @param plugins used to instantiate the converter for the offset store and handed to the {@link Worker}
     * @param connectorClientConfigOverridePolicy policy handed to both the {@link Worker} and the herder
     * @param restServer its advertised URL (as a string) is handed to the herder
     * @param restClient handed to the herder
     * @return the newly constructed herder
     */
    @Override
    protected Herder createHerder(DistributedConfig config, String workerId, Plugins plugins, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, RestServer restServer, RestClient restClient) {
        final String clusterId = config.kafkaClusterId();
        final String clientIdPrefix = ConnectUtils.clientIdBase(config);

        // One admin client wrapper is created here and reused by all three backing stores below.
        // Its properties start from a copy of the worker's original settings.
        Map<String, Object> sharedAdminProps = new HashMap<>(config.originals());
        ConnectUtils.addMetricsContextProperties(sharedAdminProps, config, clusterId);
        sharedAdminProps.put("client.id", clientIdPrefix + "shared-admin");
        SharedTopicAdmin topicAdmin = new SharedTopicAdmin(sharedAdminProps);

        // The offset store gets its own JSON converter instance with schemas switched off.
        Map<String, String> jsonConverterSettings = Collections.singletonMap("schemas.enable", "false");
        Converter jsonConverterForOffsets =
                plugins.newInternalConverter(true, JsonConverter.class.getName(), jsonConverterSettings);
        KafkaOffsetBackingStore offsets =
                new KafkaOffsetBackingStore(topicAdmin, () -> clientIdPrefix, jsonConverterForOffsets);
        offsets.configure(config);

        Worker connectWorker =
                new Worker(workerId, Time.SYSTEM, plugins, config, offsets, connectorClientConfigOverridePolicy);
        WorkerConfigTransformer transformer = connectWorker.configTransformer();
        Converter valueConverter = connectWorker.getInternalValueConverter();

        // Status and config stores share the worker's internal value converter.
        StatusBackingStore statuses =
                new KafkaStatusBackingStore(Time.SYSTEM, valueConverter, topicAdmin, clientIdPrefix);
        statuses.configure(config);

        ConfigBackingStore configs =
                new KafkaConfigBackingStore(valueConverter, config, transformer, topicAdmin, clientIdPrefix);

        String advertisedUrl = restServer.advertisedUrl().toString();

        // NOTE(review): the shared admin is passed as the trailing argument, presumably so the
        // herder can release it when it stops -- confirm against DistributedHerder.
        return new DistributedHerder(
                config,
                Time.SYSTEM,
                connectWorker,
                clusterId,
                statuses,
                configs,
                advertisedUrl,
                restClient,
                connectorClientConfigOverridePolicy,
                Collections.emptyList(),
                topicAdmin);
    }

    /**
     * Wraps the raw worker properties in a {@link DistributedConfig}.
     *
     * @param workerProps worker properties as key/value strings
     * @return a new configuration built from {@code workerProps}
     */
    @Override
    protected DistributedConfig createConfig(Map<String, String> workerProps) {
        return new DistributedConfig(workerProps);
    }

    /**
     * Program entry point: constructs the launcher from {@code args} and runs it.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        new ConnectDistributed(args).run();
    }
}

