package org.apache.camel.component.pulsar;

import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.camel.Processor;
import org.apache.camel.Suspendable;
import org.apache.camel.component.pulsar.utils.PulsarUtils;
import org.apache.camel.component.pulsar.utils.consumers.ConsumerCreationStrategyFactory;
import org.apache.camel.support.DefaultConsumer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/pulsar/PulsarConsumer.class */
public class PulsarConsumer extends DefaultConsumer implements Suspendable {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumer.class);
    private final PulsarEndpoint pulsarEndpoint;
    private final ConsumerCreationStrategyFactory consumerCreationStrategyFactory;
    private Queue<Consumer<byte[]>> pulsarConsumers;
    private Queue<ExecutorService> executors;

    /* loaded from: input_file:org/apache/camel/component/pulsar/PulsarConsumer$PulsarConsumerLoop.class */
    private class PulsarConsumerLoop implements Runnable {
        private final PulsarEndpoint endpoint;
        private final Consumer<byte[]> consumer;

        public PulsarConsumerLoop(PulsarEndpoint pulsarEndpoint, Consumer<byte[]> consumer) {
            this.endpoint = pulsarEndpoint;
            this.consumer = consumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            PulsarMessageListener pulsarMessageListener = new PulsarMessageListener(this.endpoint, PulsarConsumer.this);
            while (true) {
                try {
                    pulsarMessageListener.received(this.consumer, this.consumer.receive());
                } catch (Exception e) {
                    this.endpoint.getExceptionHandler().handleException(e);
                } catch (PulsarClientException e2) {
                    if (e2.getCause() instanceof InterruptedException) {
                        PulsarConsumer.LOGGER.info("Received shutdown signal, exiting");
                        return;
                    }
                    this.endpoint.getExceptionHandler().handleException(e2);
                }
            }
        }
    }

    public PulsarConsumer(PulsarEndpoint pulsarEndpoint, Processor processor) {
        super(pulsarEndpoint, processor);
        this.pulsarEndpoint = pulsarEndpoint;
        this.pulsarConsumers = new ConcurrentLinkedQueue();
        this.consumerCreationStrategyFactory = ConsumerCreationStrategyFactory.create(this);
        this.executors = new ConcurrentLinkedQueue();
    }

    protected void doStart() throws Exception {
        this.pulsarConsumers = PulsarUtils.stopConsumers(this.pulsarConsumers);
        Collection<? extends Consumer<byte[]>> createConsumers = createConsumers(this.pulsarEndpoint, this.consumerCreationStrategyFactory);
        if (!this.pulsarEndpoint.getPulsarConfiguration().isMessageListener()) {
            this.executors.addAll(subscribeWithThreadPool(createConsumers, this.pulsarEndpoint));
        }
        this.pulsarConsumers.addAll(createConsumers);
    }

    protected void doStop() throws PulsarClientException {
        this.executors = PulsarUtils.stopExecutors(this.pulsarEndpoint.getCamelContext().getExecutorServiceManager(), this.executors);
        this.pulsarConsumers = PulsarUtils.stopConsumers(this.pulsarConsumers);
    }

    protected void doSuspend() {
        PulsarUtils.pauseConsumers(this.pulsarConsumers);
    }

    protected void doResume() throws Exception {
        PulsarUtils.resumeConsumers(this.pulsarConsumers);
    }

    private Collection<Consumer<byte[]>> createConsumers(PulsarEndpoint pulsarEndpoint, ConsumerCreationStrategyFactory consumerCreationStrategyFactory) throws Exception {
        return consumerCreationStrategyFactory.getStrategy(pulsarEndpoint.getPulsarConfiguration().getSubscriptionType()).create(pulsarEndpoint);
    }

    private Collection<ExecutorService> subscribeWithThreadPool(Collection<Consumer<byte[]>> collection, PulsarEndpoint pulsarEndpoint) {
        int numberOfConsumerThreads = pulsarEndpoint.getPulsarConfiguration().getNumberOfConsumerThreads();
        return (Collection) collection.stream().map(consumer -> {
            ExecutorService newFixedThreadPool = pulsarEndpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "pulsar-consumer", numberOfConsumerThreads);
            for (int i = 0; i < numberOfConsumerThreads; i++) {
                newFixedThreadPool.submit(new PulsarConsumerLoop(pulsarEndpoint, consumer));
            }
            return newFixedThreadPool;
        }).collect(Collectors.toList());
    }
}
