package org.apache.camel.component.kafka;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.camel.Processor;
import org.apache.camel.Suspendable;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.health.HealthCheckAware;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.resume.ConsumerListenerAware;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer.class */
public class KafkaConsumer extends DefaultConsumer implements ResumeAware<ResumeStrategy>, HealthCheckAware, ConsumerListenerAware<KafkaConsumerListener>, Suspendable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    protected ExecutorService executor;
    private final KafkaEndpoint endpoint;
    private KafkaConsumerHealthCheck consumerHealthCheck;
    private WritableHealthCheckRepository healthCheckRepository;
    private final List<KafkaFetchRecords> tasks;
    private volatile boolean stopOffsetRepo;
    private ResumeStrategy resumeStrategy;
    private KafkaConsumerListener consumerListener;

    /**
     * Creates a consumer for the given Kafka endpoint.
     *
     * @param kafkaEndpoint the endpoint this consumer belongs to; passed to the superclass and also
     *                      kept in the typed {@code endpoint} field
     * @param processor     the processor passed to the superclass constructor
     */
    public KafkaConsumer(KafkaEndpoint kafkaEndpoint, Processor processor) {
        super(kafkaEndpoint, processor);
        // Diamond operator instead of a raw ArrayList, so the list is checked as List<KafkaFetchRecords>.
        this.tasks = new ArrayList<>();
        this.endpoint = kafkaEndpoint;
    }

    /**
     * Stores the resume strategy for this consumer.
     *
     * @param resumeStrategy the strategy to keep; it is returned unchanged by {@link #getResumeStrategy()}
     */
    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
        this.resumeStrategy = resumeStrategy;
    }

    /**
     * Returns the resume strategy previously stored on this consumer.
     *
     * @return the stored strategy, or {@code null} if none has been set
     */
    public ResumeStrategy getResumeStrategy() {
        return resumeStrategy;
    }

    /**
     * Returns the listener that is handed to every fetch task created by {@code doStart()}.
     *
     * <p>Decompiler note: this method was renamed from {@code getConsumerListener} when it was merged
     * with its bridge method.
     *
     * @return the configured consumer listener, or {@code null} if none has been set
     */
    public KafkaConsumerListener m3getConsumerListener() {
        return consumerListener;
    }

    /**
     * Stores the consumer listener for this consumer.
     *
     * @param kafkaConsumerListener the listener to keep; {@code doStart()} passes it to each
     *                              {@code KafkaFetchRecords} task it creates
     */
    public void setConsumerListener(KafkaConsumerListener kafkaConsumerListener) {
        this.consumerListener = kafkaConsumerListener;
    }

    /**
     * Build hook; this override adds no behavior of its own and only delegates to the superclass.
     *
     * @throws Exception if the superclass build step fails
     */
    protected void doBuild() throws Exception {
        super.doBuild();
    }

    /**
     * Returns the endpoint of this consumer typed as {@link KafkaEndpoint}.
     *
     * <p>Decompiler note: this method was renamed from {@code getEndpoint} when it was merged with its
     * bridge method.
     *
     * @return the endpoint that was passed to the constructor
     */
    public KafkaEndpoint m2getEndpoint() {
        // The superclass accessor is declared with the general endpoint type, so an explicit
        // downcast is required; the constructor only ever passes a KafkaEndpoint to the superclass.
        return (KafkaEndpoint) super.getEndpoint();
    }

    /**
     * Produces a fresh random UUID in its string form; used by {@code getProps()} as the fallback
     * supplier for {@code group.id}.
     *
     * @return the string representation of a newly generated random UUID
     */
    private String randomUUID() {
        final UUID generated = UUID.randomUUID();
        return generated.toString();
    }

    /**
     * Assembles the Kafka client properties for one consumer task.
     *
     * <p>Starts from the properties created by the endpoint configuration, lets the endpoint update
     * class-related entries, and then fills in:
     * <ul>
     *   <li>{@code bootstrap.servers} from the client factory's brokers (through
     *       {@code ObjectHelper.ifNotEmpty});</li>
     *   <li>{@code group.id} from the configured group id, with a random UUID supplied as the
     *       fallback (through {@code ObjectHelper.supplyIfEmpty});</li>
     *   <li>{@code group.instance.id} from the configured group instance id (through
     *       {@code ObjectHelper.ifNotEmpty}).</li>
     * </ul>
     *
     * @return a new {@link Properties} instance for each call
     */
    Properties getProps() {
        final KafkaConfiguration config = endpoint.getConfiguration();
        final Properties props = config.createConsumerProperties();
        endpoint.updateClassProperties(props);

        ObjectHelper.ifNotEmpty(
                endpoint.getKafkaClientFactory().getBrokers(config),
                brokers -> props.put("bootstrap.servers", brokers));

        props.put("group.id", ObjectHelper.supplyIfEmpty(config.getGroupId(), this::randomUUID));

        ObjectHelper.ifNotEmpty(
                config.getGroupInstanceId(),
                instanceId -> props.put("group.instance.id", instanceId));

        return props;
    }

    /**
     * Starts the consumer.
     *
     * <p>In order, this method:
     * <ol>
     *   <li>starts the superclass;</li>
     *   <li>registers a {@link KafkaConsumerHealthCheck} if a writable {@code "components"} health
     *       check repository is available;</li>
     *   <li>starts the configured offset repository if it is a {@link ServiceSupport} that is not
     *       started yet, and records in {@code stopOffsetRepo} that this consumer did so;</li>
     *   <li>creates the executor and submits one {@link KafkaFetchRecords} task per configured
     *       consumer, each with its own properties from {@link #getProps()}.</li>
     * </ol>
     *
     * @throws Exception if the superclass or the offset repository fails to start
     */
    protected void doStart() throws Exception {
        final KafkaConfiguration configuration = this.endpoint.getConfiguration();
        LOG.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}", configuration.getTopic(),
                Boolean.valueOf(configuration.isBreakOnFirstError()));
        super.doStart();

        this.healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
                this.endpoint.getCamelContext(), "components", WritableHealthCheckRepository.class);
        if (this.healthCheckRepository != null) {
            this.consumerHealthCheck = new KafkaConsumerHealthCheck(this, getRouteId());
            this.consumerHealthCheck.setEnabled(m2getEndpoint().m5getComponent().isHealthCheckConsumerEnabled());
            this.healthCheckRepository.addHealthCheck(this.consumerHealthCheck);
        }

        // Keep the repository under its declared StateRepository type and cast only after the
        // instanceof check; assigning it directly to a ServiceSupport variable does not compile.
        final StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
        if (offsetRepository instanceof ServiceSupport && !((ServiceSupport) offsetRepository).isStarted()) {
            // Remember that this consumer started the repository so that doStop() also stops it.
            this.stopOffsetRepo = true;
            LOG.debug("Starting OffsetRepository: {}", offsetRepository);
            ServiceHelper.startService(offsetRepository);
        }

        this.executor = this.endpoint.createExecutor();
        final String topic = configuration.getTopic();
        // The topic string is compiled as a regular expression only when it is configured as a pattern.
        final Pattern topicPattern = configuration.isTopicIsPattern() ? Pattern.compile(topic) : null;
        final BridgeExceptionHandlerToErrorHandler bridge = new BridgeExceptionHandlerToErrorHandler(this);
        final int consumersCount = configuration.getConsumersCount();
        for (int i = 0; i < consumersCount; i++) {
            final KafkaFetchRecords task = new KafkaFetchRecords(
                    this, bridge, topic, topicPattern, Integer.toString(i), getProps(), this.consumerListener);
            this.executor.submit(task);
            this.tasks.add(task);
        }
    }

    /**
     * Stops the consumer.
     *
     * <p>In order, this method:
     * <ol>
     *   <li>removes the health check registered by {@code doStart()}, if any;</li>
     *   <li>shuts the executor down: when a Camel context is available the fetch tasks are told to
     *       stop and the context's executor service manager performs a graceful shutdown; otherwise
     *       the executor is shut down directly and awaited for the configured shutdown timeout;</li>
     *   <li>if the executor still has not terminated, stops the tasks again and forces
     *       {@code shutdownNow()};</li>
     *   <li>stops the offset repository if this consumer was the one that started it;</li>
     *   <li>stops the superclass.</li>
     * </ol>
     *
     * @throws Exception if waiting for termination is interrupted, or if stopping the offset
     *                   repository or the superclass fails
     */
    protected void doStop() throws Exception {
        final KafkaConfiguration configuration = this.endpoint.getConfiguration();
        if (configuration.isTopicIsPattern()) {
            LOG.info("Stopping Kafka consumer on topic pattern: {}", configuration.getTopic());
        } else {
            LOG.info("Stopping Kafka consumer on topic: {}", configuration.getTopic());
        }

        if (this.healthCheckRepository != null && this.consumerHealthCheck != null) {
            this.healthCheckRepository.removeHealthCheck(this.consumerHealthCheck);
            this.consumerHealthCheck = null;
        }

        if (this.executor != null) {
            final KafkaEndpoint kafkaEndpoint = m2getEndpoint();
            if (kafkaEndpoint != null && kafkaEndpoint.getCamelContext() != null) {
                // Signal every fetch task to stop before asking for a graceful shutdown.
                for (KafkaFetchRecords task : this.tasks) {
                    task.stop();
                }
                final int timeout = kafkaEndpoint.getConfiguration().getShutdownTimeout();
                LOG.debug("Shutting down Kafka consumer worker threads with timeout {} millis", Integer.valueOf(timeout));
                kafkaEndpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor, timeout);
            } else {
                // No Camel context to delegate to: shut the executor down directly.
                this.executor.shutdown();
                final int timeout = configuration.getShutdownTimeout();
                LOG.debug("Shutting down Kafka consumer worker threads with timeout {} millis", Integer.valueOf(timeout));
                if (!this.executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                    LOG.warn("Shutting down Kafka {} consumer worker threads did not finish within {} millis",
                            Integer.valueOf(this.tasks.size()), Integer.valueOf(timeout));
                }
            }

            // Last resort when the workers are still running after the timeout.
            if (!this.executor.isTerminated()) {
                this.tasks.forEach(KafkaFetchRecords::stop);
                this.executor.shutdownNow();
            }
        }
        this.tasks.clear();
        this.executor = null;

        if (this.stopOffsetRepo) {
            final StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
            LOG.debug("Stopping OffsetRepository: {}", offsetRepository);
            ServiceHelper.stopAndShutdownService(offsetRepository);
            // Clear the ownership flag: if the repository is already running on a later start,
            // a stale 'true' would make this consumer stop a repository it did not start.
            this.stopOffsetRepo = false;
        }

        super.doStop();
    }

    /**
     * Pauses every fetch task, logging the client id of each one, and then suspends the superclass.
     *
     * @throws Exception if the superclass fails to suspend
     */
    protected void doSuspend() throws Exception {
        final Iterator<KafkaFetchRecords> fetchers = tasks.iterator();
        while (fetchers.hasNext()) {
            final KafkaFetchRecords fetcher = fetchers.next();
            LOG.info("Pausing Kafka record fetcher task running client ID {}", fetcher.healthState().getClientId());
            fetcher.pause();
        }
        super.doSuspend();
    }

    /**
     * Resumes every fetch task, logging the client id of each one, and then resumes the superclass.
     *
     * @throws Exception if the superclass fails to resume
     */
    protected void doResume() throws Exception {
        final Iterator<KafkaFetchRecords> fetchers = tasks.iterator();
        while (fetchers.hasNext()) {
            final KafkaFetchRecords fetcher = fetchers.next();
            LOG.info("Resuming Kafka record fetcher task running client ID {}", fetcher.healthState().getClientId());
            fetcher.resume();
        }
        super.doResume();
    }

    /**
     * Collects the current health state of every fetch task.
     *
     * @return a new list with one {@code TaskHealthState} per task, in task order; empty when no
     *         tasks are registered
     */
    public List<TaskHealthState> healthStates() {
        return tasks.stream()
                .map(KafkaFetchRecords::healthState)
                .collect(Collectors.toList());
    }

    /**
     * Returns the fixed identifier {@code "kafka-adapter-factory"}.
     *
     * NOTE(review): presumably the name used to look up the resume adapter factory for this
     * component; confirm against the caller.
     *
     * @return the constant string {@code "kafka-adapter-factory"}
     */
    public String adapterFactoryService() {
        return "kafka-adapter-factory";
    }
}
