package io.confluent.kafka.schemaregistry.leaderelector.kafka;

import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException;
import io.confluent.kafka.schemaregistry.leaderelector.kafka.SchemaRegistryProtocol;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.LeaderElector;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/leaderelector/kafka/KafkaGroupLeaderElector.class */
/**
 * {@link LeaderElector} that takes part in a Kafka group through a
 * {@link SchemaRegistryCoordinator} and forwards the outcome of every rebalance to
 * {@link KafkaSchemaRegistry#setLeader}.
 *
 * <p>The instance registers itself with the coordinator as the
 * {@link SchemaRegistryRebalanceListener}. A single background thread, started by
 * {@link #init()}, drives the coordinator's poll loop until {@link #close()} is called.
 */
public class KafkaGroupLeaderElector implements LeaderElector, SchemaRegistryRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(KafkaGroupLeaderElector.class);
    /** Source of the numeric suffix used to build a distinct client id per instance. */
    private static final AtomicInteger SR_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    /** Prefix used for the selector metrics group, the coordinator metrics and the app-info MBean. */
    private static final String JMX_PREFIX = "kafka.schema.registry";
    /** Maximum time, in milliseconds, that {@link #init()} waits for the first successful assignment. */
    private final int initTimeout;
    private final String clientId;
    private final ConsumerNetworkClient client;
    private final Metrics metrics;
    private final Metadata metadata;
    private final long retryBackoffMs;
    private final SchemaRegistryCoordinator coordinator;
    private final KafkaSchemaRegistry schemaRegistry;
    /** Created lazily by {@link #init()}; stays {@code null} if {@code init()} is never called. */
    private ExecutorService executor;
    /** Set once shutdown has begun; also the exit condition of the background poll loop. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    /** Released by the first error-free {@link #onAssigned} callback. */
    private final CountDownLatch joinedLatch = new CountDownLatch(1);

    /**
     * Builds the metrics, metadata, network client and group coordinator for this member.
     *
     * <p>Client-level settings are read from the {@code kafkastore.}-prefixed entries of the
     * supplied configuration. If anything fails, whatever was already constructed is released
     * through {@link #stop(boolean)} before the failure is rethrown.
     *
     * @param schemaRegistryConfig   schema registry configuration
     * @param schemaRegistryIdentity identity handed to the coordinator for this instance
     * @param kafkaSchemaRegistry    registry that is told about leader changes
     * @throws SchemaRegistryInitializationException if any part of the construction fails
     */
    public KafkaGroupLeaderElector(SchemaRegistryConfig schemaRegistryConfig, SchemaRegistryIdentity schemaRegistryIdentity, KafkaSchemaRegistry kafkaSchemaRegistry) throws SchemaRegistryInitializationException {
        try {
            this.schemaRegistry = kafkaSchemaRegistry;
            this.clientId = "sr-" + SR_CLIENT_ID_SEQUENCE.getAndIncrement();

            // Metrics, tagged with the client id.
            LinkedHashMap<String, String> metricTags = new LinkedHashMap<String, String>();
            metricTags.put("client-id", this.clientId);
            MetricConfig metricConfig = new MetricConfig()
                    .samples(schemaRegistryConfig.getInt("metrics.num.samples").intValue())
                    .timeWindow(schemaRegistryConfig.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS)
                    .tags(metricTags);
            List<MetricsReporter> reporters = schemaRegistryConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class);
            reporters.add(new JmxReporter());
            MetricsContext metricsContext = kafkaSchemaRegistry.getMetricsContainer().getMetricsContext();
            Time time = Time.SYSTEM;
            ClientConfig clientConfig = new ClientConfig(schemaRegistryConfig.originalsWithPrefix("kafkastore."), false);
            this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
            this.retryBackoffMs = clientConfig.getLong("retry.backoff.ms").longValue();

            String groupId = schemaRegistryConfig.getString(SchemaRegistryConfig.SCHEMAREGISTRY_GROUP_ID_CONFIG);
            LogContext logContext = new LogContext("[Schema registry clientId=" + this.clientId + ", groupId=" + groupId + "] ");

            // Cluster metadata, bootstrapped from the configured broker list.
            this.metadata = new Metadata(this.retryBackoffMs, clientConfig.getLong("metadata.max.age.ms").longValue(), logContext, new ClusterResourceListeners());
            this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(schemaRegistryConfig.getList(SchemaRegistryConfig.KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG), clientConfig.getString("client.dns.lookup")));

            // Network stack: Selector -> NetworkClient -> ConsumerNetworkClient.
            Selector selector = new Selector(
                    clientConfig.getLong("connections.max.idle.ms").longValue(),
                    this.metrics,
                    time,
                    JMX_PREFIX,
                    ClientUtils.createChannelBuilder(clientConfig, time, logContext),
                    logContext);
            // NOTE(review): 100, 10000L and 127000L are hard-coded here; they are presumably the
            // max in-flight requests per connection and the socket connection setup timeout
            // (initial / max) - confirm against the NetworkClient constructor in use.
            NetworkClient networkClient = new NetworkClient(
                    selector,
                    this.metadata,
                    this.clientId,
                    100,
                    clientConfig.getLong("reconnect.backoff.ms").longValue(),
                    clientConfig.getLong("reconnect.backoff.max.ms").longValue(),
                    clientConfig.getInt("send.buffer.bytes").intValue(),
                    clientConfig.getInt("receive.buffer.bytes").intValue(),
                    clientConfig.getInt("request.timeout.ms").intValue(),
                    10000L,
                    127000L,
                    ClientDnsLookup.forConfig(clientConfig.getString("client.dns.lookup")),
                    time,
                    true,
                    new ApiVersions(),
                    logContext);
            this.client = new ConsumerNetworkClient(
                    logContext,
                    networkClient,
                    this.metadata,
                    time,
                    this.retryBackoffMs,
                    clientConfig.getInt("request.timeout.ms").intValue(),
                    KafkaSchemaRegistry.MAX_VERSION);

            // NOTE(review): 300000 / 10000 / 3000 are presumably the rebalance timeout, session
            // timeout and heartbeat interval in milliseconds - confirm against
            // SchemaRegistryCoordinator's constructor.
            this.coordinator = new SchemaRegistryCoordinator(
                    logContext,
                    this.client,
                    groupId,
                    300000,
                    10000,
                    3000,
                    this.metrics,
                    JMX_PREFIX,
                    time,
                    this.retryBackoffMs,
                    schemaRegistryIdentity,
                    this,
                    kafkaSchemaRegistry.getMetricsContainer().getNodeCountMetric());

            AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId, this.metrics, time.milliseconds());
            this.initTimeout = schemaRegistryConfig.getInt(SchemaRegistryConfig.KAFKASTORE_INIT_TIMEOUT_CONFIG).intValue();
            log.debug("Schema registry group member created");
        } catch (Throwable t) {
            // Release whatever was built so far; close failures are swallowed so that the
            // original cause is the one that gets reported.
            stop(true);
            throw new SchemaRegistryInitializationException("Failed to construct kafka consumer", t);
        }
    }

    /**
     * Starts the background thread that polls the coordinator and waits until this member has
     * received its first error-free assignment.
     *
     * @throws SchemaRegistryTimeoutException if no assignment arrives within the configured
     *                                        init timeout
     * @throws SchemaRegistryStoreException   if the calling thread is interrupted while waiting
     */
    @Override
    public void init() throws SchemaRegistryTimeoutException, SchemaRegistryStoreException {
        log.debug("Initializing schema registry group member");
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(new Runnable() {
            @Override
            public void run() {
                while (!KafkaGroupLeaderElector.this.stopped.get()) {
                    try {
                        KafkaGroupLeaderElector.this.coordinator.poll(Integer.MAX_VALUE);
                    } catch (WakeupException e) {
                        // stop() woke the network client up: leave the loop quietly.
                        return;
                    } catch (Throwable t) {
                        KafkaGroupLeaderElector.log.error("Unexpected exception in schema registry group processing thread", t);
                        return;
                    }
                }
            }
        });
        try {
            if (!this.joinedLatch.await(this.initTimeout, TimeUnit.MILLISECONDS)) {
                throw new SchemaRegistryTimeoutException("Timed out waiting for join group to complete");
            }
            log.debug("Schema registry group member initialized and joined group");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SchemaRegistryStoreException("Interrupted while waiting for join group to complete", e);
        }
    }

    /**
     * Stops the group member unless it has already been stopped.
     *
     * @throws KafkaException   if closing one of the underlying resources fails
     * @throws RuntimeException if interrupted while waiting for the poll thread to exit
     */
    @Override
    public void close() {
        if (this.stopped.get()) {
            return;
        }
        stop(false);
    }

    /**
     * Rebalance callback: publishes the elected leader to the schema registry.
     *
     * <p>An error code of {@code 0} is treated as success; the leader (possibly {@code null})
     * is handed to the registry and {@link #init()} is unblocked. Error code {@code 1} and any
     * other value raise {@link IllegalStateException}.
     *
     * @param assignment result of the leader election
     * @param i          unused by this implementation
     * @throws IllegalStateException if the assignment carries a non-zero error code
     */
    @Override
    public void onAssigned(SchemaRegistryProtocol.Assignment assignment, int i) {
        log.info("Finished rebalance with leader election result: {}", assignment);
        try {
            switch (assignment.error()) {
                case 0:
                    if (assignment.leaderIdentity() == null) {
                        log.error("No leader eligible schema registry instances joined the schema registry group. Rebalancing was successful and this instance can serve reads, but no writes can be processed.");
                    }
                    this.schemaRegistry.setLeader(assignment.leaderIdentity());
                    // Only reached when setLeader succeeded; a failure leaves init() waiting.
                    this.joinedLatch.countDown();
                    return;
                case 1:
                    throw new IllegalStateException("The schema registry group contained multiple members advertising the same URL. Verify that each instance has a unique, routable listener by setting the 'listeners' configuration. This error may happen if executing in containers where the default hostname is 'localhost'.");
                default:
                    throw new IllegalStateException("Unknown error returned from schema registry coordination protocol");
            }
        } catch (SchemaRegistryException e) {
            log.error("Error when updating leader, we will not be able to forward requests to the leader", e);
        }
    }

    /**
     * Rebalance callback: clears the current leader while a new election is in progress.
     */
    @Override
    public void onRevoked() {
        log.info("Rebalance started");
        try {
            this.schemaRegistry.setLeader(null);
        } catch (SchemaRegistryException e) {
            log.error("Error when updating leader, we will not be able to forward requests to the leader", e);
        }
    }

    /**
     * Shuts the poll thread down and releases the coordinator, metrics and network client.
     *
     * <p>Cleanup always runs, even when the wait for the poll thread is interrupted; the
     * interruption is reported only after the resources have been released.
     *
     * @param swallowException when {@code true}, failures while closing resources are logged
     *                         but not rethrown
     * @throws RuntimeException if interrupted while waiting for the poll thread to exit
     * @throws KafkaException   if a resource failed to close and {@code swallowException} is
     *                          {@code false}
     */
    private void stop(boolean swallowException) {
        log.trace("Stopping the schema registry group member.");

        // Raise the flag before waking the poll thread so its loop condition is already false
        // by the time it gets to look at it.
        this.stopped.set(true);
        if (this.client != null) {
            this.client.wakeup();
        }

        RuntimeException interruption = null;
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Deferred: the resources below must still be released.
                interruption = new RuntimeException("Interrupted waiting for schema registry group processing thread to exit", e);
            }
        }

        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        closeQuietly(this.coordinator, "coordinator", firstException);
        closeQuietly(this.metrics, "consumer metrics", firstException);
        closeQuietly(this.client, "consumer network client", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);

        if (interruption != null) {
            throw interruption;
        }
        if (firstException.get() != null && !swallowException) {
            throw new KafkaException("Failed to stop the schema registry group member", firstException.get());
        }
        log.debug("The schema registry group member has stopped.");
    }

    /**
     * Closes {@code closeable} if it is non-null, logging any failure and remembering the first
     * one in {@code firstException}.
     *
     * @param closeable      resource to close; may be {@code null}
     * @param name           human-readable name used in the log message
     * @param firstException holder that receives the failure if it is still empty
     */
    private static void closeQuietly(AutoCloseable closeable, String name, AtomicReference<Throwable> firstException) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            firstException.compareAndSet(null, t);
            log.error("Failed to close {} with type {}", name, closeable.getClass().getName(), t);
        }
    }
}
