package org.springframework.kafka.core;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;
import org.springframework.kafka.support.micrometer.MicrometerHolder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/kafka/core/KafkaTemplate.class */
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
    /** Logger created in the constructor for the runtime class of this template. */
    protected final LogAccessor logger;
    /** Factory used to obtain producers; a copy with overrides when config overrides were supplied. */
    private final ProducerFactory<K, V> producerFactory;
    /** True when the constructor received non-empty configuration overrides and copied the factory. */
    private final boolean customProducerFactory;
    /** When true, {@code doSend} calls {@code flush()} after handing the record to the producer. */
    private final boolean autoFlush;
    /** Result of {@code producerFactory.transactionCapable()} captured at construction. */
    private final boolean transactional;
    /** Producers bound to the calling thread for the duration of {@code executeInTransaction}. */
    private final Map<Thread, Producer<K, V>> producers;
    /** Static tags handed to the micrometer holder's tag function. */
    private final Map<String, String> micrometerTags;
    /** Guards the lazy lookup performed in {@code clusterId()}. */
    private final Lock clusterIdLock;
    /** Bean name; initialised to "kafkaTemplate" in the constructor. */
    private String beanName;
    private ApplicationContext applicationContext;
    /** Converter used by {@code send(Message)}; a MessagingMessageConverter unless replaced. */
    private RecordMessageConverter messageConverter;
    /** Topic used by the {@code sendDefault} methods and as the fallback for {@code send(Message)}. */
    private String defaultTopic;
    /** Optional listener notified from the send callback; may be null. */
    private ProducerListener<K, V> producerListener;
    /** Transaction id prefix passed to the producer factory when creating transactional producers. */
    private String transactionIdPrefix;
    /** Timeout passed to {@code Producer.close}. */
    private Duration closeTimeout;
    /** When true, a transactional template may be used outside of a transaction. */
    private boolean allowNonTransactional;
    /** Set once {@code setMessageConverter} has been called. */
    private boolean converterSet;
    /** Required only by the {@code receive} methods. */
    private ConsumerFactory<K, V> consumerFactory;
    /** Optional interceptor applied before sending and on acknowledgement. */
    private ProducerInterceptor<K, V> producerInterceptor;
    private boolean micrometerEnabled;
    /** Created in {@code afterSingletonsInstantiated} when timers are in use; otherwise null. */
    private MicrometerHolder micrometerHolder;
    private boolean observationEnabled;
    private KafkaTemplateObservationConvention observationConvention;
    /** Initialised to {@code ObservationRegistry.NOOP} in the constructor. */
    private ObservationRegistry observationRegistry;

    /** Optional per-record tag provider merged with {@link #micrometerTags}. */
    @Nullable
    private Function<ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider;

    /** Admin used to look up the cluster id for observations; may be null. */
    @Nullable
    private KafkaAdmin kafkaAdmin;
    /** Cluster id cached by {@code clusterId()}; null until first resolved. */
    private String clusterId;

    /**
     * Wrapper thrown by {@code executeInTransaction} when {@code commitTransaction()}
     * fails, so that the failure can be told apart from an exception raised by the
     * callback; the original exception is carried as the cause.
     */
    private static final class SkipAbortException extends RuntimeException {
        SkipAbortException(Throwable th) {
            super(th);
        }
    }

    /**
     * Create a template using the supplied producer factory, with auto-flush
     * disabled and no configuration overrides.
     * @param producerFactory the producer factory; must not be {@code null}.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        // No cast needed: the boolean literal already selects the (factory, boolean) overload.
        this(producerFactory, false);
    }

    /**
     * Create a template with auto-flush disabled and optional producer
     * configuration overrides.
     * @param producerFactory the producer factory; must not be {@code null}.
     * @param map configuration overrides; when non-empty the factory is copied
     * with these overrides applied.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> map) {
        this(producerFactory, false, map);
    }

    /**
     * Create a template with the given auto-flush setting and no configuration
     * overrides.
     * @param producerFactory the producer factory; must not be {@code null}.
     * @param z true to flush the producer after every send.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean z) {
        this(producerFactory, z, null);
    }

    /**
     * Create a template with the given auto-flush setting and optional producer
     * configuration overrides.
     * @param producerFactory the producer factory; must not be {@code null}.
     * @param z true to flush the producer after every send.
     * @param map configuration overrides; when non-empty, the template uses a copy
     * of the factory created with {@code copyWithConfigurationOverride}.
     * @throws IllegalArgumentException if {@code producerFactory} is {@code null}.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean z, @Nullable Map<String, Object> map) {
        // Validate before doing any other work.
        Assert.notNull(producerFactory, "'producerFactory' cannot be null");
        this.logger = new LogAccessor(LogFactory.getLog(getClass()));
        this.producers = new ConcurrentHashMap<>();
        this.micrometerTags = new HashMap<>();
        this.clusterIdLock = new ReentrantLock();
        this.beanName = "kafkaTemplate";
        this.messageConverter = new MessagingMessageConverter();
        this.producerListener = new LoggingProducerListener<>();
        this.closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT;
        this.observationRegistry = ObservationRegistry.NOOP;
        this.autoFlush = z;
        // Timers default to enabled only when Micrometer is on the class path.
        this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
        this.customProducerFactory = !CollectionUtils.isEmpty(map);
        if (this.customProducerFactory) {
            // The copy is owned by this template; its lifecycle callbacks are forwarded to it.
            this.producerFactory = producerFactory.copyWithConfigurationOverride(map);
        } else {
            this.producerFactory = producerFactory;
        }
        this.transactional = this.producerFactory.transactionCapable();
    }

    /**
     * Record the bean name; it is passed to the micrometer holder and to the
     * observation context created for each send.
     * @param str the bean name.
     */
    @Override
    public void setBeanName(String str) {
        this.beanName = str;
    }

    /**
     * Store the application context and, when this template owns a copied
     * producer factory, forward the context to that copy as well.
     * @param applicationContext the application context.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
        if (this.customProducerFactory) {
            // The copied factory is cast as in the rest of this class; it is not otherwise told about the context.
            ((DefaultKafkaProducerFactory<K, V>) this.producerFactory).setApplicationContext(applicationContext);
        }
    }

    /**
     * Return the topic used by the {@code sendDefault} methods.
     * @return the default topic, or {@code null} if none was set.
     */
    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    /**
     * Set the topic used by the {@code sendDefault} methods and passed to the
     * message converter in {@code send(Message)}.
     * @param str the default topic.
     */
    public void setDefaultTopic(String str) {
        this.defaultTopic = str;
    }

    /**
     * Set the listener notified from the send callback on success and on error.
     * The constructor installs a {@code LoggingProducerListener}.
     * @param producerListener the listener, or {@code null} for no notifications.
     */
    public void setProducerListener(@Nullable ProducerListener<K, V> producerListener) {
        this.producerListener = producerListener;
    }

    /**
     * Return the converter used by {@code send(Message)}.
     * @return the message converter.
     */
    public RecordMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /**
     * Replace the converter used by {@code send(Message)}. After this call
     * {@code setMessagingConverter} is rejected.
     * @param recordMessageConverter the converter; must not be {@code null}.
     * @throws IllegalArgumentException if the converter is {@code null}.
     */
    public void setMessageConverter(RecordMessageConverter recordMessageConverter) {
        Assert.notNull(recordMessageConverter, "'messageConverter' cannot be null");
        this.messageConverter = recordMessageConverter;
        // Remember that the default converter was replaced.
        this.converterSet = true;
    }

    /**
     * Configure a {@code SmartMessageConverter} on the default
     * {@code MessagingMessageConverter}. Not permitted once a custom message
     * converter has been set via {@code setMessageConverter}.
     * @param smartMessageConverter the converter to install on the default converter.
     * @throws IllegalArgumentException if a custom message converter was already set.
     */
    public void setMessagingConverter(SmartMessageConverter smartMessageConverter) {
        boolean defaultConverterInUse = !this.converterSet;
        Assert.isTrue(defaultConverterInUse, "Cannot set the SmartMessageConverter when setting the messageConverter, add the SmartConverter to the message converter instead");
        MessagingMessageConverter defaultConverter = (MessagingMessageConverter) this.messageConverter;
        defaultConverter.setMessagingConverter(smartMessageConverter);
    }

    /**
     * Report whether the producer factory was transaction capable when this
     * template was constructed.
     * @return true if the template is transactional.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public boolean isTransactional() {
        return this.transactional;
    }

    /**
     * Return the transaction id prefix passed to the producer factory.
     * @return the prefix, or {@code null} if none was set.
     */
    public String getTransactionIdPrefix() {
        return this.transactionIdPrefix;
    }

    /**
     * Set the transaction id prefix handed to the producer factory when a
     * transactional producer is obtained.
     * @param str the prefix.
     */
    public void setTransactionIdPrefix(String str) {
        this.transactionIdPrefix = str;
    }

    /**
     * Set the timeout passed to {@code Producer.close} when this template
     * closes a producer.
     * @param duration the timeout; must not be {@code null}.
     * @throws IllegalArgumentException if the timeout is {@code null}.
     */
    public void setCloseTimeout(Duration duration) {
        Assert.notNull(duration, "'closeTimeout' cannot be null");
        this.closeTimeout = duration;
    }

    /**
     * Allow a transactional template to be used when no transaction is in
     * process; in that case a non-transactional producer is requested from the
     * factory.
     * @param z true to allow non-transactional use.
     */
    public void setAllowNonTransactional(boolean z) {
        this.allowNonTransactional = z;
    }

    /**
     * Report whether operations outside a transaction are permitted.
     * @return the value set via {@code setAllowNonTransactional}.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public boolean isAllowNonTransactional() {
        return this.allowNonTransactional;
    }

    /**
     * Enable or disable creation of the micrometer holder in
     * {@code afterSingletonsInstantiated}. The constructor initialises this to
     * {@code KafkaUtils.MICROMETER_PRESENT}.
     * @param z false to disable timers.
     */
    public void setMicrometerEnabled(boolean z) {
        this.micrometerEnabled = z;
    }

    /**
     * Add static tags for the timers; entries are merged into the tags already
     * held by this template.
     * @param map the tags to add; {@code null} is ignored.
     */
    public void setMicrometerTags(@Nullable Map<String, String> map) {
        if (map == null) {
            return;
        }
        this.micrometerTags.putAll(map);
    }

    /**
     * Set a function that supplies additional timer tags for a given record;
     * its result is merged over the static tags when the holder is created with
     * a provider present.
     * @param function the per-record tag provider, or {@code null} for none.
     */
    public void setMicrometerTagsProvider(@Nullable Function<ProducerRecord<?, ?>, Map<String, String>> function) {
        this.micrometerTagsProvider = function;
    }

    /**
     * Return the per-record tag provider.
     * @return the provider, or {@code null} if none was set.
     */
    @Nullable
    public Function<ProducerRecord<?, ?>, Map<String, String>> getMicrometerTagsProvider() {
        return this.micrometerTagsProvider;
    }

    /**
     * Return the producer factory used by this template; when configuration
     * overrides were supplied at construction this is the overridden copy.
     * @return the producer factory.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ProducerFactory<K, V> getProducerFactory() {
        return this.producerFactory;
    }

    /**
     * Return the producer factory to use for the given topic. This
     * implementation ignores the topic and returns the template's factory.
     * NOTE(review): presumably a hook for subclasses that route by topic.
     * @param str the topic.
     * @return the producer factory.
     */
    protected ProducerFactory<K, V> getProducerFactory(String str) {
        return this.producerFactory;
    }

    /**
     * Set the consumer factory used by the {@code receive} methods, which
     * reject the call when no factory has been set.
     * @param consumerFactory the consumer factory.
     */
    public void setConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    /**
     * Set an interceptor whose {@code onSend} is applied to each record before
     * it is sent and whose {@code onAcknowledgement} is invoked from the send
     * callback. The interceptor is closed when this template is destroyed.
     * @param producerInterceptor the interceptor.
     */
    public void setProducerInterceptor(ProducerInterceptor<K, V> producerInterceptor) {
        this.producerInterceptor = producerInterceptor;
    }

    /**
     * Enable observation support. When enabled and an application context is
     * available, {@code afterSingletonsInstantiated} resolves the observation
     * registry and admin instead of creating the micrometer holder.
     * @param z true to enable observations.
     */
    public void setObservationEnabled(boolean z) {
        this.observationEnabled = z;
    }

    /**
     * Set a custom observation convention; it is passed, together with the
     * default convention, when the observation for a send is created.
     * @param kafkaTemplateObservationConvention the convention.
     */
    public void setObservationConvention(KafkaTemplateObservationConvention kafkaTemplateObservationConvention) {
        this.observationConvention = kafkaTemplateObservationConvention;
    }

    /**
     * Set the observation registry. When the registry is still a no-op at
     * {@code afterSingletonsInstantiated}, a unique registry bean from the
     * application context is used instead, if one exists.
     * @param observationRegistry the registry; must not be {@code null}.
     * @throws IllegalArgumentException if the registry is {@code null}.
     */
    public void setObservationRegistry(ObservationRegistry observationRegistry) {
        Assert.notNull(observationRegistry, "'observationRegistry' must not be null");
        this.observationRegistry = observationRegistry;
    }

    /**
     * Return the admin used to resolve the cluster id.
     * @return the admin, or {@code null} if none was set or discovered.
     */
    @Nullable
    public KafkaAdmin getKafkaAdmin() {
        return this.kafkaAdmin;
    }

    /**
     * Set the admin used to resolve the cluster id for observations. When not
     * set, {@code afterSingletonsInstantiated} looks for a unique admin bean if
     * observation is enabled.
     * @param kafkaAdmin the admin.
     */
    public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
        this.kafkaAdmin = kafkaAdmin;
    }

    /**
     * Complete the metrics/observation set-up once all singletons exist.
     * <p>Without observation (or without an application context) the micrometer
     * holder is created if timers are enabled. With observation, a unique
     * {@code ObservationRegistry} bean replaces a no-op registry, and, when no
     * admin was set, a unique {@code KafkaAdmin} bean is adopted. If that admin
     * targets different bootstrap servers than the producer factory, a new
     * admin is created from its configuration with the producer's servers,
     * keeping the operation timeout and any non-empty cluster id.
     */
    @Override
    public void afterSingletonsInstantiated() {
        if (!this.observationEnabled || this.applicationContext == null) {
            if (this.micrometerEnabled) {
                this.micrometerHolder = obtainMicrometerHolder();
            }
            return;
        }
        if (this.observationRegistry.isNoop()) {
            this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
                    .getIfUnique(() -> this.observationRegistry);
        }
        if (this.kafkaAdmin != null) {
            // An explicitly configured admin is used unchanged.
            return;
        }
        this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
        if (this.kafkaAdmin == null) {
            return;
        }
        Object configuredServers = this.producerFactory.getConfigurationProperties().get("bootstrap.servers");
        if (configuredServers == null) {
            // The factory does not expose its servers; keep the discovered admin rather than fail with an NPE.
            return;
        }
        String producerServers = removeLeadingAndTrailingBrackets(configuredServers.toString());
        if (producerServers.equals(getAdminBootstrapAddress())) {
            return;
        }
        // The admin bean points at another cluster: build one that targets the producer's servers.
        Map<String, Object> adminConfig = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
        adminConfig.put("bootstrap.servers", producerServers);
        int operationTimeout = this.kafkaAdmin.getOperationTimeout();
        String adminClusterId = this.kafkaAdmin.getClusterId();
        KafkaAdmin dedicatedAdmin = new KafkaAdmin(adminConfig);
        dedicatedAdmin.setOperationTimeout(operationTimeout);
        if (adminClusterId != null && !adminClusterId.isEmpty()) {
            dedicatedAdmin.setClusterId(adminClusterId);
        }
        this.kafkaAdmin = dedicatedAdmin;
    }

    /**
     * Resolve the admin's bootstrap servers, preferring the value returned by
     * {@code getBootstrapServers()} and falling back to the "bootstrap.servers"
     * configuration property (empty string when absent).
     * @return the servers with any leading '[' and trailing ']' removed.
     */
    private String getAdminBootstrapAddress() {
        String explicitServers = this.kafkaAdmin.getBootstrapServers();
        if (explicitServers != null) {
            return removeLeadingAndTrailingBrackets(explicitServers);
        }
        Object configuredServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault("bootstrap.servers", "");
        return removeLeadingAndTrailingBrackets(configuredServers.toString());
    }

    /**
     * Return the cluster id, asking the admin for it on first use and caching
     * whatever it returns. The lookup is performed while holding
     * {@code clusterIdLock} and is repeated on later calls as long as the
     * cached value is still {@code null}.
     * @return the cluster id, or {@code null} when there is no admin or the
     * admin returned {@code null}.
     */
    @Nullable
    private String clusterId() {
        if (this.kafkaAdmin == null || this.clusterId != null) {
            return this.clusterId;
        }
        this.clusterIdLock.lock();
        try {
            // Re-check under the lock: another thread may have resolved it meanwhile.
            if (this.clusterId == null) {
                this.clusterId = this.kafkaAdmin.clusterId();
            }
        } finally {
            this.clusterIdLock.unlock();
        }
        return this.clusterId;
    }

    /**
     * Forward the context-stopped event to the producer factory copy owned by
     * this template; does nothing when the factory was supplied as is.
     * @param contextStoppedEvent the event.
     */
    @Override
    public void onApplicationEvent(ContextStoppedEvent contextStoppedEvent) {
        if (this.customProducerFactory) {
            ((DefaultKafkaProducerFactory<K, V>) this.producerFactory).onApplicationEvent(contextStoppedEvent);
        }
    }

    /**
     * Send the value to the default topic.
     * @param v the record value.
     * @return a future completed by the send callback.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public CompletableFuture<SendResult<K, V>> sendDefault(@Nullable V v) {
        return send(this.defaultTopic, v);
    }

    /**
     * Send the key and value to the default topic.
     * @param k the record key.
     * @param v the record value.
     * @return a future completed by the send callback.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public CompletableFuture<SendResult<K, V>> sendDefault(K k, @Nullable V v) {
        return send(this.defaultTopic, k, v);
    }

    /**
     * Send the key and value to the given partition of the default topic.
     * @param num the partition.
     * @param k the record key.
     * @param v the record value.
     * @return a future completed by the send callback.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public CompletableFuture<SendResult<K, V>> sendDefault(Integer num, K k, @Nullable V v) {
        return send(this.defaultTopic, num, k, v);
    }

    /**
     * Send the key and value, with a timestamp, to the given partition of the
     * default topic.
     * @param num the partition.
     * @param l the record timestamp.
     * @param k the record key.
     * @param v the record value.
     * @return a future completed by the send callback.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public CompletableFuture<SendResult<K, V>> sendDefault(Integer num, Long l, K k, @Nullable V v) {
        return send(this.defaultTopic, num, l, k, v);
    }

    /**
     * Send the value to the given topic.
     * @param str the topic.
     * @param v the record value.
     * @return a future completed by the send callback.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public CompletableFuture<SendResult<K, V>> send(String str, @Nullable V v) {
        return observeSend(new ProducerRecord<>(str, v));
    }

    /**
     * Send the key and value to the given topic.
     * @param str the topic.
     * @param k the record key.
     * @param v the record value.
     * @return a future completed by the send callback.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public CompletableFuture<SendResult<K, V>> send(String str, K k, @Nullable V v) {
        return observeSend(new ProducerRecord<>(str, k, v));
    }

    /**
     * Send the key and value to the given partition of the given topic.
     * @param str the topic.
     * @param num the partition.
     * @param k the record key.
     * @param v the record value.
     * @return a future completed by the send callback.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public CompletableFuture<SendResult<K, V>> send(String str, Integer num, K k, @Nullable V v) {
        return observeSend(new ProducerRecord<>(str, num, k, v));
    }

    /**
     * Send the key and value, with a timestamp, to the given partition of the
     * given topic.
     * @param str the topic.
     * @param num the partition.
     * @param l the record timestamp.
     * @param k the record key.
     * @param v the record value.
     * @return a future completed by the send callback.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public CompletableFuture<SendResult<K, V>> send(String str, Integer num, Long l, K k, @Nullable V v) {
        return observeSend(new ProducerRecord<>(str, num, l, k, v));
    }

    /**
     * Send the supplied record.
     * @param producerRecord the record; must not be {@code null}.
     * @return a future completed by the send callback.
     * @throws IllegalArgumentException if the record is {@code null}.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> producerRecord) {
        Assert.notNull(producerRecord, "'record' cannot be null");
        return observeSend(producerRecord);
    }

    /**
     * Convert the message to a producer record with the configured message
     * converter (falling back to the default topic) and send it.
     * <p>If the converted record carries no headers at all, a
     * {@code byte[]} correlation id header present on the message is copied
     * onto the record so that it is not lost.
     * @param message the message to convert and send.
     * @return a future completed by the send callback.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    @SuppressWarnings("unchecked")
    public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
        ProducerRecord<?, ?> converted = this.messageConverter.fromMessage(message, this.defaultTopic);
        if (!converted.headers().iterator().hasNext()) {
            byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
            if (correlationId != null) {
                converted.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
            }
        }
        // The converter is untyped; the caller is responsible for key/value compatibility.
        return observeSend((ProducerRecord<K, V>) converted);
    }

    /**
     * Return partition metadata for the topic, using a producer obtained for
     * this call. The producer is closed afterwards unless a transaction is in
     * process.
     * @param str the topic.
     * @return the partition information reported by the producer.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public List<PartitionInfo> partitionsFor(String str) {
        Producer<K, V> producer = getTheProducer();
        try {
            return producer.partitionsFor(str);
        } finally {
            // finally (not catch-and-rethrow) so that a failing close is never attempted twice.
            closeProducer(producer, inTransaction());
        }
    }

    /**
     * Return the metrics of a producer obtained for this call. The producer is
     * closed afterwards unless a transaction is in process.
     * @return the producer metrics.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public Map<MetricName, ? extends Metric> metrics() {
        Producer<K, V> theProducer = getTheProducer();
        try {
            return theProducer.metrics();
        } finally {
            closeProducer(theProducer, inTransaction());
        }
    }

    /**
     * Run the callback with a producer obtained for this call and return its
     * result. The producer is closed afterwards unless a transaction is in
     * process.
     * @param producerCallback the callback; must not be {@code null}.
     * @param <T> the result type.
     * @return the callback's result.
     * @throws IllegalArgumentException if the callback is {@code null}.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public <T> T execute(KafkaOperations.ProducerCallback<K, V, T> producerCallback) {
        Assert.notNull(producerCallback, "'callback' cannot be null");
        Producer<K, V> producer = getTheProducer();
        try {
            return producerCallback.doInKafka(producer);
        } finally {
            // finally (not catch-and-rethrow) so that a failing close is never attempted twice.
            closeProducer(producer, inTransaction());
        }
    }

    /**
     * Run the callback inside a local transaction on a dedicated producer that
     * is bound to the calling thread for the duration of the call.
     * <p>The transaction is committed when the callback returns normally. If
     * the callback throws, the transaction is aborted (an abort failure is
     * added as a suppressed exception) and the callback's exception is
     * rethrown. If the commit itself fails, that failure is rethrown without
     * attempting an abort. In every case the thread binding is removed and the
     * producer is closed exactly once, after commit/abort handling.
     * @param operationsCallback the callback; must not be {@code null}.
     * @param <T> the result type.
     * @return the callback's result.
     * @throws IllegalArgumentException if the callback is {@code null}.
     * @throws IllegalStateException if the producer factory is not
     * transactional or this is a nested call on the same thread.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K, V, T> operationsCallback) {
        Assert.notNull(operationsCallback, "'callback' cannot be null");
        Assert.state(this.transactional, "Producer factory does not support transactions");
        Thread currentThread = Thread.currentThread();
        Assert.state(this.producers.get(currentThread) == null, "Nested calls to 'executeInTransaction' are not allowed");
        Producer<K, V> producer = this.producerFactory.createProducer(this.transactionIdPrefix);
        try {
            producer.beginTransaction();
        } catch (Exception e) {
            // Nothing was bound yet; just release the producer.
            closeProducer(producer, false);
            throw e;
        }
        this.producers.put(currentThread, producer);
        try {
            T result = operationsCallback.doInOperations(this);
            try {
                producer.commitTransaction();
            } catch (Exception e) {
                // Mark commit failures so the generic handler below does not abort.
                throw new SkipAbortException(e);
            }
            return result;
        } catch (SkipAbortException e) {
            throw (RuntimeException) e.getCause();
        } catch (Exception e) {
            try {
                producer.abortTransaction();
            } catch (Exception abortFailure) {
                e.addSuppressed(abortFailure);
            }
            throw e;
        } finally {
            // Runs after commit/abort handling, so the producer is still open while aborting.
            this.producers.remove(currentThread);
            closeProducer(producer, false);
        }
    }

    /**
     * Flush a producer obtained for this call. The producer is closed
     * afterwards unless a transaction is in process.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public void flush() {
        Producer<K, V> theProducer = getTheProducer();
        try {
            theProducer.flush();
        } finally {
            closeProducer(theProducer, inTransaction());
        }
    }

    /**
     * Send the offsets to the transaction of the producer currently associated
     * with this thread (bound by {@code executeInTransaction} or held as a
     * transaction-synchronised resource).
     * @param map the offsets.
     * @param consumerGroupMetadata the consumer group metadata.
     * @throws IllegalArgumentException if no transaction is in process.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) {
        producerForOffsets().sendOffsetsToTransaction(map, consumerGroupMetadata);
    }

    /**
     * Receive the single record at the given offset of a topic partition,
     * using a short-lived consumer limited to one record per poll.
     * @param str the topic.
     * @param i the partition.
     * @param j the offset.
     * @param duration the poll timeout.
     * @return the record, or {@code null} if the poll did not return exactly one record.
     * @throws IllegalArgumentException if no consumer factory has been set.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    @Nullable
    public ConsumerRecord<K, V> receive(String str, int i, long j, Duration duration) {
        Properties overrides = oneOnly();
        // try-with-resources closes the consumer exactly once, even when close() itself fails.
        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, overrides)) {
            return receiveOne(new TopicPartition(str, i), j, duration, consumer);
        }
    }

    /**
     * Receive one record for each requested topic/partition/offset, using a
     * single short-lived consumer limited to one record per poll. Requests are
     * processed in iteration order; every requested partition appears in the
     * result, with an empty list when no record was obtained for it.
     * @param collection the requested positions; each must carry a non-negative offset.
     * @param duration the poll timeout applied to each request.
     * @return the received records grouped by partition.
     * @throws KafkaException if a requested offset is {@code null} or negative.
     * @throws IllegalArgumentException if no consumer factory has been set.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> collection, Duration duration) {
        Properties overrides = oneOnly();
        Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsByPartition = new LinkedHashMap<>();
        // try-with-resources closes the consumer exactly once, even when close() itself fails.
        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, overrides)) {
            for (TopicPartitionOffset requested : collection) {
                Long offset = requested.getOffset();
                if (offset == null || offset < 0) {
                    throw new KafkaException("Offset supplied in TopicPartitionOffset is invalid: " + requested);
                }
                TopicPartition partition = requested.getTopicPartition();
                ConsumerRecord<K, V> received = receiveOne(partition, offset, duration, consumer);
                List<ConsumerRecord<K, V>> partitionRecords =
                        recordsByPartition.computeIfAbsent(partition, key -> new ArrayList<>());
                if (received != null) {
                    partitionRecords.add(received);
                }
            }
            return new ConsumerRecords<>(recordsByPartition);
        }
    }

    /**
     * Build the consumer property overrides used by the {@code receive}
     * methods, after checking that a consumer factory is available.
     * @return properties limiting each poll to a single record.
     * @throws IllegalArgumentException if no consumer factory has been set.
     */
    private Properties oneOnly() {
        Assert.notNull(this.consumerFactory, "A consumerFactory is required");
        Properties properties = new Properties();
        properties.setProperty("max.poll.records", "1");
        return properties;
    }

    /**
     * Assign the consumer to the partition, seek to the offset and poll once.
     * @param topicPartition the partition to read from.
     * @param j the offset to seek to.
     * @param duration the poll timeout.
     * @param consumer the consumer to use; its assignment is replaced.
     * @return the record, or {@code null} if the poll did not return exactly one record.
     */
    @Nullable
    private ConsumerRecord<K, V> receiveOne(TopicPartition topicPartition, long j, Duration duration, Consumer<K, V> consumer) {
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seek(topicPartition, j);
        ConsumerRecords<K, V> polled = consumer.poll(duration);
        return polled.count() == 1 ? polled.iterator().next() : null;
    }

    /**
     * Find the producer whose transaction should receive consumer offsets: the
     * one bound to this thread by {@code executeInTransaction}, otherwise the
     * one held as a transaction-synchronised resource for the producer factory.
     * @return the transactional producer.
     * @throws IllegalArgumentException if neither exists.
     */
    @SuppressWarnings("unchecked")
    private Producer<K, V> producerForOffsets() {
        Producer<K, V> threadBound = this.producers.get(Thread.currentThread());
        if (threadBound != null) {
            return threadBound;
        }
        // The resource is registered under the factory key, so the cast is safe in practice.
        KafkaResourceHolder<K, V> holder =
                (KafkaResourceHolder<K, V>) TransactionSynchronizationManager.getResource(this.producerFactory);
        Assert.isTrue(holder != null, "No transaction in process");
        return holder.getProducer();
    }

    /**
     * Close the producer with the configured close timeout, unless the caller
     * indicates it must stay open.
     * @param producer the producer.
     * @param z true to leave the producer open (callers pass whether a
     * transaction is in process).
     */
    protected void closeProducer(Producer<K, V> producer, boolean z) {
        if (!z) {
            producer.close(this.closeTimeout);
        }
    }

    /**
     * Start an observation for the record and perform the send inside its
     * scope. The observation is normally stopped by the send callback; when
     * the send fails synchronously and no error has been recorded yet, the
     * error is recorded and the observation stopped here.
     * @param producerRecord the record to send.
     * @return a future completed by the send callback.
     */
    private CompletableFuture<SendResult<K, V>> observeSend(ProducerRecord<K, V> producerRecord) {
        Observation observation = KafkaTemplateObservation.TEMPLATE_OBSERVATION.observation(
                this.observationConvention,
                KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention.INSTANCE,
                () -> new KafkaRecordSenderContext(producerRecord, this.beanName, this::clusterId),
                this.observationRegistry);
        observation.start();
        // try-with-resources guarantees the scope is closed even when doSend throws.
        try (Observation.Scope ignored = observation.openScope()) {
            return doSend(producerRecord, observation);
        } catch (RuntimeException e) {
            // An error already present was recorded (and the observation stopped) by the send callback.
            if (observation.getContext().getError() == null) {
                observation.error(e);
                observation.stop();
            }
            throw e;
        }
    }

    /**
     * Hand the record to a producer and return a future that the send callback
     * completes.
     * <p>The record is first passed through the producer interceptor, if any.
     * When the producer's own future is already done on return, its outcome is
     * checked so that an immediate failure surfaces as an exception. With
     * auto-flush enabled the template is flushed before returning.
     * @param producerRecord the record to send.
     * @param observation the observation covering this send.
     * @return a future completed by the send callback.
     * @throws KafkaException if the send failed immediately or the thread was
     * interrupted while checking the result.
     */
    protected CompletableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord, Observation observation) {
        final Producer<K, V> producer = getTheProducer(producerRecord.topic());
        this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord));
        final CompletableFuture<SendResult<K, V>> resultFuture = new CompletableFuture<>();
        final Object timerSample = this.micrometerHolder != null ? this.micrometerHolder.start() : null;
        final ProducerRecord<K, V> recordToSend = interceptorProducerRecord(producerRecord);
        final Callback callback = buildCallback(recordToSend, producer, resultFuture, timerSample, observation);
        final Future<?> producerFuture = producer.send(recordToSend, callback);
        // A future that is already done may represent an immediate failure.
        if (producerFuture.isDone()) {
            try {
                producerFuture.get();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted", interrupted);
            } catch (ExecutionException failed) {
                throw new KafkaException("Send failed", failed.getCause());
            }
        }
        if (this.autoFlush) {
            flush();
        }
        this.logger.trace(() -> "Sent: " + KafkaUtils.format(recordToSend));
        return resultFuture;
    }

    /**
     * Apply the producer interceptor's {@code onSend} to the record when an
     * interceptor is configured.
     * @param producerRecord the record about to be sent.
     * @return the interceptor's result, or the record itself without an interceptor.
     */
    private ProducerRecord<K, V> interceptorProducerRecord(ProducerRecord<K, V> producerRecord) {
        if (this.producerInterceptor == null) {
            return producerRecord;
        }
        return this.producerInterceptor.onSend(producerRecord);
    }

    /**
     * Build the producer callback for one send.
     * <p>The callback first notifies the interceptor (failures there are only
     * logged). Then, inside the observation's scope, it records the timer
     * outcome, completes the future, notifies the producer listener and logs.
     * Finally the observation is stopped and the producer is closed unless the
     * template is transactional.
     * @param producerRecord the record that was sent.
     * @param producer the producer used for the send.
     * @param completableFuture the future to complete.
     * @param obj the timer sample, or {@code null} when timers are not in use.
     * @param observation the observation covering this send.
     * @return the callback to pass to {@code Producer.send}.
     */
    private Callback buildCallback(ProducerRecord<K, V> producerRecord, Producer<K, V> producer, CompletableFuture<SendResult<K, V>> completableFuture, @Nullable Object obj, Observation observation) {
        return (recordMetadata, exc) -> {
            try {
                if (this.producerInterceptor != null) {
                    this.producerInterceptor.onAcknowledgement(recordMetadata, exc);
                }
            } catch (Exception e) {
                // An interceptor failure must not prevent completion of the future.
                this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback");
            }
            // try-with-resources closes the scope even if a listener or timer throws.
            try (Observation.Scope ignored = observation.openScope()) {
                if (exc == null) {
                    successTimer(obj, producerRecord);
                    completableFuture.complete(new SendResult<>(producerRecord, recordMetadata));
                    if (this.producerListener != null) {
                        this.producerListener.onSuccess(producerRecord, recordMetadata);
                    }
                    this.logger.trace(() -> "Sent ok: " + KafkaUtils.format(producerRecord) + ", metadata: " + recordMetadata);
                } else {
                    failureTimer(obj, exc, producerRecord);
                    observation.error(exc);
                    completableFuture.completeExceptionally(new KafkaProducerException(producerRecord, "Failed to send", exc));
                    if (this.producerListener != null) {
                        this.producerListener.onError(producerRecord, recordMetadata, exc);
                    }
                    this.logger.debug(exc, () -> "Failed to send: " + KafkaUtils.format(producerRecord));
                }
            } finally {
                observation.stop();
                closeProducer(producer, this.transactional);
            }
        };
    }

    /**
     * Record a successful send on the timer sample, passing the record along
     * when a per-record tag provider is configured.
     * @param obj the timer sample; nothing happens when {@code null}.
     * @param producerRecord the record that was sent.
     */
    private void successTimer(@Nullable Object obj, ProducerRecord<?, ?> producerRecord) {
        if (obj == null) {
            return;
        }
        if (this.micrometerTagsProvider != null) {
            this.micrometerHolder.success(obj, producerRecord);
        } else {
            this.micrometerHolder.success(obj);
        }
    }

    /**
     * Record a failed send on the timer sample using the exception's simple
     * class name, passing the record along when a per-record tag provider is
     * configured.
     * @param obj the timer sample; nothing happens when {@code null}.
     * @param exc the failure.
     * @param producerRecord the record that failed.
     */
    private void failureTimer(@Nullable Object obj, Exception exc, ProducerRecord<?, ?> producerRecord) {
        if (obj == null) {
            return;
        }
        if (this.micrometerTagsProvider != null) {
            this.micrometerHolder.failure(obj, exc.getClass().getSimpleName(), producerRecord);
        } else {
            this.micrometerHolder.failure(obj, exc.getClass().getSimpleName());
        }
    }

    /**
     * Report whether the calling thread is in a transaction as far as this
     * template can tell: the template is transactional and either a producer
     * is bound to the thread, a resource is registered for the producer
     * factory, or a Spring-managed transaction is active.
     * @return true if a transaction is in process.
     */
    @Override // org.springframework.kafka.core.KafkaOperations
    public boolean inTransaction() {
        if (!this.transactional) {
            return false;
        }
        if (this.producers.get(Thread.currentThread()) != null) {
            return true;
        }
        if (TransactionSynchronizationManager.getResource(this.producerFactory) != null) {
            return true;
        }
        return TransactionSynchronizationManager.isActualTransactionActive();
    }

    /**
     * Obtain a producer without reference to a particular topic.
     * @return the producer.
     */
    private Producer<K, V> getTheProducer() {
        return getTheProducer(null);
    }

    /**
     * Obtain the producer for an operation.
     * <p>In a transaction, the producer bound to this thread is used, falling
     * back to the transaction-synchronised resource holder's producer. Outside
     * a transaction, a non-transactional producer is requested when
     * non-transactional use is allowed; otherwise a producer is created from
     * the template's factory, or from the topic-specific factory when a topic
     * is given.
     * @param str the topic, or {@code null} when not sending to a specific topic.
     * @return the producer.
     * @throws IllegalStateException if the template is transactional, no
     * transaction is in process and non-transactional use is not allowed.
     */
    protected Producer<K, V> getTheProducer(@Nullable String str) {
        boolean useTransactionalProducer = false;
        if (this.transactional) {
            boolean inTransaction = inTransaction();
            Assert.state(this.allowNonTransactional || inTransaction, "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
            useTransactionalProducer = inTransaction;
        }
        if (useTransactionalProducer) {
            Producer<K, V> threadBound = this.producers.get(Thread.currentThread());
            if (threadBound != null) {
                return threadBound;
            }
            return ProducerFactoryUtils.getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout).getProducer();
        }
        if (this.allowNonTransactional) {
            return this.producerFactory.createNonTransactionalProducer();
        }
        if (str == null) {
            return this.producerFactory.createProducer();
        }
        return getProducerFactory(str).createProducer();
    }

    /**
     * Create the micrometer holder for this template's timers when Micrometer
     * is present. The holder's tag function returns the static tags, merged
     * with the per-record provider's tags when a provider is configured and a
     * record is supplied.
     * @return the holder, or {@code null} when Micrometer is absent or the
     * holder could not be created.
     */
    @Nullable
    private MicrometerHolder obtainMicrometerHolder() {
        MicrometerHolder holder = null;
        try {
            if (KafkaUtils.MICROMETER_PRESENT) {
                Function<Object, Map<String, String>> tagsFunction = ignored -> this.micrometerTags;
                if (this.micrometerTagsProvider != null) {
                    tagsFunction = candidate -> {
                        Map<String, String> merged = new HashMap<>(this.micrometerTags);
                        if (candidate != null) {
                            merged.putAll(this.micrometerTagsProvider.apply((ProducerRecord<?, ?>) candidate));
                        }
                        return merged;
                    };
                }
                holder = new MicrometerHolder(this.applicationContext, this.beanName, "spring.kafka.template", "KafkaTemplate Timer", tagsFunction);
            }
        } catch (IllegalStateException ignored) {
            // Deliberately not propagated: the holder could not be created, so timers are switched off.
            this.micrometerEnabled = false;
        }
        return holder;
    }

    /**
     * Release resources owned by this template: the micrometer holder, the
     * producer factory copy created for configuration overrides, and the
     * producer interceptor. A producer factory supplied as is remains untouched.
     */
    @Override
    public void destroy() {
        if (this.micrometerHolder != null) {
            this.micrometerHolder.destroy();
        }
        if (this.customProducerFactory) {
            // Only the copy made by this template is destroyed here.
            ((DefaultKafkaProducerFactory<K, V>) this.producerFactory).destroy();
        }
        if (this.producerInterceptor != null) {
            this.producerInterceptor.close();
        }
    }

    /**
     * Strip leading '[' and trailing ']' characters, as produced when a list
     * of servers is rendered with {@code toString()}.
     * @param str the text to clean.
     * @return the text without the surrounding brackets.
     */
    private static String removeLeadingAndTrailingBrackets(String str) {
        String withoutLeading = StringUtils.trimLeadingCharacter(str, '[');
        return StringUtils.trimTrailingCharacter(withoutLeading, ']');
    }
}
