package org.springframework.amqp.rabbit.listener.adapter;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.expression.Expression;
import org.springframework.expression.ParserContext;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeConverter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-rabbit-3.0.6.jar:org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.class */
public abstract class AbstractAdaptableMessageListener implements ChannelAwareMessageListener {
    /** Initial value of {@link #responseRoutingKey}. */
    private static final String DEFAULT_RESPONSE_ROUTING_KEY = "";

    /** Initial value of {@link #encoding}. */
    private static final String DEFAULT_ENCODING = "UTF-8";

    /** Parser used by {@code setResponseAddress} for expression-style addresses. */
    private static final SpelExpressionParser PARSER = new SpelExpressionParser();

    /** Template context: an expression-style response address starts with "!{" and ends with "}". */
    private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");

    /** True when {@code reactor.core.publisher.Mono} is loadable by the listener interface's class loader. */
    static final boolean monoPresent = ClassUtils.isPresent(
            "reactor.core.publisher.Mono", ChannelAwareMessageListener.class.getClassLoader());

    /** Expression evaluated per reply to obtain the reply address; set by {@code setResponseAddress}. */
    private Expression responseExpression;

    /** Passed as the "mandatory" flag of {@code Channel.basicPublish} in {@code doPublish}. */
    private boolean mandatoryPublish;

    /** Applied in order to the reply in {@code sendResponse}; may be null. */
    private MessagePostProcessor[] beforeSendReplyPostProcessors;

    /** When non-null, {@code sendResponse} publishes through this template. */
    private RetryTemplate retryTemplate;

    /** Invoked from the recovery branch of {@code sendResponse}; may be null. */
    private RecoveryCallback<?> recoveryCallback;

    /** Set by {@code containerAckMode}: true only for {@code AcknowledgeMode.MANUAL}. */
    private boolean isManualAck;

    /** Applied to (request, reply) in {@code doHandleResult}; may be null. */
    private ReplyPostProcessor replyPostProcessor;

    /** Content type put on reply properties by {@code convert}; may be null. */
    private String replyContentType;

    /** Logger named after the concrete subclass. */
    protected final Log logger = LogFactory.getLog(getClass());

    /** Evaluation context for reply-address expressions; configured by {@code setBeanResolver}. */
    private final StandardEvaluationContext evalContext = new StandardEvaluationContext();

    /** Routing key of the default reply address built from {@link #responseExchange}. */
    private String responseRoutingKey = DEFAULT_RESPONSE_ROUTING_KEY;

    /** Exchange of the default reply address; null means none configured. */
    private String responseExchange;

    /** Default reply address: set explicitly or derived lazily in {@code getReplyToAddress}. */
    private Address responseAddress;

    /** Converter for incoming payloads and outgoing replies. */
    private MessageConverter messageConverter = new SimpleMessageConverter();

    /** Converts reply {@code MessageProperties} for publishing in {@code doPublish}. */
    private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

    /** Encoding handed to the {@link #messagePropertiesConverter} in {@code doPublish}. */
    private String encoding = DEFAULT_ENCODING;

    /** First argument to {@code ContainerUtils.shouldRequeue} in {@code asyncFailure}. */
    private boolean defaultRequeueRejected = true;

    /** When false, {@code convert} re-applies {@link #replyContentType} after conversion. */
    private boolean converterWinsContentType = true;

    /* loaded from: input_file:BOOT-INF/lib/spring-rabbit-3.0.6.jar:org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener$ReplyExpressionRoot.class */
    /**
     * Immutable root object against which a reply-address expression is evaluated
     * (see {@code evaluateReplyTo}). Exposes the request message, the source object
     * passed through result handling, and the listener's return value.
     */
    public static final class ReplyExpressionRoot {

        private final Message request;

        private final Object source;

        private final Object result;

        /**
         * @param request the inbound message being replied to
         * @param source the source object handed to result handling (may be null)
         * @param result the value returned by the listener
         */
        protected ReplyExpressionRoot(Message request, Object source, Object result) {
            this.request = request;
            this.source = source;
            this.result = result;
        }

        /** @return the inbound request message */
        public Message getRequest() {
            return request;
        }

        /** @return the source object supplied at construction */
        public Object getSource() {
            return source;
        }

        /** @return the listener result supplied at construction */
        public Object getResult() {
            return result;
        }
    }

