package org.springframework.kafka.requestreply;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-3.0.2.jar:org/springframework/kafka/requestreply/ReplyingKafkaTemplate.class */
public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implements BatchMessageListener<K, R>, InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K, V, R>, ConsumerSeekAware {
    /** Log-message fragment that introduces a correlation id. */
    private static final String WITH_CORRELATION_ID = " with correlationId: ";
    // No usage of this constant is visible in this class.
    private static final int FIVE = 5;
    /** Reply timeout applied when the caller does not supply one. */
    private static final Duration DEFAULT_REPLY_TIMEOUT = Duration.ofSeconds(5);
    /** Listener container that delivers reply records to {@code onMessage}. */
    private final GenericMessageListenerContainer<K, R> replyContainer;
    /** Pending requests, keyed by a CorrelationKey (binary mode) or by its String form. */
    private final ConcurrentMap<Object, RequestReplyFuture<K, V, R>> futures;
    /** UTF-8 bytes of the reply topic derived from the container, or null when it could not be determined. */
    private final byte[] replyTopic;
    /** Four-byte encoding of the reply partition, or null when the container has no single explicit partition. */
    private final byte[] replyPartition;
    /** Scheduler that runs the reply-timeout tasks. */
    private TaskScheduler scheduler;
    private int phase;
    private boolean autoStartup;
    private Duration defaultReplyTimeout;
    /** True once a scheduler has been supplied through setTaskScheduler. */
    private boolean schedulerSet;
    /** When true, replies without a pending request are logged at debug instead of error. */
    private boolean sharedReplyTopic;
    /** Produces the correlation key for each outbound record. */
    private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy;
    /** Selects raw-bytes (true) or UTF-8 string (false) encoding of the correlation header. */
    private boolean binaryCorrelation;
    private String correlationHeaderName;
    private String replyTopicHeaderName;
    private String replyPartitionHeaderName;
    /** Applied to each reply; a non-null result completes the pending future exceptionally. */
    private Function<ConsumerRecord<?, ?>, Exception> replyErrorChecker;
    /** Counted down in onFirstPoll; replaced with a fresh latch on every start(). */
    private CountDownLatch assignLatch;
    private volatile boolean running;
    /** True once the internally created scheduler has been initialized. */
    private volatile boolean schedulerInitialized;

    /**
     * Create a template, delegating to the three-argument constructor with {@code false}
     * as the final flag.
     * @param producerFactory the producer factory handed to the parent template.
     * @param genericMessageListenerContainer the container that receives replies.
     */
    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> genericMessageListenerContainer) {
        this(producerFactory, genericMessageListenerContainer, false);
    }

    /**
     * Create a template, register it as the reply container's message listener and
     * derive the default reply topic/partition headers from the container properties.
     * <p>The reply topic is taken from the container when it is configured with exactly
     * one topic, or with exactly one explicit topic/partition (in which case the
     * partition is encoded as a 4-byte header value as well). Otherwise neither default
     * is set and callers have to populate the reply headers themselves.
     * @param producerFactory the producer factory handed to the parent template.
     * @param genericMessageListenerContainer the container that receives replies; must not be null.
     * @param z forwarded as the second argument of the parent constructor
     * (presumably the auto-flush flag - confirm against KafkaTemplate).
     */
    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> genericMessageListenerContainer, boolean z) {
        super(producerFactory, z);
        // Diamond rather than a raw ConcurrentHashMap so the assignment is type-checked.
        this.futures = new ConcurrentHashMap<>();
        this.scheduler = new ThreadPoolTaskScheduler();
        this.autoStartup = true;
        this.defaultReplyTimeout = DEFAULT_REPLY_TIMEOUT;
        this.correlationStrategy = ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
        this.binaryCorrelation = true;
        this.correlationHeaderName = KafkaHeaders.CORRELATION_ID;
        this.replyTopicHeaderName = KafkaHeaders.REPLY_TOPIC;
        this.replyPartitionHeaderName = KafkaHeaders.REPLY_PARTITION;
        // By default no reply is treated as an error.
        this.replyErrorChecker = consumerRecord -> null;
        this.assignLatch = new CountDownLatch(1);
        Assert.notNull(genericMessageListenerContainer, "'replyContainer' cannot be null");
        this.replyContainer = genericMessageListenerContainer;
        this.replyContainer.setupMessageListener(this);

        ContainerProperties containerProperties = this.replyContainer.getContainerProperties();
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        String[] topics = containerProperties.getTopics();
        String topicName = null;
        byte[] partitionBytes = null;
        if (topics != null && topics.length == 1) {
            topicName = topics[0];
        } else if (topicPartitions != null && topicPartitions.length == 1) {
            TopicPartitionOffset topicPartitionOffset = topicPartitions[0];
            Assert.notNull(topicPartitionOffset, "'topicPartitionsToAssign' must not be null");
            topicName = topicPartitionOffset.getTopic();
            // Partition number as a big-endian int, the ByteBuffer default byte order.
            partitionBytes = ByteBuffer.allocate(4).putInt(topicPartitionOffset.getPartition()).array();
        }

        if (topicName != null) {
            this.replyTopic = topicName.getBytes(StandardCharsets.UTF_8);
            this.replyPartition = partitionBytes;
        } else {
            this.replyTopic = null;
            this.replyPartition = null;
            this.logger.debug(() -> "Could not determine container's reply topic/partition; senders must populate at least the kafka_replyTopic header, and optionally the kafka_replyPartition header");
        }
    }

    /**
     * Replace the scheduler used to run reply-timeout tasks.
     * <p>Also records that the scheduler was supplied externally, which makes
     * {@code afterPropertiesSet()} skip initialization and {@code destroy()} skip
     * destruction of the scheduler.
     * @param taskScheduler the scheduler; must not be null.
     */
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        Assert.notNull(taskScheduler, "'scheduler' cannot be null");
        this.scheduler = taskScheduler;
        this.schedulerSet = true;
    }

    /**
     * Return the reply timeout used when a caller does not supply one.
     * @return the default reply timeout.
     */
    protected Duration getDefaultReplyTimeout() {
        return this.defaultReplyTimeout;
    }

    /**
     * Set the reply timeout used when a caller does not supply one.
     * @param duration the timeout; must not be null and must not be negative
     * (checked in milliseconds).
     */
    public void setDefaultReplyTimeout(Duration duration) {
        Assert.notNull(duration, "'defaultReplyTimeout' cannot be null");
        Assert.isTrue(duration.toMillis() >= 0, "'replyTimeout' must be >= 0");
        this.defaultReplyTimeout = duration;
    }

    /**
     * Report whether the template is between a successful {@code start()} and the
     * next {@code stop()}.
     * @return true while running.
     */
    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Return the lifecycle phase configured via {@code setPhase} (0 unless changed).
     * @return the phase.
     */
    @Override // org.springframework.context.SmartLifecycle, org.springframework.context.Phased
    public int getPhase() {
        return this.phase;
    }

    /**
     * Set the lifecycle phase reported by {@code getPhase()}.
     * @param i the phase.
     */
    public void setPhase(int i) {
        this.phase = i;
    }

    /**
     * Return the auto-startup flag; the constructor sets it to true.
     * @return true if auto-startup is enabled.
     */
    @Override // org.springframework.context.SmartLifecycle
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /**
     * Set the auto-startup flag reported by {@code isAutoStartup()}.
     * @param z true to enable auto-startup.
     */
    public void setAutoStartup(boolean z) {
        this.autoStartup = z;
    }

    /**
     * Return the partitions the reply container currently reports as assigned.
     * @return the value of the reply container's {@code getAssignedPartitions()}.
     */
    public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
        return this.replyContainer.getAssignedPartitions();
    }

    /**
     * Control the log level for replies that match no pending request: debug when
     * true, error when false (the default).
     * @param z true if unmatched replies are expected and should only be logged at debug.
     */
    public void setSharedReplyTopic(boolean z) {
        this.sharedReplyTopic = z;
    }

    /**
     * Set the function that produces the correlation key for each outbound record.
     * The default builds a 16-byte key from a random UUID.
     * @param function the strategy; must not be null.
     */
    public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> function) {
        Assert.notNull(function, "'correlationStrategy' cannot be null");
        this.correlationStrategy = function;
    }

    /**
     * Set the name of the header that carries the correlation id on requests and is
     * read back from replies. Defaults to {@code KafkaHeaders.CORRELATION_ID}.
     * @param str the header name; must not be null.
     */
    public void setCorrelationHeaderName(String str) {
        Assert.notNull(str, "'correlationHeaderName' cannot be null");
        this.correlationHeaderName = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Return the name of the header used for the correlation id.
     * @return the header name.
     */
    public String getCorrelationHeaderName() {
        return this.correlationHeaderName;
    }

    /**
     * Set the name of the header under which the default reply topic is added to
     * requests. Defaults to {@code KafkaHeaders.REPLY_TOPIC}.
     * @param str the header name; must not be null.
     */
    public void setReplyTopicHeaderName(String str) {
        Assert.notNull(str, "'replyTopicHeaderName' cannot be null");
        this.replyTopicHeaderName = str;
    }

    /**
     * Set the name of the header under which the default reply partition is added to
     * requests. Defaults to {@code KafkaHeaders.REPLY_PARTITION}.
     * @param str the header name; must not be null.
     */
    public void setReplyPartitionHeaderName(String str) {
        Assert.notNull(str, "'replyPartitionHeaderName' cannot be null");
        this.replyPartitionHeaderName = str;
    }

    /**
     * Set a function that inspects each reply; when it returns a non-null exception
     * the pending future is completed exceptionally with it instead of with the
     * record. The default checker always returns null.
     * @param function the checker; must not be null.
     */
    public void setReplyErrorChecker(Function<ConsumerRecord<?, ?>, Exception> function) {
        Assert.notNull(function, "'replyErrorChecker' cannot be null");
        this.replyErrorChecker = function;
    }

    /**
     * Select how the correlation id travels in the header: as the raw bytes of the
     * CorrelationKey when true (the default), or as the UTF-8 bytes of its
     * {@code toString()} form when false.
     * @param z true for binary correlation.
     */
    public void setBinaryCorrelation(boolean z) {
        this.binaryCorrelation = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Report whether the correlation header carries the raw key bytes rather than a
     * UTF-8 string.
     * @return true for binary correlation.
     */
    public boolean isBinaryCorrelation() {
        return this.binaryCorrelation;
    }

    /**
     * Initialize the internally created scheduler, once. Nothing happens when a
     * scheduler was supplied through {@code setTaskScheduler} or when the internal
     * one has already been initialized.
     */
    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() {
        if (!this.schedulerSet && !this.schedulerInitialized) {
            // Safe cast: without setTaskScheduler the field still holds the
            // ThreadPoolTaskScheduler created in the constructor.
            ThreadPoolTaskScheduler ownScheduler = (ThreadPoolTaskScheduler) this.scheduler;
            ownScheduler.initialize();
            this.schedulerInitialized = true;
        }
    }

    /**
     * Start the template: make sure the scheduler is initialized, install a fresh
     * assignment latch, start the reply container and mark the template running.
     * A call while already running is a no-op.
     * @throws KafkaException wrapping any exception raised during startup.
     */
    @Override // org.springframework.context.Lifecycle
    public synchronized void start() {
        if (!this.running) {
            try {
                afterPropertiesSet();
                // New latch per start so waitForAssignment tracks this run of the container.
                this.assignLatch = new CountDownLatch(1);
                this.replyContainer.start();
                this.running = true;
            } catch (Exception startupFailure) {
                throw new KafkaException("Failed to initialize", startupFailure);
            }
        }
    }

    /**
     * Stop the template: clear the running flag, stop the reply container and
     * discard all pending futures. A call while not running is a no-op.
     * <p>Discarded futures are removed from the map without being completed.
     */
    @Override // org.springframework.context.Lifecycle
    public synchronized void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        this.replyContainer.stop();
        this.futures.clear();
    }

    /**
     * Stop the template synchronously, then invoke the supplied callback.
     * @param runnable the callback to run once {@code stop()} has returned.
     */
    @Override // org.springframework.context.SmartLifecycle
    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    /**
     * ConsumerSeekAware callback; counts down the assignment latch, releasing any
     * thread blocked in {@code waitForAssignment}.
     */
    @Override // org.springframework.kafka.listener.ConsumerSeekAware
    public void onFirstPoll() {
        this.assignLatch.countDown();
    }

    /**
     * Block until {@code onFirstPoll()} has been invoked for the current latch, or
     * until the timeout elapses.
     * @param duration the maximum time to wait (applied with millisecond precision).
     * @return true if the latch was released in time, false on timeout.
     * @throws InterruptedException if the waiting thread is interrupted.
     */
    @Override // org.springframework.kafka.requestreply.ReplyingKafkaOperations
    public boolean waitForAssignment(Duration duration) throws InterruptedException {
        return this.assignLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Send a message-based request using the default reply timeout and no reply
     * payload type.
     * @param message the request message.
     * @return a future for the reply message.
     */
    @Override // org.springframework.kafka.requestreply.ReplyingKafkaOperations
    public RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message) {
        return sendAndReceive(message, this.defaultReplyTimeout, null);
    }

    /**
     * Send a message-based request with an explicit reply timeout and no reply
     * payload type.
     * @param message the request message.
     * @param duration the reply timeout, passed through unchanged.
     * @return a future for the reply message.
     */
    @Override // org.springframework.kafka.requestreply.ReplyingKafkaOperations
    public RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message, Duration duration) {
        return sendAndReceive(message, duration, null);
    }

    /**
     * Send a message-based request using the default reply timeout and the given
     * reply payload type.
     * @param message the request message.
     * @param parameterizedTypeReference the expected reply payload type, or null.
     * @param <P> the reply payload type.
     * @return a future for the typed reply message.
     */
    @Override // org.springframework.kafka.requestreply.ReplyingKafkaOperations
    public <P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, @Nullable ParameterizedTypeReference<P> parameterizedTypeReference) {
        return sendAndReceive(message, this.defaultReplyTimeout, parameterizedTypeReference);
    }

    /**
     * Convert the message to a producer record, send it as a request, and expose the
     * eventual reply as a message.
     * <p>The returned future shares the send future of the underlying record-level
     * request. When the record-level future completes, the reply record is converted
     * to a message (using the supplied payload type, if any); a failed request or a
     * failed conversion completes the returned future exceptionally.
     * @param message the request message; converted using the default topic as fallback.
     * @param duration the reply timeout, or null to use the default.
     * @param parameterizedTypeReference the expected reply payload type, or null.
     * @param <P> the reply payload type.
     * @return a future for the typed reply message.
     */
    @Override // org.springframework.kafka.requestreply.ReplyingKafkaOperations
    @SuppressWarnings("unchecked") // the converter result is narrowed to this template's <K, V>
    public <P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, @Nullable Duration duration, @Nullable ParameterizedTypeReference<P> parameterizedTypeReference) {
        // The explicit cast selects the ProducerRecord overload; without it the
        // converter result does not match any sendAndReceive signature.
        ProducerRecord<K, V> request = (ProducerRecord<K, V>) getMessageConverter().fromMessage(message, getDefaultTopic());
        RequestReplyFuture<K, V, R> recordFuture = sendAndReceive(request, duration);
        RequestReplyTypedMessageFuture<K, V, P> messageFuture = new RequestReplyTypedMessageFuture<>(recordFuture.getSendFuture());
        recordFuture.whenComplete((reply, failure) -> {
            if (failure != null) {
                messageFuture.completeExceptionally(failure);
                return;
            }
            try {
                messageFuture.complete(getMessageConverter().toMessage(reply, null, null, parameterizedTypeReference == null ? null : parameterizedTypeReference.getType()));
            } catch (Exception conversionFailure) {
                // Conversion problems surface through the future, not on the listener thread.
                messageFuture.completeExceptionally(conversionFailure);
            }
        });
        return messageFuture;
    }

    /**
     * Send a request record using the default reply timeout.
     * @param producerRecord the request record.
     * @return a future for the reply record.
     */
    @Override // org.springframework.kafka.requestreply.ReplyingKafkaOperations
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> producerRecord) {
        return sendAndReceive(producerRecord, this.defaultReplyTimeout);
    }

    /**
     * Send a request record and return a future that is completed with the matching
     * reply, or exceptionally when the reply times out.
     * <p>The record's headers are mutated: a correlation header is always added, and
     * the default reply topic (plus partition, when known) is added unless the record
     * already carries a {@code KafkaHeaders.REPLY_TOPIC} header.
     * @param producerRecord the request record.
     * @param duration the reply timeout, or null to use the default.
     * @return the future for the reply; its send future is set before returning.
     * @throws IllegalStateException if the template has not been started.
     * @throws IllegalArgumentException if the correlation strategy returns null.
     * @throws KafkaException if sending or scheduling the timeout fails; the pending
     * future is removed again in that case.
     */
    @Override // org.springframework.kafka.requestreply.ReplyingKafkaOperations
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> producerRecord, @Nullable Duration duration) {
        Assert.state(this.running, "Template has not been start()ed");
        Duration timeout = duration != null ? duration : this.defaultReplyTimeout;
        CorrelationKey correlationId = this.correlationStrategy.apply(producerRecord);
        Assert.notNull(correlationId, "the created 'correlationId' cannot be null");

        Headers headers = producerRecord.headers();
        // NOTE(review): presence is tested under the standard header name while the
        // default is added under the configurable name - confirm this is intended.
        boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
        if (!hasReplyTopic && this.replyTopic != null) {
            headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
            if (this.replyPartition != null) {
                headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
            }
        }

        // The map key is the CorrelationKey itself in binary mode and its String form
        // otherwise, so it has to be typed as Object; onMessage rebuilds the same kind
        // of key from the reply header.
        final Object correlation;
        final byte[] correlationValue;
        if (this.binaryCorrelation) {
            correlation = correlationId;
            correlationValue = correlationId.getCorrelationId();
        } else {
            String correlationText = correlationId.toString();
            correlation = correlationText;
            correlationValue = correlationText.getBytes(StandardCharsets.UTF_8);
        }
        headers.add(new RecordHeader(this.correlationHeaderName, correlationValue));
        this.logger.debug(() -> "Sending: " + KafkaUtils.format(producerRecord) + WITH_CORRELATION_ID + correlationId);

        RequestReplyFuture<K, V, R> requestReplyFuture = new RequestReplyFuture<>();
        // Register before sending so a fast reply always finds its future.
        this.futures.put(correlation, requestReplyFuture);
        try {
            requestReplyFuture.setSendFuture(send(producerRecord));
            scheduleTimeout(producerRecord, correlation, timeout);
            return requestReplyFuture;
        } catch (Exception e) {
            this.futures.remove(correlation);
            throw new KafkaException("Send failed", e);
        }
    }

    /**
     * Schedule the task that expires a pending request.
     * <p>When it fires, the task removes the future for the given key; if it was
     * still pending, a warning is logged and, unless {@code handleTimeout} returns
     * true, the future is completed with a {@code KafkaReplyTimeoutException}.
     * @param producerRecord the request record, used for logging only.
     * @param obj the key under which the future is stored.
     * @param duration the delay from now until the task runs.
     */
    private void scheduleTimeout(ProducerRecord<K, V> producerRecord, Object obj, Duration duration) {
        Runnable expiry = () -> {
            RequestReplyFuture<K, V, R> pending = this.futures.remove(obj);
            if (pending == null) {
                // The reply already arrived (or the template was stopped).
                return;
            }
            this.logger.warn(() -> "Reply timed out for: " + KafkaUtils.format(producerRecord) + WITH_CORRELATION_ID + obj);
            if (!handleTimeout(obj, pending)) {
                pending.completeExceptionally(new KafkaReplyTimeoutException("Reply timed out"));
            }
        };
        this.scheduler.schedule(expiry, Instant.now().plus(duration));
    }

    /**
     * Hook invoked by the timeout task after a pending future has been removed.
     * Returning true suppresses the default completion with a
     * {@code KafkaReplyTimeoutException}; this implementation returns false.
     * @param obj the key of the timed-out request.
     * @param requestReplyFuture the future that was removed.
     * @return true if the timeout was fully handled here.
     */
    protected boolean handleTimeout(Object obj, RequestReplyFuture<K, V, R> requestReplyFuture) {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Report whether a request with the given key is still awaiting its reply.
     * @param obj the key to look up.
     * @return true if a future is registered under that key.
     */
    public boolean isPending(Object obj) {
        return this.futures.containsKey(obj);
    }

    /**
     * Destroy the internally created scheduler. A scheduler supplied through
     * {@code setTaskScheduler} is left untouched.
     */
    @Override // org.springframework.kafka.core.KafkaTemplate, org.springframework.beans.factory.DisposableBean
    public void destroy() {
        // NOTE(review): the parent's destroy() is not invoked from here - confirm
        // that no parent cleanup is being skipped.
        if (!this.schedulerSet) {
            ThreadPoolTaskScheduler ownScheduler = (ThreadPoolTaskScheduler) this.scheduler;
            ownScheduler.destroy();
        }
    }

    /**
     * Default correlation strategy: a random UUID packed into 16 bytes, most
     * significant half first. The record itself is not consulted.
     * @param producerRecord the outbound record (unused).
     * @param <K> the record key type.
     * @param <V> the record value type.
     * @return a new correlation key wrapping the 16 bytes.
     */
    private static <K, V> CorrelationKey defaultCorrelationIdStrategy(ProducerRecord<K, V> producerRecord) {
        UUID uuid = UUID.randomUUID();
        byte[] packed = ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
        return new CorrelationKey(packed);
    }

    /**
     * Match each reply in the batch to its pending request and complete the
     * corresponding future.
     * @param list the batch of reply records.
     */
    @Override // org.springframework.kafka.listener.GenericMessageListener
    public void onMessage(List<ConsumerRecord<K, R>> list) {
        list.forEach(this::dispatchReply);
    }

    /**
     * Handle one reply: rebuild the correlation key from the header, remove the
     * pending future and complete it, normally or exceptionally when
     * {@code checkForErrors} reports a problem. Replies without a correlation header
     * or without a pending future are only logged.
     */
    private void dispatchReply(ConsumerRecord<K, R> reply) {
        Header correlationHeader = reply.headers().lastHeader(this.correlationHeaderName);
        if (correlationHeader == null) {
            this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(reply) + " - to use request/reply semantics, the responding server must return the correlation id  in the '" + this.correlationHeaderName + "' header");
            return;
        }
        // Same key shape as the one registered by sendAndReceive.
        final Object correlationId;
        if (this.binaryCorrelation) {
            correlationId = new CorrelationKey(correlationHeader.value());
        } else {
            correlationId = new String(correlationHeader.value(), StandardCharsets.UTF_8);
        }
        RequestReplyFuture<K, V, R> pending = this.futures.remove(correlationId);
        if (pending == null) {
            logLateArrival(reply, correlationId);
            return;
        }
        Exception failure = checkForErrors(reply);
        if (failure != null) {
            pending.completeExceptionally(failure);
            return;
        }
        this.logger.debug(() -> "Received: " + KafkaUtils.format(reply) + WITH_CORRELATION_ID + correlationId);
        pending.complete(reply);
    }

    /**
     * Decide whether a reply should fail its pending future.
     * <p>When the key or the value is null, deserialization-exception headers are
     * consulted first; if none is found (or neither is null) the configured reply
     * error checker decides.
     * @param consumerRecord the reply record.
     * @return the exception to fail the future with, or null for a normal reply.
     */
    @Nullable
    protected Exception checkForErrors(ConsumerRecord<K, R> consumerRecord) {
        if (consumerRecord.value() == null || consumerRecord.key() == null) {
            DeserializationException deserializationFailure = checkDeserialization(consumerRecord, this.logger);
            if (deserializationFailure != null) {
                return deserializationFailure;
            }
        }
        return this.replyErrorChecker.apply(consumerRecord);
    }

    /**
     * Look for a deserialization exception carried in the record's headers, value
     * header first, then key header. A found exception is logged at error level
     * together with the record's topic, partition and offset.
     * @param consumerRecord the record to inspect.
     * @param logAccessor the logger used for the lookup and for reporting.
     * @return the value exception if present, else the key exception, else null.
     */
    @Nullable
    public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> consumerRecord, LogAccessor logAccessor) {
        DeserializationException valueFailure = ListenerUtils.getExceptionFromHeader(consumerRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logAccessor);
        if (valueFailure != null) {
            logAccessor.error(valueFailure, () -> "Reply value deserialization failed for " + consumerRecord.topic() + "-" + consumerRecord.partition() + "@" + consumerRecord.offset());
            return valueFailure;
        }
        DeserializationException keyFailure = ListenerUtils.getExceptionFromHeader(consumerRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, logAccessor);
        if (keyFailure != null) {
            logAccessor.error(keyFailure, () -> "Reply key deserialization failed for " + consumerRecord.topic() + "-" + consumerRecord.partition() + "@" + consumerRecord.offset());
        }
        return keyFailure;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Log a reply for which no pending request exists: at error level normally, at
     * debug level when the reply topic is flagged as shared.
     * @param consumerRecord the unmatched reply.
     * @param obj the correlation id extracted from the reply.
     */
    public void logLateArrival(ConsumerRecord<K, R> consumerRecord, Object obj) {
        if (!this.sharedReplyTopic) {
            this.logger.error(() -> missingCorrelationLogMessage(consumerRecord, obj));
            return;
        }
        // Unmatched replies are expected on a shared topic, so keep them out of the error log.
        this.logger.debug(() -> missingCorrelationLogMessage(consumerRecord, obj));
    }

    /**
     * Build the log text for a reply that matches no pending request.
     * @param consumerRecord the unmatched reply.
     * @param obj the correlation id extracted from the reply.
     * @return the formatted message.
     */
    private String missingCorrelationLogMessage(ConsumerRecord<K, R> consumerRecord, Object obj) {
        String formattedReply = KafkaUtils.format(consumerRecord);
        return "No pending reply: " + formattedReply + WITH_CORRELATION_ID + obj
                + ", perhaps timed out, or using a shared reply topic";
    }
}
