package uk.camsw.rxjava.test.kafka.rule;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Properties;
import java.util.concurrent.Executors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/* loaded from: input_file:uk/camsw/rxjava/test/kafka/rule/EmbeddedKafkaLauncher.class */
/**
 * Boots an in-process ZooKeeper server followed by a Kafka broker, for use in tests.
 *
 * <p>NOTE(review): the single-thread executor that hosts ZooKeeper is never shut down, and
 * unsubscribing the worker may not stop a server that is already blocked inside
 * {@code runFromConfig} — confirm against the ZooKeeper/RxJava versions in use.
 */
public class EmbeddedKafkaLauncher {
    private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaLauncher.class);

    /** Maximum time, in milliseconds, to wait for the Kafka port to accept a TCP connection. */
    private static final int KAFKA_CONNECT_TIMEOUT_MS = 10000;

    /**
     * Starts ZooKeeper and then Kafka, and blocks until the Kafka port accepts a connection.
     *
     * <p>A JVM shutdown hook is registered that unsubscribes the returned subscription. If
     * start-up fails at any point, everything started so far is unsubscribed before the
     * exception propagates, so a failed call does not leave servers holding their ports.
     *
     * <p>Neither stream is closed by this method.
     *
     * @param inputStream  properties stream used to build the Kafka broker configuration
     * @param inputStream2 properties stream used to build the ZooKeeper configuration
     * @return a subscription whose {@code unsubscribe()} shuts Kafka down and then
     *         unsubscribes the ZooKeeper worker
     * @throws RuntimeException wrapping the {@link IOException} or
     *         {@code QuorumPeerConfig.ConfigException} raised while loading configuration or
     *         probing the Kafka port
     */
    public static Subscription start(InputStream inputStream, InputStream inputStream2) {
        logger.info("Starting Embedded Kafka/ Zookeeper");
        CompositeSubscription compositeSubscription = new CompositeSubscription();
        boolean started = false;
        try {
            // ZooKeeper must be running before the broker starts; track it immediately so a
            // later failure can still tear it down.
            Subscription zookeeper = startZookeeper(inputStream2);
            compositeSubscription.add(zookeeper);

            Properties kafkaProperties = new Properties();
            kafkaProperties.load(inputStream);
            logger.info("Starting kafka: [{}]", kafkaProperties);
            KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
            KafkaServerStartable kafkaServerStartable = new KafkaServerStartable(kafkaConfig);
            kafkaServerStartable.startup();

            // Explicit teardown order: broker first, then the ZooKeeper worker.
            compositeSubscription.add(Subscriptions.create(() -> {
                kafkaServerStartable.shutdown();
                zookeeper.unsubscribe();
                logger.info("Stopping embedded kafka instance");
            }));
            Runtime.getRuntime().addShutdownHook(new Thread(compositeSubscription::unsubscribe));

            logger.info("Waiting for kafka to start");
            // try-with-resources so the probe socket is released even when connect() fails.
            try (Socket socket = new Socket()) {
                socket.connect(
                        new InetSocketAddress("127.0.0.1", kafkaConfig.port()),
                        KAFKA_CONNECT_TIMEOUT_MS);
            }
            logger.info("Kafka started");
            logger.info("Started kafka/ zookeeper");
            started = true;
            return compositeSubscription;
        } catch (IOException | QuorumPeerConfig.ConfigException e) {
            logger.error("Failed to start kafka/ zookeeper", e);
            throw new RuntimeException(e);
        } finally {
            if (!started) {
                // The caller never receives the subscription on failure, so clean up here.
                unsubscribeQuietly(compositeSubscription);
            }
        }
    }

    /**
     * Parses the ZooKeeper properties and runs the server on a dedicated single-thread executor.
     *
     * @return the subscription of the scheduled action that runs the server
     */
    private static Subscription startZookeeper(InputStream zookeeperConfig)
            throws IOException, QuorumPeerConfig.ConfigException {
        Properties zookeeperProperties = new Properties();
        zookeeperProperties.load(zookeeperConfig);
        logger.info("Starting zookeeper: [{}]", zookeeperProperties);
        QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
        quorumPeerConfig.parseProperties(zookeeperProperties);
        ZooKeeperServerMain zooKeeperServerMain = new ZooKeeperServerMain();
        ServerConfig serverConfig = new ServerConfig();
        serverConfig.readFrom(quorumPeerConfig);
        return Schedulers.from(Executors.newSingleThreadExecutor()).createWorker().schedule(() -> {
            try {
                zooKeeperServerMain.runFromConfig(serverConfig);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    /** Unsubscribes during failure cleanup without masking the exception that caused it. */
    private static void unsubscribeQuietly(Subscription subscription) {
        try {
            subscription.unsubscribe();
        } catch (RuntimeException e) {
            logger.warn("Failed to clean up after unsuccessful kafka/ zookeeper start", e);
        }
    }
}