    /**
     * Sets the routing key combined with the response exchange when
     * {@code getReplyToAddress} builds the default reply address.
     *
     * @param responseRoutingKey the routing key for default replies
     */
    public void setResponseRoutingKey(String responseRoutingKey) {
        this.responseRoutingKey = responseRoutingKey;
    }

    /**
     * Sets the encoding handed to the message-properties converter when a reply
     * is published by {@code doPublish}.
     *
     * @param encoding the encoding name
     */
    public void setEncoding(String encoding) {
        this.encoding = encoding;
    }

    /**
     * @return the currently configured encoding ("UTF-8" unless changed)
     */
    public String getEncoding() {
        return encoding;
    }

    /**
     * Sets the exchange used to build the default reply address.
     * <p>
     * {@code getReplyToAddress} derives the address only while no response address
     * exists yet, so changing the exchange after a reply address has been
     * derived or set has no effect.
     *
     * @param responseExchange the exchange for default replies
     */
    public void setResponseExchange(String responseExchange) {
        this.responseExchange = responseExchange;
    }

    /**
     * Configures the default reply destination.
     * <p>
     * A value starting with the template prefix ("!{") is parsed as an expression
     * that is evaluated for each reply; any other value is parsed as a fixed
     * {@link Address}.
     *
     * @param defaultReplyTo the address string or template expression; must not be null
     * @throws IllegalArgumentException if {@code defaultReplyTo} is null
     */
    public void setResponseAddress(String defaultReplyTo) {
        Assert.notNull(defaultReplyTo, "'responseAddress' must not be null");
        if (defaultReplyTo.startsWith(PARSER_CONTEXT.getExpressionPrefix())) {
            this.responseExpression = PARSER.parseExpression(defaultReplyTo, PARSER_CONTEXT);
        } else {
            this.responseAddress = new Address(defaultReplyTo);
            // getReplyToAddress consults the expression before the fixed address, so an
            // expression left over from an earlier call would shadow the value just set.
            this.responseExpression = null;
        }
    }

    /**
     * Sets the value passed as the "mandatory" argument of
     * {@code Channel.basicPublish} when replies are published.
     *
     * @param mandatoryPublish the mandatory flag for reply publishing
     */
    public void setMandatoryPublish(boolean mandatoryPublish) {
        this.mandatoryPublish = mandatoryPublish;
    }

    /**
     * Replaces the converter used to extract incoming payloads and to build replies.
     * A null converter makes {@code extractMessage} return the raw message and
     * restricts {@code buildMessage} to results that already are {@link Message}s.
     *
     * @param converter the converter to use, or null for none
     */
    public void setMessageConverter(MessageConverter converter) {
        this.messageConverter = converter;
    }

    /**
     * Sets the post processors applied, in order, to a reply in {@code sendResponse}
     * before it is published. The array is copied defensively.
     *
     * @param beforeSendReplyPostProcessors the post processors; neither the array nor
     * any element may be null
     * @throws IllegalArgumentException if the array or one of its elements is null
     */
    public void setBeforeSendReplyPostProcessors(MessagePostProcessor... beforeSendReplyPostProcessors) {
        // noNullElements accepts a null array, which would then fail with a bare
        // NullPointerException on the length access below; reject it explicitly.
        Assert.notNull(beforeSendReplyPostProcessors, "'replyPostProcessors' must not be null");
        Assert.noNullElements(beforeSendReplyPostProcessors, "'replyPostProcessors' must not have any null elements");
        this.beforeSendReplyPostProcessors =
                Arrays.copyOf(beforeSendReplyPostProcessors, beforeSendReplyPostProcessors.length);
    }

    /**
     * Sets the template through which {@code sendResponse} publishes replies.
     * When null, replies are published directly without retry.
     *
     * @param template the retry template, or null
     */
    public void setRetryTemplate(RetryTemplate template) {
        this.retryTemplate = template;
    }

    /**
     * Sets the callback invoked from the recovery branch of {@code sendResponse}.
     * Before it is called, the retry context receives the attributes "message" and
     * "address". When null, the last failure is translated and rethrown instead.
     *
     * @param callback the recovery callback, or null
     */
    public void setRecoveryCallback(RecoveryCallback<?> callback) {
        this.recoveryCallback = callback;
    }

