package org.apache.kafka.streams.integration.utils;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/integration/utils/KafkaEmbedded.class */
public class KafkaEmbedded {
    private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);
    private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
    private final Properties effectiveConfig;
    private final KafkaServer kafka;
    private final File tmpFolder = TestUtils.tempDirectory();
    private final File logDir = TestUtils.tempDirectory(this.tmpFolder.toPath(), "log");

    public KafkaEmbedded(Properties properties, MockTime mockTime) throws IOException {
        this.effectiveConfig = effectiveConfigFrom(properties);
        KafkaConfig kafkaConfig = new KafkaConfig(this.effectiveConfig, true);
        log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", this.logDir, zookeeperConnect());
        this.kafka = kafka.utils.TestUtils.createServer(kafkaConfig, mockTime);
        log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect());
    }

    private Properties effectiveConfigFrom(Properties properties) {
        Properties properties2 = new Properties();
        properties2.put("broker.id", 0);
        properties2.put("num.partitions", 1);
        properties2.put("auto.create.topics.enable", true);
        properties2.put("message.max.bytes", 1000000);
        properties2.put("controlled.shutdown.enable", true);
        properties2.put("zookeeper.session.timeout.ms", 10000);
        properties2.putAll(properties);
        properties2.setProperty("log.dir", this.logDir.getAbsolutePath());
        return properties2;
    }

    public String brokerList() {
        EndPoint endPoint = (EndPoint) this.kafka.advertisedListeners().head();
        return endPoint.host() + ":" + endPoint.port();
    }

    public String zookeeperConnect() {
        return this.effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
    }

    public void stopAsync() {
        log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect());
        this.kafka.shutdown();
    }

    public void awaitStoppedAndPurge() {
        this.kafka.awaitShutdown();
        log.debug("Removing log dir at {} ...", this.logDir);
        try {
            Utils.delete(this.tmpFolder);
            log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void createTopic(String str) {
        createTopic(str, 1, 1, Collections.emptyMap());
    }

    public void createTopic(String str, int i, int i2) {
        createTopic(str, i, i2, Collections.emptyMap());
    }

    public void createTopic(String str, int i, int i2, Map<String, String> map) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), map});
        NewTopic newTopic = new NewTopic(str, i, (short) i2);
        newTopic.configs(map);
        try {
            Admin createAdminClient = createAdminClient();
            Throwable th = null;
            try {
                try {
                    createAdminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                    if (createAdminClient != null) {
                        if (0 != 0) {
                            try {
                                createAdminClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createAdminClient.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public Admin createAdminClient() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerList());
        Object obj = this.effectiveConfig.get("listeners");
        if (obj != null && obj.toString().contains("SSL")) {
            properties.put("ssl.truststore.location", this.effectiveConfig.get("ssl.truststore.location"));
            properties.put("ssl.truststore.password", ((Password) this.effectiveConfig.get("ssl.truststore.password")).value());
            properties.put("security.protocol", "SSL");
        }
        return Admin.create(properties);
    }

    public void deleteTopic(String str) {
        log.debug("Deleting topic { name: {} }", str);
        try {
            Admin createAdminClient = createAdminClient();
            Throwable th = null;
            try {
                createAdminClient.deleteTopics(Collections.singletonList(str)).all().get();
                if (createAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw new RuntimeException(e);
            }
        }
    }

    public KafkaServer kafkaServer() {
        return this.kafka;
    }
}
