package org.springframework.integration.kafka.inbound;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.BatchAcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-6.0.6.jar:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.class */
public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSupport implements KafkaInboundEndpoint, OrderlyShutdownCapable, Pausable {
    private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
    private final AbstractMessageListenerContainer<K, V> messageListenerContainer;
    private final KafkaMessageDrivenChannelAdapter<K, V>.IntegrationRecordMessageListener recordListener;
    private final KafkaMessageDrivenChannelAdapter<K, V>.IntegrationBatchMessageListener batchListener;
    private final ListenerMode mode;
    private RecordFilterStrategy<K, V> recordFilterStrategy;
    private boolean ackDiscarded;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<?> recoveryCallback;
    private boolean filterInRetry;
    private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
    private boolean bindSourceRecord;
    private boolean containerDeliveryAttemptPresent;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-6.0.6.jar:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.class */
    public class IntegrationBatchMessageListener extends BatchMessagingMessageListenerAdapter<K, V> implements RetryListener {
        IntegrationBatchMessageListener() {
            super(null, null);
        }

        @Override // org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter, org.springframework.kafka.listener.ConsumerSeekAware
        public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekAware.ConsumerSeekCallback consumerSeekCallback) {
            if (KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback != null) {
                KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback.accept(map, consumerSeekCallback);
            }
        }

        @Override // org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter, org.springframework.kafka.listener.BatchAcknowledgingConsumerAwareMessageListener
        public void onMessage(List<ConsumerRecord<K, V>> list, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            Message<?> message = null;
            if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry) {
                message = toMessage(list, acknowledgment, consumer);
            }
            if (message != null) {
                RetryTemplate retryTemplate = KafkaMessageDrivenChannelAdapter.this.retryTemplate;
                if (retryTemplate != null) {
                    doWIthRetry(list, acknowledgment, consumer, message, retryTemplate);
                } else {
                    KafkaMessageDrivenChannelAdapter.this.sendMessageIfAny(message, list);
                }
            }
        }

        private void doWIthRetry(List<ConsumerRecord<K, V>> list, Acknowledgment acknowledgment, Consumer<?, ?> consumer, Message<?> message, RetryTemplate retryTemplate) {
            KafkaMessageDrivenChannelAdapter.this.doWithRetry(retryTemplate, KafkaMessageDrivenChannelAdapter.this.recoveryCallback, list, acknowledgment, consumer, () -> {
                if (KafkaMessageDrivenChannelAdapter.this.filterInRetry) {
                    List<ConsumerRecord<K, V>> filterBatch = KafkaMessageDrivenChannelAdapter.this.recordFilterStrategy.filterBatch(list);
                    Message message2 = message;
                    if (filterBatch.size() != list.size()) {
                        message2 = toMessage(filterBatch, acknowledgment, consumer);
                    }
                    KafkaMessageDrivenChannelAdapter.this.sendMessageIfAny(message2, filterBatch);
                }
            });
        }

        @Nullable
        private Message<?> toMessage(List<ConsumerRecord<K, V>> list, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            Message<?> message = null;
            try {
                message = toMessagingMessage(list, acknowledgment, consumer);
                KafkaMessageDrivenChannelAdapter.this.setAttributesIfNecessary(list, message, false);
            } catch (RuntimeException e) {
                ConversionException conversionException = new ConversionException("Failed to convert to message", new ArrayList(list), e);
                MessageChannel errorChannel = KafkaMessageDrivenChannelAdapter.this.getErrorChannel();
                if (errorChannel == null) {
                    throw e;
                }
                KafkaMessageDrivenChannelAdapter.this.getMessagingTemplate().send((MessagingTemplate) errorChannel, (Message<?>) KafkaMessageDrivenChannelAdapter.this.buildErrorMessage(message, conversionException));
            }
            return message;
        }

        @Override // org.springframework.retry.RetryListener
        public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
            if (KafkaMessageDrivenChannelAdapter.this.retryTemplate == null) {
                return true;
            }
            KafkaMessageDrivenChannelAdapter.ATTRIBUTES_HOLDER.set(retryContext);
            return true;
        }

        @Override // org.springframework.retry.RetryListener
        public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
            KafkaMessageDrivenChannelAdapter.ATTRIBUTES_HOLDER.remove();
        }

        @Override // org.springframework.retry.RetryListener
        public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
        }

        @Override // org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter, org.springframework.kafka.listener.BatchAcknowledgingConsumerAwareMessageListener, org.springframework.kafka.listener.GenericMessageListener
        public /* bridge */ /* synthetic */ void onMessage(Object obj, Acknowledgment acknowledgment, Consumer consumer) {
            onMessage((List) obj, acknowledgment, (Consumer<?, ?>) consumer);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-6.0.6.jar:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.class */
    public class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V> implements RetryListener {
        IntegrationRecordMessageListener() {
            super(null, null);
        }

        @Override // org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter, org.springframework.kafka.listener.ConsumerSeekAware
        public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekAware.ConsumerSeekCallback consumerSeekCallback) {
            if (KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback != null) {
                KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback.accept(map, consumerSeekCallback);
            }
        }

        @Override // org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter, org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener
        public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            try {
                Message<?> messagingMessage = toMessagingMessage(consumerRecord, acknowledgment, consumer);
                RetryTemplate retryTemplate = KafkaMessageDrivenChannelAdapter.this.retryTemplate;
                if (retryTemplate != null) {
                    KafkaMessageDrivenChannelAdapter.this.doWithRetry(retryTemplate, KafkaMessageDrivenChannelAdapter.this.recoveryCallback, consumerRecord, acknowledgment, consumer, () -> {
                        if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry || passesFilter(consumerRecord)) {
                            KafkaMessageDrivenChannelAdapter.this.sendMessageIfAny(enhanceHeadersAndSaveAttributes(messagingMessage, consumerRecord), consumerRecord);
                        }
                    });
                } else {
                    KafkaMessageDrivenChannelAdapter.this.sendMessageIfAny(enhanceHeadersAndSaveAttributes(messagingMessage, consumerRecord), consumerRecord);
                }
            } catch (RuntimeException e) {
                if (KafkaMessageDrivenChannelAdapter.this.retryTemplate == null) {
                    KafkaMessageDrivenChannelAdapter.this.setAttributesIfNecessary(consumerRecord, null, true);
                }
                if (!KafkaMessageDrivenChannelAdapter.this.sendErrorMessageIfNecessary(null, new ConversionException("Failed to convert to message", (ConsumerRecord<?, ?>) consumerRecord, (Throwable) e))) {
                    throw e;
                }
            }
        }

        private boolean passesFilter(ConsumerRecord<K, V> consumerRecord) {
            RecordFilterStrategy<K, V> recordFilterStrategy = KafkaMessageDrivenChannelAdapter.this.recordFilterStrategy;
            return recordFilterStrategy == null || !recordFilterStrategy.filter(consumerRecord);
        }

        private Message<?> enhanceHeadersAndSaveAttributes(Message<?> message, ConsumerRecord<K, V> consumerRecord) {
            Message<?> message2 = message;
            if (message.getHeaders() instanceof KafkaMessageHeaders) {
                Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
                if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
                    rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger(((RetryContext) KafkaMessageDrivenChannelAdapter.ATTRIBUTES_HOLDER.get()).getRetryCount() + 1));
                } else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) {
                    rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger(ByteBuffer.wrap(consumerRecord.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()).getInt()));
                }
                if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
                    rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, consumerRecord);
                }
            } else {
                MessageBuilder fromMessage = MessageBuilder.fromMessage(message);
                if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
                    fromMessage.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, (Object) new AtomicInteger(((RetryContext) KafkaMessageDrivenChannelAdapter.ATTRIBUTES_HOLDER.get()).getRetryCount() + 1));
                } else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) {
                    fromMessage.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, (Object) new AtomicInteger(ByteBuffer.wrap(consumerRecord.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()).getInt()));
                }
                if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
                    fromMessage.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, (Object) consumerRecord);
                }
                message2 = fromMessage.build();
            }
            KafkaMessageDrivenChannelAdapter.this.setAttributesIfNecessary(consumerRecord, message2, false);
            return message2;
        }

        @Override // org.springframework.retry.RetryListener
        public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
            if (KafkaMessageDrivenChannelAdapter.this.retryTemplate == null) {
                return true;
            }
            KafkaMessageDrivenChannelAdapter.ATTRIBUTES_HOLDER.set(retryContext);
            return true;
        }

        @Override // org.springframework.retry.RetryListener
        public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
            KafkaMessageDrivenChannelAdapter.ATTRIBUTES_HOLDER.remove();
        }

        @Override // org.springframework.retry.RetryListener
        public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
        }

        @Override // org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter, org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener, org.springframework.kafka.listener.GenericMessageListener
        public /* bridge */ /* synthetic */ void onMessage(Object obj, Acknowledgment acknowledgment, Consumer consumer) {
            onMessage((ConsumerRecord) obj, acknowledgment, (Consumer<?, ?>) consumer);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-6.0.6.jar:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter$ListenerMode.class */
    public enum ListenerMode {
        record,
        batch
    }

    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> abstractMessageListenerContainer) {
        this(abstractMessageListenerContainer, ListenerMode.record);
    }

    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> abstractMessageListenerContainer, ListenerMode listenerMode) {
        this.recordListener = new IntegrationRecordMessageListener();
        this.batchListener = new IntegrationBatchMessageListener();
        Assert.notNull(abstractMessageListenerContainer, "messageListenerContainer is required");
        Assert.isNull(abstractMessageListenerContainer.getContainerProperties().getMessageListener(), "Container must not already have a listener");
        this.messageListenerContainer = abstractMessageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        this.mode = listenerMode;
        setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
        if (JacksonPresent.isJackson2Present()) {
            MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
            DefaultKafkaHeaderMapper defaultKafkaHeaderMapper = new DefaultKafkaHeaderMapper();
            defaultKafkaHeaderMapper.addTrustedPackages((String[]) JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
            messagingMessageConverter.setHeaderMapper(defaultKafkaHeaderMapper);
            this.recordListener.setMessageConverter(messagingMessageConverter);
            this.batchListener.setMessageConverter(messagingMessageConverter);
        }
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        if (messageConverter instanceof RecordMessageConverter) {
            this.recordListener.setMessageConverter((RecordMessageConverter) messageConverter);
        } else {
            if (!(messageConverter instanceof BatchMessageConverter)) {
                throw new IllegalArgumentException("Message converter must be a 'RecordMessageConverter' or 'BatchMessageConverter'");
            }
            this.batchListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
        }
    }

    public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
        this.recordListener.setMessageConverter(recordMessageConverter);
    }

    public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
        this.batchListener.setBatchMessageConverter(batchMessageConverter);
    }

    public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
        this.recordFilterStrategy = recordFilterStrategy;
    }

    public void setAckDiscarded(boolean z) {
        this.ackDiscarded = z;
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        Assert.isTrue(retryTemplate == null || this.mode.equals(ListenerMode.record), "Retry is not supported with mode=batch");
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public void setFilterInRetry(boolean z) {
        this.filterInRetry = z;
    }

    public void setPayloadType(Class<?> cls) {
        this.recordListener.setFallbackType(cls);
        this.batchListener.setFallbackType(cls);
    }

    public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> biConsumer) {
        this.onPartitionsAssignedSeekCallback = biConsumer;
    }

    public void setBindSourceRecord(boolean z) {
        this.bindSourceRecord = z;
    }

    @Override // org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "kafka:message-driven-channel-adapter";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        super.onInit();
        ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
        if (this.mode.equals(ListenerMode.record)) {
            AcknowledgingConsumerAwareMessageListener acknowledgingConsumerAwareMessageListener = this.recordListener;
            boolean z = (!this.filterInRetry || this.retryTemplate == null || this.recordFilterStrategy == null) ? false : true;
            if (this.retryTemplate != null) {
                MessageChannel errorChannel = getErrorChannel();
                if (this.recoveryCallback != null && errorChannel != null) {
                    this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy());
                }
                this.retryTemplate.registerListener(this.recordListener);
            }
            if (!z && this.recordFilterStrategy != null) {
                acknowledgingConsumerAwareMessageListener = new FilteringMessageListenerAdapter(acknowledgingConsumerAwareMessageListener, this.recordFilterStrategy, this.ackDiscarded);
            }
            containerProperties.setMessageListener(acknowledgingConsumerAwareMessageListener);
        } else {
            BatchAcknowledgingConsumerAwareMessageListener batchAcknowledgingConsumerAwareMessageListener = this.batchListener;
            if (this.recordFilterStrategy != null) {
                batchAcknowledgingConsumerAwareMessageListener = new FilteringBatchMessageListenerAdapter(batchAcknowledgingConsumerAwareMessageListener, this.recordFilterStrategy, this.ackDiscarded);
            }
            containerProperties.setMessageListener(batchAcknowledgingConsumerAwareMessageListener);
        }
        this.containerDeliveryAttemptPresent = containerProperties.isDeliveryAttemptHeader();
    }

    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStart() {
        this.messageListenerContainer.start();
    }

    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    @Override // org.springframework.integration.core.Pausable
    public void pause() {
        this.messageListenerContainer.pause();
    }

    @Override // org.springframework.integration.core.Pausable
    public void resume() {
        this.messageListenerContainer.resume();
    }

    @Override // org.springframework.integration.core.Pausable
    public boolean isPaused() {
        return this.messageListenerContainer.isContainerPaused();
    }

    @Override // org.springframework.integration.context.OrderlyShutdownCapable
    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return getPhase();
    }

    @Override // org.springframework.integration.context.OrderlyShutdownCapable
    public int afterShutdown() {
        return getPhase();
    }

    private void setAttributesIfNecessary(Object obj, @Nullable Message<?> message, boolean z) {
        AttributeAccessor attributeAccessor;
        boolean z2 = ATTRIBUTES_HOLDER.get() == null && getErrorChannel() != null && (this.retryTemplate == null || z);
        boolean z3 = z2 | (this.retryTemplate != null);
        if (z2) {
            ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        }
        if (!z3 || (attributeAccessor = ATTRIBUTES_HOLDER.get()) == null) {
            return;
        }
        attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
        attributeAccessor.setAttribute(KafkaHeaders.RAW_DATA, obj);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.MessageProducerSupport
    public AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor attributeAccessor = ATTRIBUTES_HOLDER.get();
        return attributeAccessor == null ? super.getErrorMessageAttributes(message) : attributeAccessor;
    }

    private void sendMessageIfAny(Message<?> message, Object obj) {
        if (message == null) {
            this.logger.debug(() -> {
                return "Converter returned a null message for: " + obj;
            });
            return;
        }
        try {
            sendMessage(message);
        } finally {
            if (this.retryTemplate == null) {
                ATTRIBUTES_HOLDER.remove();
            }
        }
    }
}
