/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import kafka.server.KafkaServer;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.integration.utils.KafkaEmbedded;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

public class EmbeddedKafkaCluster {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private static final int DEFAULT_BROKER_PORT = 0;
    private static final int TOPIC_CREATION_TIMEOUT = 30000;
    private static final int TOPIC_DELETION_TIMEOUT = 30000;
    private EmbeddedZookeeper zookeeper = null;
    private final KafkaEmbedded[] brokers;
    private final Properties brokerConfig;
    private final List<Properties> brokerConfigOverrides;
    public final MockTime time;

    public EmbeddedKafkaCluster(int numBrokers) {
        this(numBrokers, new Properties());
    }

    public EmbeddedKafkaCluster(int numBrokers, Properties brokerConfig) {
        this(numBrokers, brokerConfig, System.currentTimeMillis());
    }

    public EmbeddedKafkaCluster(int numBrokers, Properties brokerConfig, long mockTimeMillisStart) {
        this(numBrokers, brokerConfig, Collections.emptyList(), mockTimeMillisStart);
    }

    public EmbeddedKafkaCluster(int numBrokers, Properties brokerConfig, List<Properties> brokerConfigOverrides) {
        this(numBrokers, brokerConfig, brokerConfigOverrides, System.currentTimeMillis());
    }

    public EmbeddedKafkaCluster(int numBrokers, Properties brokerConfig, List<Properties> brokerConfigOverrides, long mockTimeMillisStart) {
        this(numBrokers, brokerConfig, brokerConfigOverrides, mockTimeMillisStart, System.nanoTime());
    }

    public EmbeddedKafkaCluster(int numBrokers, Properties brokerConfig, List<Properties> brokerConfigOverrides, long mockTimeMillisStart, long mockTimeNanoStart) {
        if (!brokerConfigOverrides.isEmpty() && brokerConfigOverrides.size() != numBrokers) {
            throw new IllegalArgumentException("Size of brokerConfigOverrides " + brokerConfigOverrides.size() + " must match broker number " + numBrokers);
        }
        this.brokers = new KafkaEmbedded[numBrokers];
        this.brokerConfig = brokerConfig;
        this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
        this.brokerConfigOverrides = brokerConfigOverrides;
    }

    public void start() throws IOException {
        log.debug("Initiating embedded Kafka cluster startup");
        log.debug("Starting a ZooKeeper instance");
        this.zookeeper = new EmbeddedZookeeper();
        log.debug("ZooKeeper instance is running at {}", (Object)this.zKConnectString());
        this.brokerConfig.put("zookeeper.connect", this.zKConnectString());
        this.putIfAbsent(this.brokerConfig, "listeners", "PLAINTEXT://localhost:0");
        this.putIfAbsent(this.brokerConfig, "delete.topic.enable", true);
        this.putIfAbsent(this.brokerConfig, "log.cleaner.dedupe.buffer.size", 0x200000L);
        this.putIfAbsent(this.brokerConfig, "group.min.session.timeout.ms", 0);
        this.putIfAbsent(this.brokerConfig, "group.initial.rebalance.delay.ms", 0);
        this.putIfAbsent(this.brokerConfig, "offsets.topic.replication.factor", (short)1);
        this.putIfAbsent(this.brokerConfig, "offsets.topic.num.partitions", 5);
        this.putIfAbsent(this.brokerConfig, "transaction.state.log.num.partitions", 5);
        this.putIfAbsent(this.brokerConfig, "auto.create.topics.enable", true);
        for (int i = 0; i < this.brokers.length; ++i) {
            this.brokerConfig.put("broker.id", (Object)i);
            log.debug("Starting a Kafka instance on {} ...", this.brokerConfig.get("listeners"));
            Properties effectiveConfig = new Properties();
            effectiveConfig.putAll((Map<?, ?>)this.brokerConfig);
            if (this.brokerConfigOverrides != null && this.brokerConfigOverrides.size() > i) {
                effectiveConfig.putAll((Map<?, ?>)this.brokerConfigOverrides.get(i));
            }
            this.brokers[i] = new KafkaEmbedded(effectiveConfig, this.time);
            log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", (Object)this.brokers[i].brokerList(), (Object)this.brokers[i].zookeeperConnect());
        }
    }

    private void putIfAbsent(Properties props, String propertyKey, Object propertyValue) {
        if (!props.containsKey(propertyKey)) {
            this.brokerConfig.put(propertyKey, propertyValue);
        }
    }

    public void stop() {
        Set<String> topics;
        if (this.brokers.length > 1 && !(topics = this.getAllTopicsInCluster()).isEmpty()) {
            try (Admin adminClient = this.brokers[0].createAdminClient();){
                adminClient.deleteTopics(topics).all().get();
            }
            catch (InterruptedException e) {
                log.warn("Got interrupted while deleting topics in preparation for stopping embedded brokers", (Throwable)e);
                throw new RuntimeException(e);
            }
            catch (RuntimeException | ExecutionException e) {
                log.warn("Couldn't delete all topics before stopping brokers", (Throwable)e);
            }
        }
        for (KafkaEmbedded broker : this.brokers) {
            broker.stopAsync();
        }
        for (KafkaEmbedded broker : this.brokers) {
            broker.awaitStoppedAndPurge();
        }
        this.zookeeper.shutdown();
    }

