package org.apache.camel.component.pulsar;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.TypeConversionException;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/pulsar/PulsarProducer.class */
/**
 * Camel producer that publishes the body of an exchange's In message to the
 * Pulsar topic identified by the owning {@link PulsarEndpoint}.
 *
 * <p>The underlying Pulsar {@link Producer} is created in {@link #doStart()}
 * and closed in {@link #doStop()}. Messages are sent with
 * {@code sendAsync()}, and the exchange is completed from the send future's
 * completion handler.
 */
public class PulsarProducer extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class);

    /** Guards creation of {@link #producer}. */
    private final Object mutex;
    private final PulsarEndpoint pulsarEndpoint;
    /** Created in {@link #doStart()}, reset to {@code null} in {@link #doStop()}. */
    private volatile Producer<byte[]> producer;

    /**
     * Creates a producer bound to the given endpoint. The Pulsar producer
     * itself is not created here; see {@link #doStart()}.
     *
     * @param pulsarEndpoint endpoint supplying the topic URI, the Pulsar client and the configuration
     */
    public PulsarProducer(PulsarEndpoint pulsarEndpoint) {
        super(pulsarEndpoint);
        this.mutex = new Object();
        this.pulsarEndpoint = pulsarEndpoint;
    }

    /**
     * Serializes the In body and sends it to Pulsar asynchronously.
     *
     * <p>Optional headers {@code KEY_OUT}, {@code PROPERTIES_OUT},
     * {@code EVENT_TIME_OUT} and {@code DELIVER_AT_OUT} are copied onto the
     * outgoing message when present. When the send succeeds, the In body is
     * replaced with the value the send future completed with.
     *
     * @param exchange      the exchange whose In message is published
     * @param asyncCallback notified exactly once: with {@code true} when
     *                      preparing the send fails in the calling thread,
     *                      otherwise with {@code false} once the send future
     *                      completes (successfully or not)
     * @return {@code true} if the exchange was completed in the calling
     *         thread, {@code false} if completion is signalled later through
     *         {@code asyncCallback}
     */
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        try {
            byte[] payload = serialize(exchange, exchange.getIn().getBody());
            TypedMessageBuilder<byte[]> messageBuilder = this.producer.newMessage();
            messageBuilder.value(payload);

            String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
            if (ObjectHelper.isNotEmpty(key)) {
                messageBuilder.key(key);
            }

            Map<String, String> properties
                    = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
            if (ObjectHelper.isNotEmpty(properties)) {
                messageBuilder.properties(properties);
            }

            Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
            if (eventTime != null) {
                messageBuilder.eventTime(eventTime.longValue());
            }

            Long deliverAt = exchange.getIn().getHeader(PulsarMessageHeaders.DELIVER_AT_OUT, Long.class);
            if (deliverAt != null) {
                messageBuilder.deliverAt(deliverAt.longValue());
            }

            messageBuilder.sendAsync()
                    .thenAccept(messageId -> exchange.getIn().setBody(messageId))
                    .whenComplete((ignored, failure) -> {
                        try {
                            if (failure != null) {
                                exchange.setException(new CamelExchangeException(
                                        "An error occurred while sending a message to pulsar", exchange, failure));
                            }
                        } finally {
                            // The callback must fire on success as well as on failure:
                            // this method has already returned false, so nothing else
                            // will ever complete the exchange.
                            asyncCallback.done(false);
                        }
                    });
            return false;
        } catch (Exception e) {
            // Failure before the asynchronous send was handed off: complete synchronously.
            exchange.setException(e);
            asyncCallback.done(true);
            return true;
        }
    }

    /**
     * Converts the body to bytes, preferring the Camel type converter and
     * falling back to {@link PulsarMessageUtils#serialize} when no conversion
     * is available or the conversion fails.
     *
     * @param exchange exchange providing the Camel context and conversion scope
     * @param obj      the body to serialize
     * @return the serialized body
     * @throws IOException declared for the fallback serialization path
     */
    private static byte[] serialize(Exchange exchange, Object obj) throws IOException {
        try {
            return exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, obj);
        } catch (NoTypeConversionAvailableException | TypeConversionException e) {
            return PulsarMessageUtils.serialize(obj);
        }
    }

    /**
     * Builds the Pulsar producer from the endpoint configuration unless one
     * already exists. The check and the assignment happen under {@link #mutex}.
     *
     * @throws PulsarClientException if the Pulsar client fails to create the producer
     */
    private void createProducer() throws PulsarClientException {
        synchronized (this.mutex) {
            if (this.producer != null) {
                return;
            }
            String topicUri = this.pulsarEndpoint.getUri();
            PulsarConfiguration configuration = this.pulsarEndpoint.getPulsarConfiguration();
            String producerName = configuration.getProducerName();

            // NOTE(review): batchingMaxMessages is fed from getMaxPendingMessages();
            // this looks unintended - confirm whether a dedicated batching setting
            // should be used instead. Left unchanged here.
            ProducerBuilder<byte[]> producerBuilder = this.pulsarEndpoint.getPulsarClient().newProducer()
                    .topic(topicUri)
                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
                    .maxPendingMessages(configuration.getMaxPendingMessages())
                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
                    .batchingMaxMessages(configuration.getMaxPendingMessages())
                    .enableBatching(configuration.isBatchingEnabled())
                    .batcherBuilder(configuration.getBatcherBuilder())
                    .initialSequenceId(configuration.getInitialSequenceId())
                    .compressionType(configuration.getCompressionType())
                    .enableChunking(configuration.isChunkingEnabled());

            // A configured message router is used in place of the routing mode.
            if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
                producerBuilder.messageRouter(configuration.getMessageRouter());
            } else {
                producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
            }
            if (producerName != null) {
                producerBuilder.producerName(producerName);
            }
            this.producer = producerBuilder.create();
        }
    }

    /**
     * Creates the Pulsar producer if it does not exist yet.
     *
     * @throws Exception if the producer cannot be created
     */
    protected void doStart() throws Exception {
        LOG.debug("Starting the pulsar producer: {}", this);
        if (this.producer == null) {
            createProducer();
        }
    }

    /**
     * Closes the Pulsar producer, if any, and clears the reference so that a
     * later start creates a new one.
     *
     * @throws Exception if closing the producer fails
     */
    protected void doStop() throws Exception {
        LOG.debug("Stopping the pulsar producer: {}", this);
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
    }
}
