package org.springframework.kafka.test;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.ZkFourLetterWords;
import kafka.zookeeper.ZooKeeperClient;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import scala.Option;

/* loaded from: input_file:org/springframework/kafka/test/EmbeddedKafkaBroker.class */
public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
    private static final String BROKER_NEEDED = "Broker must be started before this method can be called";
    private static final String LOOPBACK = "127.0.0.1";
    private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(EmbeddedKafkaBroker.class));
    public static final String BEAN_NAME = "embeddedKafka";
    public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
    public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";
    public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
    public static final int DEFAULT_ADMIN_TIMEOUT = 10;
    public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;
    public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = 18000;
    private static final Method GET_BROKER_STATE_METHOD;
    private static final Method BROKER_CONFIGS_METHOD;
    private final int count;
    private final boolean controlledShutdown;
    private final Set<String> topics;
    private final int partitionsPerTopic;
    private final List<KafkaServer> kafkaServers;
    private final Map<String, Object> brokerProperties;
    private EmbeddedZookeeper zookeeper;
    private String zkConnect;
    private int zkPort;
    private int[] kafkaPorts;
    private Duration adminTimeout;
    private int zkConnectionTimeout;
    private int zkSessionTimeout;
    private String brokerListProperty;
    private volatile ZooKeeperClient zooKeeperClient;

    /* loaded from: input_file:org/springframework/kafka/test/EmbeddedKafkaBroker$EmbeddedZookeeper.class */
    public static final class EmbeddedZookeeper {
        private static final int THREE_K = 3000;
        private static final int HUNDRED = 100;
        private static final int TICK_TIME = 800;
        private final NIOServerCnxnFactory factory;
        private final ZooKeeperServer zookeeper;
        private final int port;
        private final File snapshotDir = TestUtils.tempDir();
        private final File logDir = TestUtils.tempDir();

        public EmbeddedZookeeper(int i) throws IOException, InterruptedException {
            System.setProperty("zookeeper.forceSync", "no");
            this.zookeeper = new ZooKeeperServer(this.snapshotDir, this.logDir, TICK_TIME);
            this.factory = new NIOServerCnxnFactory();
            this.factory.configure(new InetSocketAddress(EmbeddedKafkaBroker.LOOPBACK, i == 0 ? TestUtils.RandomPort() : i), 0);
            this.factory.startup(this.zookeeper);
            this.port = this.zookeeper.getClientPort();
        }

        public int getPort() {
            return this.port;
        }

        public File getSnapshotDir() {
            return this.snapshotDir;
        }

        public File getLogDir() {
            return this.logDir;
        }

        public void shutdown() throws IOException {
            try {
                this.factory.shutdown();
            } catch (Exception e) {
                EmbeddedKafkaBroker.logger.error(e, "ZK shutdown failed");
            }
            int i = 0;
            while (true) {
                int i2 = i;
                i++;
                if (i2 >= HUNDRED) {
                    break;
                }
                try {
                    ZkFourLetterWords.sendStat(EmbeddedKafkaBroker.LOOPBACK, this.port, THREE_K);
                    Thread.sleep(100L);
                } catch (Exception e2) {
                }
            }
            if (i == HUNDRED) {
                EmbeddedKafkaBroker.logger.debug("Zookeeper failed to stop");
            }
            try {
                this.zookeeper.getZKDatabase().close();
            } catch (Exception e3) {
                EmbeddedKafkaBroker.logger.error(e3, "ZK db close failed");
            }
            Utils.delete(this.logDir);
            Utils.delete(this.snapshotDir);
        }
    }

    public EmbeddedKafkaBroker(int i) {
        this(i, false, new String[0]);
    }

    public EmbeddedKafkaBroker(int i, boolean z, String... strArr) {
        this(i, z, 2, strArr);
    }

    public EmbeddedKafkaBroker(int i, boolean z, int i2, String... strArr) {
        this.kafkaServers = new ArrayList();
        this.brokerProperties = new HashMap();
        this.adminTimeout = Duration.ofSeconds(10L);
        this.zkConnectionTimeout = 18000;
        this.zkSessionTimeout = 18000;
        this.count = i;
        this.kafkaPorts = new int[this.count];
        this.controlledShutdown = z;
        if (strArr != null) {
            this.topics = new HashSet(Arrays.asList(strArr));
        } else {
            this.topics = new HashSet();
        }
        this.partitionsPerTopic = i2;
    }

    public EmbeddedKafkaBroker brokerProperties(Map<String, String> map) {
        this.brokerProperties.putAll(map);
        return this;
    }

    public EmbeddedKafkaBroker brokerProperty(String str, Object obj) {
        this.brokerProperties.put(str, obj);
        return this;
    }

    public EmbeddedKafkaBroker kafkaPorts(int... iArr) {
        Assert.isTrue(iArr.length == this.count, "A port must be provided for each instance [" + this.count + "], provided: " + Arrays.toString(iArr) + ", use 0 for a random port");
        this.kafkaPorts = Arrays.copyOf(iArr, iArr.length);
        return this;
    }

    public EmbeddedKafkaBroker brokerListProperty(String str) {
        this.brokerListProperty = str;
        return this;
    }

    public EmbeddedKafkaBroker zkPort(int i) {
        this.zkPort = i;
        return this;
    }

    public int getZkPort() {
        return this.zookeeper != null ? this.zookeeper.getPort() : this.zkPort;
    }

    public void setZkPort(int i) {
        this.zkPort = i;
    }

    public EmbeddedKafkaBroker adminTimeout(int i) {
        this.adminTimeout = Duration.ofSeconds(i);
        return this;
    }

    public void setAdminTimeout(int i) {
        this.adminTimeout = Duration.ofSeconds(i);
    }

    public synchronized EmbeddedKafkaBroker zkConnectionTimeout(int i) {
        this.zkConnectionTimeout = i;
        return this;
    }

    public synchronized EmbeddedKafkaBroker zkSessionTimeout(int i) {
        this.zkSessionTimeout = i;
        return this;
    }

    public void afterPropertiesSet() {
        overrideExitMethods();
        try {
            this.zookeeper = new EmbeddedZookeeper(this.zkPort);
            this.zkConnect = "127.0.0.1:" + this.zookeeper.getPort();
            this.kafkaServers.clear();
            boolean z = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
            for (int i = 0; i < this.count; i++) {
                Properties createBrokerProperties = createBrokerProperties(i);
                createBrokerProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
                createBrokerProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
                createBrokerProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
                createBrokerProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
                Map<String, Object> map = this.brokerProperties;
                Objects.requireNonNull(createBrokerProperties);
                map.forEach((v1, v2) -> {
                    r1.put(v1, v2);
                });
                if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
                    createBrokerProperties.setProperty(KafkaConfig.NumPartitionsProp(), this.partitionsPerTopic);
                }
                if (!z) {
                    logDir(createBrokerProperties);
                }
                KafkaServer createServer = TestUtils.createServer(new KafkaConfig(createBrokerProperties), Time.SYSTEM);
                this.kafkaServers.add(createServer);
                if (this.kafkaPorts[i] == 0) {
                    this.kafkaPorts[i] = TestUtils.boundPort(createServer, SecurityProtocol.PLAINTEXT);
                }
            }
            createKafkaTopics(this.topics);
            if (this.brokerListProperty == null) {
                this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
            }
            if (this.brokerListProperty == null) {
                this.brokerListProperty = SPRING_EMBEDDED_KAFKA_BROKERS;
            }
            System.setProperty(this.brokerListProperty, getBrokersAsString());
            System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException("Failed to create embedded Zookeeper", e);
        }
    }

    private void logDir(Properties properties) {
        try {
            properties.put(KafkaConfig.LogDirProp(), Files.createTempDirectory("spring.kafka." + UUID.randomUUID(), new FileAttribute[0]).toString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void overrideExitMethods() {
        String str = "Exit.%s(%d, %s) called";
        Exit.setExitProcedure((i, str2) -> {
            if (logger.isDebugEnabled()) {
                logger.debug(new RuntimeException(), String.format(str, "exit", Integer.valueOf(i), str2));
            } else {
                logger.warn(String.format(str, "exit", Integer.valueOf(i), str2));
            }
        });
        Exit.setHaltProcedure((i2, str3) -> {
            if (logger.isDebugEnabled()) {
                logger.debug(new RuntimeException(), String.format(str, "halt", Integer.valueOf(i2), str3));
            } else {
                logger.warn(String.format(str, "halt", Integer.valueOf(i2), str3));
            }
        });
    }

    private Properties createBrokerProperties(int i) {
        try {
            return BROKER_CONFIGS_METHOD.getParameterTypes().length == 21 ? (Properties) BROKER_CONFIGS_METHOD.invoke(null, Integer.valueOf(i), this.zkConnect, Boolean.valueOf(this.controlledShutdown), true, Integer.valueOf(this.kafkaPorts[i]), Option.apply((Object) null), Option.apply((Object) null), Option.apply((Object) null), true, false, 0, false, 0, false, 0, Option.apply((Object) null), 1, false, Integer.valueOf(this.partitionsPerTopic), Short.valueOf((short) this.count), false) : (Properties) BROKER_CONFIGS_METHOD.invoke(null, Integer.valueOf(i), this.zkConnect, Boolean.valueOf(this.controlledShutdown), true, Integer.valueOf(this.kafkaPorts[i]), Option.apply((Object) null), Option.apply((Object) null), Option.apply((Object) null), true, false, 0, false, 0, false, 0, Option.apply((Object) null), 1, false, Integer.valueOf(this.partitionsPerTopic), Short.valueOf((short) this.count));
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }

    public void addTopics(String... strArr) {
        Assert.notNull(this.zookeeper, BROKER_NEEDED);
        HashSet hashSet = new HashSet(Arrays.asList(strArr));
        createKafkaTopics(hashSet);
        this.topics.addAll(hashSet);
    }

    public void addTopics(NewTopic... newTopicArr) {
        Assert.notNull(this.zookeeper, BROKER_NEEDED);
        for (NewTopic newTopic : newTopicArr) {
            Assert.isTrue(this.topics.add(newTopic.name()), () -> {
                return "topic already exists: " + newTopic;
            });
            Assert.isTrue(newTopic.replicationFactor() <= this.count && (newTopic.replicasAssignments() == null || newTopic.replicasAssignments().size() <= this.count), () -> {
                return "Embedded kafka does not support the requested replication factor: " + newTopic;
            });
        }
        doWithAdmin(adminClient -> {
            createTopics(adminClient, Arrays.asList(newTopicArr));
        });
    }

    private void createKafkaTopics(Set<String> set) {
        doWithAdmin(adminClient -> {
            createTopics(adminClient, (List) set.stream().map(str -> {
                return new NewTopic(str, this.partitionsPerTopic, (short) this.count);
            }).collect(Collectors.toList()));
        });
    }

    private void createTopics(AdminClient adminClient, List<NewTopic> list) {
        try {
            adminClient.createTopics(list).all().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new KafkaException(e);
        }
    }

    public Map<String, Exception> addTopicsWithResults(String... strArr) {
        Assert.notNull(this.zookeeper, BROKER_NEEDED);
        HashSet hashSet = new HashSet(Arrays.asList(strArr));
        this.topics.addAll(hashSet);
        return createKafkaTopicsWithResults(hashSet);
    }

    public Map<String, Exception> addTopicsWithResults(NewTopic... newTopicArr) {
        Assert.notNull(this.zookeeper, BROKER_NEEDED);
        for (NewTopic newTopic : newTopicArr) {
            Assert.isTrue(this.topics.add(newTopic.name()), () -> {
                return "topic already exists: " + newTopic;
            });
            Assert.isTrue(newTopic.replicationFactor() <= this.count && (newTopic.replicasAssignments() == null || newTopic.replicasAssignments().size() <= this.count), () -> {
                return "Embedded kafka does not support the requested replication factor: " + newTopic;
            });
        }
        return (Map) doWithAdminFunction(adminClient -> {
            return createTopicsWithResults(adminClient, Arrays.asList(newTopicArr));
        });
    }

    private Map<String, Exception> createKafkaTopicsWithResults(Set<String> set) {
        return (Map) doWithAdminFunction(adminClient -> {
            return createTopicsWithResults(adminClient, (List) set.stream().map(str -> {
                return new NewTopic(str, this.partitionsPerTopic, (short) this.count);
            }).collect(Collectors.toList()));
        });
    }

    private Map<String, Exception> createTopicsWithResults(AdminClient adminClient, List<NewTopic> list) {
        CreateTopicsResult createTopics = adminClient.createTopics(list);
        HashMap hashMap = new HashMap();
        createTopics.values().entrySet().stream().map(entry -> {
            Object obj;
            try {
                ((KafkaFuture) entry.getValue()).get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS);
                obj = null;
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                obj = e;
            }
            return new AbstractMap.SimpleEntry((String) entry.getKey(), obj);
        }).forEach(simpleEntry -> {
            hashMap.put((String) simpleEntry.getKey(), (Exception) simpleEntry.getValue());
        });
        return hashMap;
    }

    public void doWithAdmin(Consumer<AdminClient> consumer) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", getBrokersAsString());
        AdminClient create = AdminClient.create(hashMap);
        try {
            consumer.accept(create);
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public <T> T doWithAdminFunction(Function<AdminClient, T> function) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", getBrokersAsString());
        AdminClient create = AdminClient.create(hashMap);
        try {
            T apply = function.apply(create);
            if (create != null) {
                create.close();
            }
            return apply;
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void destroy() {
        System.getProperties().remove(this.brokerListProperty);
        System.getProperties().remove(SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
        for (KafkaServer kafkaServer : this.kafkaServers) {
            try {
                if (brokerRunning(kafkaServer)) {
                    kafkaServer.shutdown();
                    kafkaServer.awaitShutdown();
                }
            } catch (Exception e) {
            }
            try {
                CoreUtils.delete(kafkaServer.config().logDirs());
            } catch (Exception e2) {
            }
        }
        synchronized (this) {
            if (this.zooKeeperClient != null) {
                this.zooKeeperClient.close();
            }
        }
        try {
            this.zookeeper.shutdown();
            this.zkConnect = null;
        } catch (Exception e3) {
        }
    }

    private boolean brokerRunning(KafkaServer kafkaServer) {
        try {
            return !kafkaServer.brokerState().equals(BrokerState.NOT_RUNNING);
        } catch (NoSuchMethodError e) {
            if (GET_BROKER_STATE_METHOD == null) {
                logger.debug("Could not determine broker state during shutdown");
                return true;
            }
            try {
                return !GET_BROKER_STATE_METHOD.invoke(kafkaServer, new Object[0]).toString().equals("NOT_RUNNING");
            } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e2) {
                logger.debug(e2, "Could not determine broker state during shutdown");
                return true;
            }
        }
    }

    public Set<String> getTopics() {
        return new HashSet(this.topics);
    }

    public List<KafkaServer> getKafkaServers() {
        return this.kafkaServers;
    }

    public KafkaServer getKafkaServer(int i) {
        return this.kafkaServers.get(i);
    }

    public EmbeddedZookeeper getZookeeper() {
        return this.zookeeper;
    }

    public synchronized ZooKeeperClient getZooKeeperClient() {
        if (this.zooKeeperClient == null) {
            this.zooKeeperClient = new ZooKeeperClient(this.zkConnect, this.zkSessionTimeout, this.zkConnectionTimeout, 1, Time.SYSTEM, "embeddedKafkaZK", "embeddedKafkaZK", new ZKClientConfig(), "embeddedKafkaZK");
        }
        return this.zooKeeperClient;
    }

    public String getZookeeperConnectionString() {
        return this.zkConnect;
    }

    public BrokerAddress getBrokerAddress(int i) {
        return new BrokerAddress(LOOPBACK, ((EndPoint) this.kafkaServers.get(i).config().listeners().apply(0)).port());
    }

    public BrokerAddress[] getBrokerAddresses() {
        ArrayList arrayList = new ArrayList();
        for (int i : this.kafkaPorts) {
            arrayList.add(new BrokerAddress(LOOPBACK, i));
        }
        return (BrokerAddress[]) arrayList.toArray(new BrokerAddress[0]);
    }

    public int getPartitionsPerTopic() {
        return this.partitionsPerTopic;
    }

    public void bounce(BrokerAddress brokerAddress) {
        for (KafkaServer kafkaServer : getKafkaServers()) {
            EndPoint endPoint = (EndPoint) kafkaServer.config().listeners().apply(0);
            if (brokerAddress.equals(new BrokerAddress(endPoint.host(), endPoint.port()))) {
                kafkaServer.shutdown();
                kafkaServer.awaitShutdown();
            }
        }
    }

    public void restart(int i) throws Exception {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(10, Collections.singletonMap(Exception.class, true));
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(100L);
        exponentialBackOffPolicy.setMaxInterval(1000L);
        exponentialBackOffPolicy.setMultiplier(2.0d);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(simpleRetryPolicy);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
        retryTemplate.execute(retryContext -> {
            this.kafkaServers.get(i).startup();
            return null;
        });
    }

    public String getBrokersAsString() {
        StringBuilder sb = new StringBuilder();
        for (BrokerAddress brokerAddress : getBrokerAddresses()) {
            sb.append(brokerAddress.toString()).append(',');
        }
        return sb.substring(0, sb.length() - 1);
    }

    public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) {
        consumeFromEmbeddedTopics(consumer, (String[]) this.topics.toArray(new String[0]));
    }

    public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean z) {
        consumeFromEmbeddedTopics(consumer, z, (String[]) this.topics.toArray(new String[0]));
    }

    public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String str) {
        consumeFromEmbeddedTopics(consumer, str);
    }

    public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean z, String str) {
        consumeFromEmbeddedTopics(consumer, z, str);
    }

    public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String... strArr) {
        consumeFromEmbeddedTopics(consumer, false, strArr);
    }

    public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean z, String... strArr) {
        List list = (List) Arrays.stream(strArr).filter(str -> {
            return !this.topics.contains(str);
        }).collect(Collectors.toList());
        if (list.size() > 0) {
            throw new IllegalStateException("topic(s):'" + list + "' are not in embedded topic list");
        }
        final AtomicReference atomicReference = new AtomicReference();
        consumer.subscribe(Arrays.asList(strArr), new ConsumerRebalanceListener() { // from class: org.springframework.kafka.test.EmbeddedKafkaBroker.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                atomicReference.set(collection);
                EmbeddedKafkaBroker.logger.debug(() -> {
                    return "partitions assigned: " + collection;
                });
            }
        });
        int i = 0;
        while (atomicReference.get() == null) {
            int i2 = i;
            i++;
            if (i2 >= 600) {
                break;
            } else {
                consumer.poll(Duration.ofMillis(100L));
            }
        }
        if (atomicReference.get() == null) {
            throw new IllegalStateException("Failed to be assigned partitions from the embedded topics");
        }
        logger.debug(() -> {
            return "Partitions assigned " + atomicReference.get() + "; re-seeking to " + (z ? "end; " : "beginning");
        });
        if (z) {
            consumer.seekToEnd((Collection) atomicReference.get());
        } else {
            consumer.seekToBeginning((Collection) atomicReference.get());
        }
        logger.debug("Subscription Initiated");
    }

    static {
        try {
            Method declaredMethod = KafkaServer.class.getDeclaredMethod("brokerState", new Class[0]);
            if (declaredMethod.getReturnType().equals(AtomicReference.class)) {
                GET_BROKER_STATE_METHOD = declaredMethod;
            } else {
                GET_BROKER_STATE_METHOD = null;
            }
            try {
                AtomicReference atomicReference = new AtomicReference();
                ReflectionUtils.doWithMethods(TestUtils.class, method -> {
                    atomicReference.set(method);
                }, method2 -> {
                    return method2.getName().equals("createBrokerConfig");
                });
                BROKER_CONFIGS_METHOD = (Method) atomicReference.get();
                if (BROKER_CONFIGS_METHOD == null) {
                    throw new IllegalStateException();
                }
            } catch (IllegalStateException e) {
                throw new IllegalStateException("Failed to obtain TestUtils.createBrokerConfig method; client version: " + AppInfoParser.getVersion(), e);
            }
        } catch (NoSuchMethodException | SecurityException e2) {
            throw new IllegalStateException("Failed to determine KafkaServer.brokerState() method; client version: " + AppInfoParser.getVersion(), e2);
        }
    }
}