    public String zKConnectString() {
        return "127.0.0.1:" + this.zookeeper.port();
    }

    public String bootstrapServers() {
        return this.brokers[0].brokerList();
    }

    public void createTopics(String ... topics) throws InterruptedException {
        for (String topic : topics) {
            this.createTopic(topic, 1, 1, Collections.emptyMap());
        }
    }

    public void createTopic(String topic) throws InterruptedException {
        this.createTopic(topic, 1, 1, Collections.emptyMap());
    }

    public void createTopic(String topic, int partitions, int replication) throws InterruptedException {
        this.createTopic(topic, partitions, replication, Collections.emptyMap());
    }

    public void createTopic(String topic, int partitions, int replication, Map<String, String> topicConfig) throws InterruptedException {
        this.brokers[0].createTopic(topic, partitions, replication, topicConfig);
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        for (int partition = 0; partition < partitions; ++partition) {
            topicPartitions.add(new TopicPartition(topic, partition));
        }
        IntegrationTestUtils.waitForTopicPartitions(this.brokers(), topicPartitions, 30000L);
    }

    public void deleteTopic(String topic) throws InterruptedException {
        this.deleteTopicsAndWait(-1L, topic);
    }

    public void deleteTopicAndWait(String topic) throws InterruptedException {
        this.deleteTopicsAndWait(30000L, topic);
    }

    public void deleteTopics(String ... topics) throws InterruptedException {
        this.deleteTopicsAndWait(-1L, topics);
    }

    public void deleteTopicsAndWait(String ... topics) throws InterruptedException {
        this.deleteTopicsAndWait(30000L, topics);
    }

    public void deleteTopicsAndWait(long timeoutMs, String ... topics) throws InterruptedException {
        for (String topic : topics) {
            try {
                this.brokers[0].deleteTopic(topic);
            }
            catch (UnknownTopicOrPartitionException unknownTopicOrPartitionException) {
                // empty catch block
            }
        }
        if (timeoutMs > 0L) {
            TestUtils.waitForCondition((TestCondition)new TopicsDeletedCondition(topics), (long)timeoutMs, (String)("Topics not deleted after " + timeoutMs + " milli seconds."));
        }
    }

    public void deleteAllTopicsAndWait(long timeoutMs) throws InterruptedException {
        Set<String> topics = this.getAllTopicsInCluster();
        for (String topic : topics) {
            try {
                this.brokers[0].deleteTopic(topic);
            }
            catch (UnknownTopicOrPartitionException unknownTopicOrPartitionException) {}
        }
        if (timeoutMs > 0L) {
            TestUtils.waitForCondition((TestCondition)new TopicsDeletedCondition(topics), (long)timeoutMs, (String)("Topics not deleted after " + timeoutMs + " milli seconds."));
        }
    }

    public void waitForRemainingTopics(long timeoutMs, String ... topics) throws InterruptedException {
        TestUtils.waitForCondition((TestCondition)new TopicsRemainingCondition(topics), (long)timeoutMs, (String)("Topics are not expected after " + timeoutMs + " milli seconds."));
    }

    private List<KafkaServer> brokers() {
        ArrayList<KafkaServer> servers = new ArrayList<KafkaServer>();
        for (KafkaEmbedded broker : this.brokers) {
            servers.add(broker.kafkaServer());
        }
        return servers;
    }

    public Properties getLogConfig(String topic) {
        return this.brokers[0].kafkaServer().zkClient().getEntityConfigs("topics", topic);
    }

    public Set<String> getAllTopicsInCluster() {
        Iterator topicsIterator = this.brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator();
        HashSet<String> topics = new HashSet<String>();
        while (topicsIterator.hasNext()) {
            topics.add((String)topicsIterator.next());
        }
        return topics;
    }

    private final class TopicsRemainingCondition
    implements TestCondition {
        final Set<String> remainingTopics = new HashSet<String>();

        private TopicsRemainingCondition(String ... topics) {
            Collections.addAll(this.remainingTopics, topics);
        }

        public boolean conditionMet() {
            Set<String> allTopics = EmbeddedKafkaCluster.this.getAllTopicsInCluster();
            return allTopics.equals(this.remainingTopics);
        }
    }

    private final class TopicsDeletedCondition
    implements TestCondition {
        final Set<String> deletedTopics = new HashSet<String>();

        private TopicsDeletedCondition(String ... topics) {
            Collections.addAll(this.deletedTopics, topics);
        }

        private TopicsDeletedCondition(Collection<String> topics) {
            this.deletedTopics.addAll(topics);
        }

        public boolean conditionMet() {
            Set<String> allTopics = EmbeddedKafkaCluster.this.getAllTopicsInCluster();
            return !allTopics.removeAll(this.deletedTopics);
        }
    }
}

