package org.apache.camel.component.kafka;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.producer.support.DelegatingCallback;
import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
import org.apache.camel.component.kafka.producer.support.KafkaProducerMetadataCallBack;
import org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
import org.apache.camel.component.kafka.producer.support.ProducerUtil;
import org.apache.camel.component.kafka.producer.support.PropagatedHeadersProvider;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.URISupport;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Camel producer that publishes exchange messages to Kafka through a
 * {@link Producer} client.
 *
 * <p>{@link #process(Exchange)} sends and then blocks on the returned futures;
 * {@link #process(Exchange, AsyncCallback)} sends with a callback and returns
 * without waiting. In both modes a body that is an {@link Iterable} or
 * {@link Iterator} is sent as one record per element, anything else as a
 * single record.
 *
 * <p>NOTE(review): this source was recovered by a decompiler; the
 * {@code m12getEndpoint} / {@code m6getComponent} names are decompiler
 * artifacts that are kept so that other recovered classes keep linking.
 */
public class KafkaProducer extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);

    // Kept as a raw type because the accessors below expose it as a raw Producer.
    @SuppressWarnings("rawtypes")
    private Producer kafkaProducer;
    private KafkaProducerHealthCheck producerHealthCheck;
    private WritableHealthCheckRepository healthCheckRepository;
    private String clientId;
    // Value of the "transactional.id" producer property; null means no transactions.
    private String transactionId;
    private final KafkaEndpoint endpoint;
    private final KafkaConfiguration configuration;
    private ExecutorService workerPool;
    // True only when doStart() created the worker pool itself, so doStop() may shut it down.
    private boolean shutdownWorkerPool;
    // True only when createProducer() created the client, so doStop() may close it.
    private volatile boolean closeKafkaProducer;
    // Topic taken from the endpoint URI; used when the configuration has no topic.
    private final String endpointTopic;
    private final Integer configPartitionKey;
    private final String configKey;

    /**
     * Creates a producer bound to the given endpoint and caches the endpoint's
     * configuration, URI topic, partition key and record key.
     *
     * @param kafkaEndpoint the endpoint this producer belongs to
     */
    public KafkaProducer(KafkaEndpoint kafkaEndpoint) {
        super(kafkaEndpoint);
        this.endpoint = kafkaEndpoint;
        this.configuration = kafkaEndpoint.getConfiguration();
        URI endpointUri = URI.create(kafkaEndpoint.getEndpointUri());
        this.endpointTopic = URISupport.extractRemainderPath(endpointUri, true);
        this.configPartitionKey = this.configuration.getPartitionKey();
        this.configKey = this.configuration.getKey();
    }

    /**
     * Returns the owning endpoint narrowed to {@link KafkaEndpoint}.
     *
     * @return the endpoint passed to the constructor
     */
    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public KafkaEndpoint m12getEndpoint() {
        // The superclass accessor returns the general endpoint type, so a cast is required.
        return (KafkaEndpoint) super.getEndpoint();
    }

    /**
     * Builds the properties used to create the Kafka client: the configuration's
     * producer properties, updated by the endpoint, plus {@code bootstrap.servers}
     * when the client factory reports brokers.
     *
     * @return a freshly created properties object
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Properties getProps() {
        Properties producerProperties = this.configuration.createProducerProperties();
        this.endpoint.updateClassProperties(producerProperties);
        String brokers = this.endpoint.getKafkaClientFactory().getBrokers(this.configuration);
        if (brokers != null) {
            producerProperties.put("bootstrap.servers", brokers);
        }
        return producerProperties;
    }

    /**
     * Reports whether the underlying Kafka client has at least one ready node.
     *
     * <p>The check reads the client's private {@code sender} and {@code client}
     * fields reflectively. When the producer is not a Kafka {@code KafkaProducer},
     * or the reflective lookup fails, the method answers {@code true}.
     *
     * @return the result of {@code NetworkClient.hasReadyNodes}, or {@code true}
     *         when the check cannot be performed
     */
    public boolean isReady() {
        boolean ready = true;
        try {
            if (this.kafkaProducer instanceof org.apache.kafka.clients.producer.KafkaProducer) {
                org.apache.kafka.clients.producer.KafkaProducer<?, ?> client =
                        (org.apache.kafka.clients.producer.KafkaProducer<?, ?>) this.kafkaProducer;
                Sender sender = (Sender) ReflectionHelper.getField(client.getClass().getDeclaredField("sender"), client);
                NetworkClient networkClient =
                        (NetworkClient) ReflectionHelper.getField(sender.getClass().getDeclaredField("client"), sender);
                LOG.trace("Health-Check calling org.apache.kafka.clients.NetworkClient.hasReadyNode");
                ready = networkClient.hasReadyNodes(System.currentTimeMillis());
            }
        } catch (Exception e) {
            // Best effort only: a failed lookup must not mark the producer as not ready.
            LOG.debug("Cannot check hasReadyNodes on KafkaProducer client (NetworkClient) due to {}. This exception is ignored.",
                    e.getMessage(), e);
        }
        return ready;
    }

    /**
     * @return the Kafka client in use, or {@code null} if none has been set or created
     */
    @SuppressWarnings("rawtypes")
    public Producer getKafkaProducer() {
        return this.kafkaProducer;
    }

    /**
     * Supplies the Kafka client to use. A client set here before start is not
     * replaced by {@link #doStart()} and is not closed by {@link #doStop()}.
     *
     * @param producer the client to use
     */
    @SuppressWarnings("rawtypes")
    public void setKafkaProducer(Producer producer) {
        this.kafkaProducer = producer;
    }

    /**
     * @return the executor handed to the asynchronous send callback, or {@code null}
     */
    public ExecutorService getWorkerPool() {
        return this.workerPool;
    }

    /**
     * Supplies the worker pool to use. A pool set here before start is not
     * replaced by {@link #doStart()} and is not shut down by {@link #doStop()}.
     *
     * @param executorService the worker pool
     */
    public void setWorkerPool(ExecutorService executorService) {
        this.workerPool = executorService;
    }

    /**
     * Creates the Kafka client if needed, initialises transactions when a
     * {@code transactional.id} is configured, selects the worker pool for
     * non-synchronous mode, resolves the client id and registers the health check.
     *
     * @throws Exception if any start-up step fails
     */
    protected void doStart() throws Exception {
        Properties props = getProps();
        if (this.kafkaProducer == null) {
            createProducer(props);
        }

        this.transactionId = props.getProperty("transactional.id");
        if (this.transactionId != null) {
            this.kafkaProducer.initTransactions();
        }

        if (!this.configuration.isSynchronous() && this.workerPool == null) {
            ExecutorService configuredPool = this.configuration.getWorkerPool();
            if (configuredPool != null) {
                // Pool owned by the configuration: use it but never shut it down here.
                this.workerPool = configuredPool;
                this.shutdownWorkerPool = false;
            } else {
                this.workerPool = this.endpoint.createProducerExecutor();
                this.shutdownWorkerPool = true;
            }
        }

        if (this.clientId == null) {
            this.clientId = getProps().getProperty("client.id");
            if (this.clientId == null) {
                try {
                    // No explicit client id: read the one held by the client itself.
                    this.clientId = (String) ReflectionHelper.getField(
                            this.kafkaProducer.getClass().getDeclaredField("clientId"), this.kafkaProducer);
                } catch (Exception ignored) {
                    // The id is only used to label the health check, so fall back to an empty one.
                    this.clientId = "";
                }
            }
        }

        this.healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
                this.endpoint.getCamelContext(), "producers", WritableHealthCheckRepository.class);
        if (this.healthCheckRepository != null) {
            this.producerHealthCheck = new KafkaProducerHealthCheck(this, this.clientId);
            this.producerHealthCheck.setEnabled(m12getEndpoint().m6getComponent().isHealthCheckProducerEnabled());
            this.healthCheckRepository.addHealthCheck(this.producerHealthCheck);
        }
    }

    /**
     * Unregisters the health check, closes the Kafka client if this producer
     * created it, and gracefully shuts down the worker pool if this producer
     * created it.
     *
     * @throws Exception if any shutdown step fails
     */
    protected void doStop() throws Exception {
        if (this.healthCheckRepository != null && this.producerHealthCheck != null) {
            this.healthCheckRepository.removeHealthCheck(this.producerHealthCheck);
            this.producerHealthCheck = null;
        }

        if (this.kafkaProducer != null && this.closeKafkaProducer) {
            LOG.debug("Closing KafkaProducer: {}", this.kafkaProducer);
            this.kafkaProducer.close();
            this.kafkaProducer = null;
        }

        if (this.shutdownWorkerPool && this.workerPool != null) {
            int shutdownTimeout = this.configuration.getShutdownTimeout();
            LOG.debug("Shutting down Kafka producer worker threads with timeout {} millis", shutdownTimeout);
            this.endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(this.workerPool, shutdownTimeout);
            this.workerPool = null;
        }
    }

    /**
     * Creates the Kafka client through the endpoint's client factory while the
     * thread context class loader is temporarily switched to the one that loaded
     * the Kafka client classes.
     *
     * @param properties the producer properties to create the client with
     */
    private void createProducer(Properties properties) {
        Thread currentThread = Thread.currentThread();
        ClassLoader originalLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
            LOG.trace("Creating KafkaProducer");
            this.kafkaProducer = this.endpoint.getKafkaClientFactory().getProducer(properties);
            this.closeKafkaProducer = true;
        } finally {
            // Restore the caller's class loader on both the success and the failure path.
            currentThread.setContextClassLoader(originalLoader);
        }
        LOG.debug("Created KafkaProducer: {}", this.kafkaProducer);
    }

    /**
     * Creates an iterator that yields one record holder per element of the
     * message body.
     *
     * @param exchange the current exchange
     * @param message  the message whose body is an {@link Iterable} or {@link Iterator}
     * @return an iterator over the record holders to send
     */
    protected Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(Exchange exchange, Message message) {
        Iterator<Object> bodyElements = getObjectIterator(message.getBody());
        String topic = evaluateTopic(message);
        PropagatedHeadersProvider headersProvider =
                new PropagatedHeadersProvider(this, this.configuration, exchange, message);
        return new KeyValueHolderIterator(bodyElements, exchange, this.configuration, topic, headersProvider);
    }

    /**
     * Builds the record for a single message: topic, partition, optional
     * timestamp override, key, value and propagated headers.
     *
     * <p>The override-timestamp and override-topic headers are removed from the
     * message while the record is built.
     *
     * @param exchange the current exchange
     * @param message  the message to convert
     * @return the record to send
     */
    protected ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message) {
        String topic = evaluateTopic(message);

        Long timestamp = null;
        Object timestampOverride = message.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
        if (timestampOverride != null) {
            timestamp = exchange.getContext().getTypeConverter().convertTo(Long.class, exchange, timestampOverride);
            LOG.debug("Using override TimeStamp: {}", timestampOverride);
        }

        // Headers are collected after the override headers above have been removed.
        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, message);
        Integer partition = getOverridePartitionKey(message);

        Object key = getOverrideKey(message);
        if (key != null) {
            key = ProducerUtil.tryConvertToSerializedType(exchange, key, this.configuration.getKeySerializer());
        }
        Object value = ProducerUtil.tryConvertToSerializedType(
                exchange, message.getBody(), this.configuration.getValueSerializer());

        return new ProducerRecord<>(topic, partition, timestamp, key, value, propagatedHeaders);
    }

    /**
     * Picks the record key: the configured key when it is not empty, otherwise
     * the message's key header.
     */
    private Object getOverrideKey(Message message) {
        if (ObjectHelper.isEmpty(this.configKey)) {
            return message.getHeader(KafkaConstants.KEY);
        }
        return this.configKey;
    }

    /**
     * Picks the partition: the configured partition key when it is not empty,
     * otherwise the message's partition-key header converted to an integer.
     */
    private Integer getOverridePartitionKey(Message message) {
        if (ObjectHelper.isEmpty(this.configPartitionKey)) {
            return message.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
        }
        return this.configPartitionKey;
    }

    /**
     * Wraps the record built for the message in a holder keyed by the exchange.
     *
     * @param exchange the current exchange, used as the holder key
     * @param message  the message to convert
     * @return a holder pairing the exchange with its record
     */
    protected KeyValueHolder<Object, ProducerRecord<Object, Object>> createKeyValueHolder(Exchange exchange, Message message) {
        return new KeyValueHolder<>(exchange, createRecord(exchange, message));
    }

    /**
     * Resolves the destination topic. The override-topic header wins (and is
     * removed from the message), then the configured topic, then the topic from
     * the endpoint URI.
     */
    private String evaluateTopic(Message message) {
        Object topicOverride = message.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
        String overrideTopic = this.endpoint.getCamelContext().getTypeConverter().tryConvertTo(String.class, topicOverride);
        if (overrideTopic != null) {
            LOG.debug("Using override topic: {}", overrideTopic);
            return overrideTopic;
        }
        String configuredTopic = this.configuration.getTopic();
        return configuredTopic != null ? configuredTopic : this.endpointTopic;
    }

    /** Tells whether the body should be sent as several records. */
    private boolean isIterable(Object obj) {
        return obj instanceof Iterable || obj instanceof Iterator;
    }

    /**
     * Returns an iterator over the body's elements, or {@code null} when the
     * body is neither an {@link Iterable} nor an {@link Iterator}.
     */
    @SuppressWarnings("unchecked")
    private Iterator<Object> getObjectIterator(Object obj) {
        if (obj instanceof Iterable) {
            return ((Iterable<Object>) obj).iterator();
        }
        if (obj instanceof Iterator) {
            return (Iterator<Object>) obj;
        }
        return null;
    }

    /**
     * Converts the message headers into Kafka record headers, skipping headers
     * rejected by the configured filter strategy and headers whose serialized
     * value is {@code null}.
     *
     * @param exchange the current exchange
     * @param message  the message whose headers are propagated
     * @return the record headers to attach; never {@code null}
     */
    public List<Header> getPropagatedHeaders(Exchange exchange, Message message) {
        Map<String, Object> camelHeaders = message.getHeaders();
        List<Header> propagated = new ArrayList<>(camelHeaders.size());
        for (Map.Entry<String, Object> entry : camelHeaders.entrySet()) {
            RecordHeader recordHeader = getRecordHeader(entry, exchange);
            if (recordHeader != null) {
                propagated.add(recordHeader);
            }
        }
        return propagated;
    }

    /**
     * Returns {@code true} when the filter strategy does not apply to the header,
     * i.e. the header may be propagated.
     */
    private boolean isAllowedByFilter(String key, Object value, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
        return !headerFilterStrategy.applyFilterToCamelHeaders(key, value, exchange);
    }

    /**
     * Builds the Kafka header for one Camel header entry, or returns {@code null}
     * when the entry is filtered out or serializes to {@code null}.
     */
    private RecordHeader getRecordHeader(Map.Entry<String, Object> entry, Exchange exchange) {
        HeaderFilterStrategy headerFilterStrategy = this.configuration.getHeaderFilterStrategy();
        String key = entry.getKey();
        Object value = entry.getValue();
        if (!isAllowedByFilter(key, value, exchange, headerFilterStrategy)) {
            return null;
        }
        byte[] serialized = this.configuration.getHeaderSerializer().serialize(key, value);
        return serialized == null ? null : new RecordHeader(key, serialized);
    }

    /**
     * Sends the exchange's message and waits for Kafka to acknowledge every
     * record before returning.
     *
     * @param exchange the exchange to send
     * @throws Exception if building or sending a record fails, or the wait is interrupted
     */
    public void process(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        if (this.transactionId != null) {
            startKafkaTransaction(exchange);
        }
        if (isIterable(in.getBody())) {
            processIterableSync(exchange, in);
        } else {
            processSingleMessageSync(exchange, in);
        }
    }

    /** Sends one record and blocks until its metadata is available. */
    private void processSingleMessageSync(Exchange exchange, Message message) throws InterruptedException, ExecutionException {
        ProducerRecord<Object, Object> producerRecord = createRecord(exchange, message);
        @SuppressWarnings("unchecked")
        Future<RecordMetadata> pending = this.kafkaProducer.send(producerRecord);
        postProcessMetadata(exchange, pending);
    }

    /**
     * Sends one record per body element, then blocks on every returned future.
     * All sends are issued before the first wait.
     */
    private void processIterableSync(Exchange exchange, Message message) throws ExecutionException, InterruptedException {
        List<KeyValueHolder<Object, Future<RecordMetadata>>> pendingSends = new ArrayList<>();
        Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> records = createRecordIterable(exchange, message);

        // The header holds the live list, which is filled in once the futures complete below.
        List<RecordMetadata> collectedMetadata = new ArrayList<>();
        if (this.configuration.isRecordMetadata()) {
            exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, collectedMetadata);
        }

        while (records.hasNext()) {
            KeyValueHolder<Object, ProducerRecord<Object, Object>> next = records.next();
            ProducerRecord<Object, Object> producerRecord = next.getValue();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message to topic: {}, partition: {}, key: {}",
                        producerRecord.topic(), producerRecord.partition(), producerRecord.key());
            }
            @SuppressWarnings("unchecked")
            Future<RecordMetadata> pending = this.kafkaProducer.send(producerRecord);
            pendingSends.add(new KeyValueHolder<>(next.getKey(), pending));
        }

        postProcessMetadata(pendingSends, collectedMetadata);
    }

    /** Waits for every pending send and appends the resulting metadata to {@code list2}. */
    private void postProcessMetadata(List<KeyValueHolder<Object, Future<RecordMetadata>>> list, List<RecordMetadata> list2) throws InterruptedException, ExecutionException {
        for (KeyValueHolder<Object, Future<RecordMetadata>> pendingSend : list) {
            list2.addAll(postProcessMetadata(pendingSend.getKey(), pendingSend.getValue()));
        }
    }

    /**
     * Waits for one send to complete. When record metadata is enabled the
     * metadata is stored on {@code obj} via {@code ProducerUtil} and returned;
     * otherwise an empty list is returned.
     */
    private List<RecordMetadata> postProcessMetadata(Object obj, Future<RecordMetadata> future) throws InterruptedException, ExecutionException {
        // Always wait, even when the metadata is discarded, so send failures surface here.
        RecordMetadata recordMetadata = future.get();
        if (!this.configuration.isRecordMetadata()) {
            return Collections.emptyList();
        }
        List<RecordMetadata> metadata = Collections.singletonList(recordMetadata);
        ProducerUtil.setRecordMetadata(obj, metadata);
        return metadata;
    }

    /**
     * Sends the exchange's message without waiting for Kafka; completion is
     * reported through the callback.
     *
     * <p>Any exception raised while starting the transaction, building records or
     * handing them to the client is stored on the exchange and the callback is
     * completed synchronously.
     *
     * @param exchange      the exchange to send
     * @param asyncCallback the callback to notify when processing is done
     * @return the value of the send callback's {@code allSent()}, or {@code true}
     *         when a failure was reported synchronously
     */
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        KafkaProducerCallBack producerCallBack = new KafkaProducerCallBack(
                exchange, asyncCallback, this.workerPool, this.configuration.isRecordMetadata());
        Message message = exchange.getMessage();
        Object body = message.getBody();
        try {
            // Inside the try so that a failure to begin the transaction is reported
            // on the exchange and the callback is still completed.
            if (this.transactionId != null) {
                startKafkaTransaction(exchange);
            }
            if (isIterable(body)) {
                processIterableAsync(exchange, producerCallBack, message);
            } else {
                doSend(exchange, createRecord(exchange, message), producerCallBack);
            }
            return producerCallBack.allSent();
        } catch (Exception e) {
            exchange.setException(e);
            asyncCallback.done(true);
            return true;
        }
    }

    /** Sends one record per body element using the shared callback. */
    private void processIterableAsync(Exchange exchange, KafkaProducerCallBack kafkaProducerCallBack, Message message) {
        Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> records = createRecordIterable(exchange, message);
        while (records.hasNext()) {
            doSend(records, kafkaProducerCallBack);
        }
    }

    /** Sends the iterator's next record holder. */
    private void doSend(Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> it, KafkaProducerCallBack kafkaProducerCallBack) {
        doSend(it.next(), kafkaProducerCallBack);
    }

    /** Sends the record in the holder, passing the holder's key along for metadata. */
    private void doSend(KeyValueHolder<Object, ProducerRecord<Object, Object>> keyValueHolder, KafkaProducerCallBack kafkaProducerCallBack) {
        doSend(keyValueHolder.getKey(), keyValueHolder.getValue(), kafkaProducerCallBack);
    }

    /**
     * Hands one record to the Kafka client. The callback's counter is
     * incremented before the send; when {@code obj} is not {@code null} the
     * callback is chained with a metadata callback for {@code obj}.
     */
    @SuppressWarnings("unchecked")
    private void doSend(Object obj, ProducerRecord<Object, Object> producerRecord, KafkaProducerCallBack kafkaProducerCallBack) {
        kafkaProducerCallBack.increment();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending message to topic: {}, partition: {}, key: {}",
                    producerRecord.topic(), producerRecord.partition(), producerRecord.key());
        }
        if (obj != null) {
            KafkaProducerMetadataCallBack metadataCallBack =
                    new KafkaProducerMetadataCallBack(obj, this.configuration.isRecordMetadata());
            this.kafkaProducer.send(producerRecord, new DelegatingCallback(kafkaProducerCallBack, metadataCallBack));
        } else {
            this.kafkaProducer.send(producerRecord, kafkaProducerCallBack);
        }
    }

    /**
     * Marks the exchange's unit of work as transacted by the configured
     * transactional id, begins a Kafka transaction and registers a
     * {@code KafkaTransactionSynchronization} on the unit of work.
     */
    private void startKafkaTransaction(Exchange exchange) {
        exchange.getUnitOfWork().beginTransactedBy(this.transactionId);
        this.kafkaProducer.beginTransaction();
        exchange.getUnitOfWork().addSynchronization(
                new KafkaTransactionSynchronization(this.transactionId, this.kafkaProducer));
    }
}