    /**
     * Configures the evaluation context used for reply-address expressions: installs
     * the given bean resolver, a fresh {@link StandardTypeConverter} and an additional
     * {@link MapAccessor}.
     *
     * @param beanResolver the resolver to install on the evaluation context
     */
    public void setBeanResolver(BeanResolver beanResolver) {
        StandardEvaluationContext context = this.evalContext;
        context.setBeanResolver(beanResolver);
        context.setTypeConverter(new StandardTypeConverter());
        context.addPropertyAccessor(new MapAccessor());
    }

    /**
     * Sets the processor that {@code doHandleResult} applies to the request and the
     * reply; its return value replaces the reply that is sent.
     *
     * @param postProcessor the reply post processor, or null for none
     */
    public void setReplyPostProcessor(ReplyPostProcessor postProcessor) {
        this.replyPostProcessor = postProcessor;
    }

    /**
     * @return the configured reply content type, or null when none was set
     */
    protected String getReplyContentType() {
        return replyContentType;
    }

    /**
     * Sets the content type that {@code convert} places on the reply properties
     * before the payload is converted.
     *
     * @param contentType the reply content type, or null for none
     */
    public void setReplyContentType(String contentType) {
        this.replyContentType = contentType;
    }

    /**
     * @return whether the converter's content type is kept after conversion
     * (true unless changed)
     */
    protected boolean isConverterWinsContentType() {
        return converterWinsContentType;
    }

    /**
     * Controls who has the last word on the reply content type. When false and a
     * reply content type is configured, {@code convert} re-applies that content type
     * after the converter has produced the message.
     *
     * @param converterWins true to keep whatever content type the converter set
     */
    public void setConverterWinsContentType(boolean converterWins) {
        this.converterWinsContentType = converterWins;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the converter used for payload extraction and reply creation; may be null
     */
    public MessageConverter getMessageConverter() {
        return messageConverter;
    }

    /**
     * Sets the default handed to {@code ContainerUtils.shouldRequeue} when
     * {@code asyncFailure} decides whether a nacked message is requeued.
     *
     * @param requeueRejected the default requeue decision
     */
    public void setDefaultRequeueRejected(boolean requeueRejected) {
        this.defaultRequeueRejected = requeueRejected;
    }

    /**
     * Records whether the container uses manual acknowledgement. Only
     * {@code AcknowledgeMode.MANUAL} counts; any other value, including null,
     * clears the flag.
     *
     * @param acknowledgeMode the container's acknowledge mode
     */
    @Override // org.springframework.amqp.core.MessageListener
    public void containerAckMode(AcknowledgeMode acknowledgeMode) {
        // Enum constants are singletons, so identity comparison matches equals().
        this.isManualAck = (acknowledgeMode == AcknowledgeMode.MANUAL);
    }

    /**
     * Logs a listener failure at error level. Subclasses may override.
     *
     * @param failure the exception raised during listener execution
     */
    protected void handleListenerException(Throwable failure) {
        this.logger.error("Listener execution failed", failure);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Extracts the payload of an incoming message.
     *
     * @param message the incoming message
     * @return the converted payload, or the message itself when no converter is configured
     */
    public Object extractMessage(Message message) {
        MessageConverter converter = getMessageConverter();
        if (converter == null) {
            return message;
        }
        return converter.fromMessage(message);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a listener result without a source object; delegates to the
     * four-argument overload with a null source.
     *
     * @param result the listener invocation result
     * @param request the request message
     * @param channel the channel to reply on
     */
    public void handleResult(InvocationResult result, Message request, Channel channel) {
        Object noSource = null;
        handleResult(result, request, channel, noSource);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Turns a listener result into a reply.
     * <ul>
     * <li>Without a channel nothing is sent; a warning is logged.</li>
     * <li>A {@link CompletableFuture} result is handled when it completes: a failure
     * goes to {@code asyncFailure}; a value is sent as the reply and the request is
     * then acked.</li>
     * <li>A Mono result (only checked when Reactor is on the class path) is subscribed
     * to with callbacks that send the reply, report a failure, and ack the request.</li>
     * <li>Any other result is sent immediately via {@code doHandleResult}.</li>
     * </ul>
     *
     * @param invocationResult the listener invocation result
     * @param message the request message
     * @param channel the channel to reply on; may be null
     * @param obj the source object passed on to reply-address evaluation; may be null
     */
    public void handleResult(InvocationResult invocationResult, Message message, Channel channel, Object obj) {
        if (channel == null) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Listener method returned result [" + invocationResult + "]: not generating response message for it because no Rabbit Channel given");
            }
            return;
        }
        Object returnValue = invocationResult.getReturnValue();
        if (returnValue instanceof CompletableFuture) {
            if (!this.isManualAck) {
                this.logger.warn("Container AcknowledgeMode must be MANUAL for a Future<?> return type; otherwise the container will ack the message immediately");
            }
            ((CompletableFuture<?>) returnValue).whenComplete((asyncResult, failure) -> {
                if (failure != null) {
                    asyncFailure(message, channel, failure, obj);
                    return;
                }
                try {
                    asyncSuccess(invocationResult, message, channel, obj, asyncResult);
                } catch (RuntimeException replyFailure) {
                    // An exception escaping this callback would only complete the dependent
                    // future returned by whenComplete, which nobody observes: the request
                    // would stay unacknowledged and the error would go unreported.
                    asyncFailure(message, channel, replyFailure, obj);
                    return;
                }
                basicAck(message, channel);
            });
            return;
        }
        if (!monoPresent || !MonoHandler.isMono(invocationResult.getReturnValue())) {
            doHandleResult(invocationResult, message, channel, obj);
            return;
        }
        if (!this.isManualAck) {
            this.logger.warn("Container AcknowledgeMode must be MANUAL for a Mono<?> return type(or Kotlin suspend function); otherwise the container will ack the message immediately");
        }
        MonoHandler.subscribe(invocationResult.getReturnValue(),
                monoResult -> asyncSuccess(invocationResult, message, channel, obj, monoResult),
                monoFailure -> asyncFailure(message, channel, monoFailure, obj),
                () -> basicAck(message, channel));
    }

    /**
     * Sends the value produced by an asynchronous listener result as the reply.
     * <p>
     * The declared return type of the listener describes the async wrapper, so the
     * type handed to the converter is its first type argument. When that argument is
     * a wildcard, or the declared type carries no type arguments at all (raw or
     * non-generic return type), no type is passed and the converter has to rely on
     * the value itself.
     *
     * @param invocationResult the original invocation result holding the async wrapper
     * @param message the request message
     * @param channel the channel to reply on
     * @param obj the source object passed on to reply-address evaluation
     * @param obj2 the value the async result completed with; null means no reply
     */
    private void asyncSuccess(InvocationResult invocationResult, Message message, Channel channel, Object obj, Object obj2) {
        if (obj2 == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Async result is null, ignoring");
            }
            return;
        }
        Type replyType = null;
        Type declaredType = invocationResult.getReturnType();
        // Guarded cast: a declared type that is a plain Class (raw or non-generic)
        // previously caused a ClassCastException here.
        if (declaredType instanceof ParameterizedType) {
            Type[] typeArguments = ((ParameterizedType) declaredType).getActualTypeArguments();
            if (typeArguments.length == 0) {
                replyType = declaredType;
            } else if (!(typeArguments[0] instanceof WildcardType)) {
                replyType = typeArguments[0];
            }
        }
        doHandleResult(new InvocationResult(obj2, invocationResult.getSendTo(), replyType, invocationResult.getBean(), invocationResult.getMethod()), message, channel, obj);
    }

    /**
     * Acknowledges the single delivery of the given request on the channel.
     * An {@link IOException} is logged and not propagated.
     *
     * @param message the request whose delivery tag is acked
     * @param channel the channel the request was received on
     */
    private void basicAck(Message message, Channel channel) {
        try {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        } catch (IOException ackFailure) {
            this.logger.error("Failed to ack message", ackFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports a failed asynchronous result: logs the failure and nacks the single
     * delivery, requeueing according to {@code ContainerUtils.shouldRequeue}.
     * An {@link IOException} from the nack is logged and not propagated.
     *
     * @param message the request message
     * @param channel the channel the request was received on
     * @param th the failure
     * @param obj the source object; not used by this implementation
     */
    public void asyncFailure(Message message, Channel channel, Throwable th, Object obj) {
        this.logger.error("Future, Mono, or suspend function was completed with an exception for " + message, th);
        try {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            boolean requeue = ContainerUtils.shouldRequeue(this.defaultRequeueRejected, th, this.logger);
            channel.basicNack(deliveryTag, false, requeue);
        } catch (IOException nackFailure) {
            this.logger.error("Failed to nack message", nackFailure);
        }
    }

    /**
     * Builds the reply for a listener result, post-processes it and sends it to the
     * resolved reply address.
     *
     * @param invocationResult the result to reply with
     * @param message the request message
     * @param channel the channel to reply on
     * @param obj the source object passed on to reply-address evaluation
     * @throws ReplyFailureException wrapping any exception raised while building or
     * sending the reply
     */
    protected void doHandleResult(InvocationResult invocationResult, Message message, Channel channel, Object obj) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Listener method returned result [" + invocationResult + "] - generating response message for it");
        }
        try {
            Message reply = buildMessage(channel, invocationResult.getReturnValue(), invocationResult.getReturnType());

            // Record which bean and method produced the reply.
            MessageProperties replyProperties = reply.getMessageProperties();
            replyProperties.setTargetBean(invocationResult.getBean());
            replyProperties.setTargetMethod(invocationResult.getMethod());

            postProcessResponse(message, reply);
            if (this.replyPostProcessor != null) {
                reply = this.replyPostProcessor.apply(message, reply);
            }

            Address replyTo = getReplyToAddress(message, obj, invocationResult);
            sendResponse(channel, replyTo, reply);
        } catch (Exception ex) {
            throw new ReplyFailureException("Failed to send reply with payload '" + invocationResult + "'", ex);
        }
    }

    /**
     * @param message the request message
     * @return the received-exchange value from the request's properties
     */
    protected String getReceivedExchange(Message message) {
        MessageProperties requestProperties = message.getMessageProperties();
        return requestProperties.getReceivedExchange();
    }

    /**
     * Creates the reply message for a listener result.
     *
     * @param channel the channel to reply on; not used by this implementation
     * @param obj the listener result
     * @param type the generic type of the result, passed to the converter; may be null
     * @return {@code obj} itself when it already is a {@link Message}, otherwise the
     * converted message
     * @throws MessageConversionException if the result is not a {@link Message} and no
     * converter is configured
     */
    protected Message buildMessage(Channel channel, Object obj, Type type) {
        MessageConverter converter = getMessageConverter();
        if (obj instanceof Message) {
            // Already a message: passed through unconverted.
            return (Message) obj;
        }
        if (converter == null) {
            throw new MessageConversionException("No MessageConverter specified - cannot handle message [" + obj + "]");
        }
        return convert(obj, type, converter);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Converts a listener result to a message using fresh {@link MessageProperties}.
     * A configured reply content type is set before conversion; it is set again
     * afterwards when the converter is not allowed to win.
     *
     * @param obj the listener result
     * @param type the generic type of the result; may be null
     * @param messageConverter the converter to use
     * @return the converted message
     */
    public Message convert(Object obj, Type type, MessageConverter messageConverter) {
        MessageProperties replyProperties = new MessageProperties();
        if (this.replyContentType != null) {
            replyProperties.setContentType(this.replyContentType);
        }

        Message converted = messageConverter.toMessage(obj, replyProperties, type);

        boolean overrideConverter = !this.converterWinsContentType && this.replyContentType != null;
        if (overrideConverter) {
            converted.getMessageProperties().setContentType(this.replyContentType);
        }
        return converted;
    }

    /**
     * Copies correlation information from the request to the reply: the request's
     * correlation id when present, otherwise its message id (which may be null).
     *
     * @param message the request message
     * @param message2 the reply message to update
     */
    protected void postProcessResponse(Message message, Message message2) {
        MessageProperties requestProperties = message.getMessageProperties();
        String correlation = requestProperties.getCorrelationId();
        if (correlation == null) {
            correlation = requestProperties.getMessageId();
        }
        message2.getMessageProperties().setCorrelationId(correlation);
    }

    /**
     * Resolves where a reply is sent. Precedence:
     * <ol>
     * <li>the reply-to address of the request;</li>
     * <li>the send-to expression of the invocation result;</li>
     * <li>the configured response expression;</li>
     * <li>the default response address (explicit, or built from the response
     * exchange and routing key).</li>
     * </ol>
     *
     * @param message the request message
     * @param obj the source object exposed to expressions
     * @param invocationResult the listener invocation result
     * @return the reply address, never null
     * @throws AmqpException if none of the above yields an address
     */
    protected Address getReplyToAddress(Message message, Object obj, InvocationResult invocationResult) {
        Address requested = message.getMessageProperties().getReplyToAddress();
        if (requested != null) {
            return requested;
        }

        // Lazily derive and cache the default address from the configured exchange.
        if (this.responseAddress == null && this.responseExchange != null) {
            this.responseAddress = new Address(this.responseExchange, this.responseRoutingKey);
        }

        if (invocationResult.getSendTo() != null) {
            return evaluateReplyTo(message, obj, invocationResult.getReturnValue(), invocationResult.getSendTo());
        }
        if (this.responseExpression != null) {
            return evaluateReplyTo(message, obj, invocationResult.getReturnValue(), this.responseExpression);
        }
        if (this.responseAddress == null) {
            throw new AmqpException("Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.");
        }
        return this.responseAddress;
    }

    /**
     * Evaluates a reply-address expression against a {@link ReplyExpressionRoot}.
     *
     * @param message the request message
     * @param obj the source object
     * @param obj2 the listener result
     * @param expression the expression to evaluate
     * @return the evaluated address; a String value is parsed into an {@link Address}
     * @throws IllegalStateException if the expression yields neither a String nor an Address
     */
    private Address evaluateReplyTo(Message message, Object obj, Object obj2, Expression expression) {
        ReplyExpressionRoot root = new ReplyExpressionRoot(message, obj, obj2);
        Object evaluated = expression.getValue(this.evalContext, root);
        boolean supported = evaluated instanceof String || evaluated instanceof Address;
        Assert.state(supported, "response expression must evaluate to a String or Address");
        if (evaluated instanceof String) {
            return new Address((String) evaluated);
        }
        return (Address) evaluated;
    }

    /**
     * Publishes a reply to the given address.
     * <p>
     * The reply first passes through the configured before-send post processors (if
     * any) and {@code postProcessChannel}. It is then published directly, or through
     * the retry template when one is configured. Any exception raised while
     * publishing is translated by {@code RabbitExceptionTranslator} and rethrown.
     *
     * @param channel the channel to publish on
     * @param address the exchange and routing key to publish to
     * @param message the reply message
     */
    protected void sendResponse(Channel channel, Address address, Message message) {
        Message message2 = message;
        if (this.beforeSendReplyPostProcessors != null) {
            // Each processor receives the output of the previous one.
            for (MessagePostProcessor messagePostProcessor : this.beforeSendReplyPostProcessors) {
                message2 = messagePostProcessor.postProcessMessage(message2);
            }
        }
        postProcessChannel(channel, message2);
        try {
            this.logger.debug("Publishing response to exchange = [" + address.getExchangeName() + "], routingKey = [" + address.getRoutingKey() + "]");
            if (this.retryTemplate == null) {
                doPublish(channel, address, message2);
            } else {
                // Effectively-final copy so the lambdas below can capture the processed reply.
                Message message3 = message2;
                this.retryTemplate.execute(retryContext -> {
                    doPublish(channel, address, message3);
                    return null;
                }, retryContext2 -> {
                    // Recovery branch: without a recovery callback the last failure is
                    // translated and rethrown.
                    if (this.recoveryCallback == null) {
                        throw RabbitExceptionTranslator.convertRabbitAccessException(retryContext2.getLastThrowable());
                    }
                    // Expose the reply and its destination to the recovery callback.
                    retryContext2.setAttribute("message", message3);
                    retryContext2.setAttribute("address", address);
                    this.recoveryCallback.recover(retryContext2);
                    return null;
                });
            }
        } catch (Exception e) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        }
    }

    /**
     * Performs the actual {@code basicPublish} of a reply, converting its properties
     * with the configured properties converter and encoding.
     *
     * @param channel the channel to publish on
     * @param address the exchange and routing key to publish to
     * @param message the reply message
     * @throws IOException if the publish fails
     */
    protected void doPublish(Channel channel, Address address, Message message) throws IOException {
        String exchange = address.getExchangeName();
        String routingKey = address.getRoutingKey();
        MessageProperties replyProperties = message.getMessageProperties();
        channel.basicPublish(
                exchange,
                routingKey,
                this.mandatoryPublish,
                this.messagePropertiesConverter.fromMessageProperties(replyProperties, this.encoding),
                message.getBody());
    }

    /**
     * Hook called by {@code sendResponse} after the before-send post processors have
     * run and before the reply is published. The default implementation does nothing.
     *
     * @param channel the channel the reply will be published on
     * @param response the reply about to be published
     */
    protected void postProcessChannel(Channel channel, Message response) {
        // Intentionally empty: extension point for subclasses.
    }
}
