package org.springframework.integration.amqp.outbound;

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.Lifecycle;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.amqp.support.MappingUtils;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-amqp-6.0.6.jar:org/springframework/integration/amqp/outbound/AmqpOutboundEndpoint.class */
/**
 * Outbound AMQP endpoint that publishes messages through an {@link AmqpTemplate}.
 *
 * <p>Depending on configuration it either sends and returns nothing (channel adapter),
 * or sends and waits for a reply when {@code expectReply} is set (gateway). When the
 * supplied template is a {@link RabbitTemplate}, the endpoint can additionally register
 * itself as the confirm/returns callback, block for a publisher confirm, and send an
 * {@code Iterable} of messages in one {@code invoke} scope ({@code multiSend}).
 */
public class AmqpOutboundEndpoint extends AbstractAmqpOutboundEndpoint implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    /** Confirm wait time used unless {@code getConfirmTimeout()} supplies a value at init. */
    private static final Duration DEFAULT_CONFIRM_TIMEOUT = Duration.ofSeconds(5);

    /** The template passed to the constructor; never {@code null}. */
    private final AmqpTemplate amqpTemplate;

    /** The same template typed as {@link RabbitTemplate}, or {@code null} if it is not one. */
    private final RabbitTemplate rabbitTemplate;

    private boolean expectReply;

    private boolean waitForConfirm;

    private Duration waitForConfirmTimeout = DEFAULT_CONFIRM_TIMEOUT;

    private boolean multiSend;

    /**
     * Create an endpoint on top of the given template.
     *
     * @param amqpTemplate the template used for all sends; must not be {@code null}. If it
     *     is a {@link RabbitTemplate}, its connection factory is handed to
     *     {@code setConnectionFactory}.
     */
    public AmqpOutboundEndpoint(AmqpTemplate amqpTemplate) {
        Assert.notNull(amqpTemplate, "amqpTemplate must not be null");
        this.amqpTemplate = amqpTemplate;
        if (amqpTemplate instanceof RabbitTemplate) {
            RabbitTemplate template = (RabbitTemplate) amqpTemplate;
            setConnectionFactory(template.getConnectionFactory());
            this.rabbitTemplate = template;
        } else {
            this.rabbitTemplate = null;
        }
    }

    /**
     * @param expectReply {@code true} to send with {@code sendAndReceive} and return the reply.
     */
    public void setExpectReply(boolean expectReply) {
        this.expectReply = expectReply;
    }

    /**
     * @param waitForConfirm {@code true} to block after a send until a publisher confirm
     *     arrives or the confirm timeout elapses.
     */
    public void setWaitForConfirm(boolean waitForConfirm) {
        this.waitForConfirm = waitForConfirm;
    }

    /**
     * @return {@code "amqp:outbound-gateway"} when a reply is expected, otherwise
     *     {@code "amqp:outbound-channel-adapter"}.
     */
    @Override
    public String getComponentType() {
        return this.expectReply ? "amqp:outbound-gateway" : "amqp:outbound-channel-adapter";
    }

    /**
     * Enable or disable sending an {@code Iterable} payload as a batch of messages.
     *
     * <p>The precondition is only enforced when enabling: turning the feature off must not
     * fail merely because the template is not a {@link RabbitTemplate}.
     *
     * @param multiSend {@code true} to enable multi-send.
     * @throws IllegalArgumentException when enabling and the template is not a
     *     {@link RabbitTemplate}, or {@code waitForConfirm} is already set and the
     *     connection factory does not report simple publisher confirms.
     */
    public void setMultiSend(boolean multiSend) {
        if (multiSend) {
            Assert.isTrue(this.rabbitTemplate != null && (!this.waitForConfirm || this.rabbitTemplate.getConnectionFactory().isSimplePublisherConfirms()),
                    () -> "To use multiSend, " + this.amqpTemplate + " must be a RabbitTemplate with a ConnectionFactory configured with simple confirms");
        }
        this.multiSend = multiSend;
    }

    /**
     * @return the superclass pattern type when a reply is expected, otherwise
     *     {@link IntegrationPatternType#outbound_channel_adapter}.
     */
    @Override
    public IntegrationPatternType getIntegrationPatternType() {
        return this.expectReply ? super.getIntegrationPatternType() : IntegrationPatternType.outbound_channel_adapter;
    }

    /**
     * @return the template as a {@link RabbitTemplate}, or {@code null} if the constructor
     *     was given some other {@link AmqpTemplate} implementation.
     */
    @Override
    public RabbitTemplate getRabbitTemplate() {
        return this.rabbitTemplate;
    }

    /**
     * Register this endpoint as confirm and/or returns callback when the corresponding
     * option is configured, and pick up a configured confirm timeout.
     *
     * @throws IllegalArgumentException if confirms or returns are configured but the
     *     template is not a {@link RabbitTemplate}.
     */
    @Override
    protected void endpointInit() {
        if (getConfirmCorrelationExpression() != null) {
            Assert.notNull(this.rabbitTemplate, "RabbitTemplate implementation is required for publisher confirms");
            this.rabbitTemplate.setConfirmCallback(this);
            if (!this.rabbitTemplate.getConnectionFactory().isPublisherConfirms()) {
                this.logger.warn("A confirm correlation expression is provided but the underlying connection factory does not support correlated delivery confirmations; no confirmations will be received");
            }
        }
        if (getReturnChannel() != null) {
            // This guard protects the returns callback, so the message names returns, not confirms.
            Assert.notNull(this.rabbitTemplate, "RabbitTemplate implementation is required for publisher returns");
            this.rabbitTemplate.setReturnsCallback(this);
            if (!this.rabbitTemplate.getConnectionFactory().isPublisherReturns()) {
                this.logger.warn("A return channel is provided but the underlying connection factory does not support returned messages; none will be received");
            }
        }
        Duration confirmTimeout = getConfirmTimeout();
        if (confirmTimeout != null) {
            this.waitForConfirmTimeout = confirmTimeout;
        }
    }

    /**
     * Stop the underlying template if it implements {@link Lifecycle}.
     *
     * <p>Kept {@code public} as in the decompiled source (the JADX note on this method
     * recorded that the access modifier was changed from {@code protected}).
     */
    @Override
    public void doStop() {
        if (this.amqpTemplate instanceof Lifecycle) {
            ((Lifecycle) this.amqpTemplate).stop();
        }
    }

    /**
     * Publish the request message.
     *
     * @param message the message to send.
     * @return the reply builder (possibly {@code null}) when a reply is expected;
     *     otherwise always {@code null}.
     */
    @Override
    protected Object handleRequestMessage(Message<?> message) {
        CorrelationData correlationData = generateCorrelationData(message);
        String exchangeName = generateExchangeName(message);
        String routingKey = generateRoutingKey(message);
        if (this.expectReply) {
            return sendAndReceive(exchangeName, routingKey, message, correlationData);
        }
        if (this.multiSend && (message.getPayload() instanceof Iterable)) {
            multiSend(message, exchangeName, routingKey);
            return null;
        }
        send(exchangeName, routingKey, message, correlationData);
        // A confirm can only be awaited when correlation data was generated for this send.
        if (this.waitForConfirm && correlationData != null) {
            waitForConfirm(message, correlationData);
        }
        return null;
    }

    /**
     * Send every element of the request's {@code Iterable} payload inside one
     * {@code RabbitTemplate.invoke} scope, optionally waiting for confirms afterwards.
     *
     * @throws IllegalStateException if any element is not a {@link Message}.
     */
    private void multiSend(Message<?> requestMessage, String exchangeName, String routingKey) {
        Iterable<?> payloads = (Iterable<?>) requestMessage.getPayload();
        // Validate the whole batch first so that nothing is sent if any element is invalid.
        payloads.forEach(payload ->
                Assert.state(payload instanceof Message, "To use multiSend, the payload must be an Iterable<Message<?>>"));
        this.rabbitTemplate.invoke(operations -> {
            // The cast is safe: every element was checked above.
            payloads.forEach(payload ->
                    doRabbitSend(exchangeName, routingKey, (Message<?>) payload, null, (RabbitTemplate) operations));
            if (this.waitForConfirm) {
                operations.waitForConfirmsOrDie(this.waitForConfirmTimeout.toMillis());
            }
            return null;
        });
    }

    /**
     * Block until the confirm for the given correlation data arrives.
     *
     * @throws AmqpException on a negative confirm, a returned message, or a failed future.
     * @throws MessageTimeoutException if no confirm arrives within the confirm timeout.
     */
    private void waitForConfirm(Message<?> requestMessage, CorrelationData correlationData) {
        try {
            CorrelationData.Confirm confirm = correlationData.getFuture().get(this.waitForConfirmTimeout.toMillis(), TimeUnit.MILLISECONDS);
            if (!confirm.isAck()) {
                throw new AmqpException("Negative publisher confirm received: " + confirm);
            }
            if (correlationData.getReturned() != null) {
                throw new AmqpException("Message was returned by the broker");
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag for the caller; the method then returns normally
            // without having observed a confirm.
            Thread.currentThread().interrupt();
        } catch (ExecutionException ex) {
            throw new AmqpException("Failed to get publisher confirm", ex);
        } catch (TimeoutException ex) {
            throw new MessageTimeoutException(requestMessage, this + ": Timed out awaiting publisher confirm", ex);
        }
    }

    /**
     * Send one message, using the {@link RabbitTemplate} path when available and falling
     * back to {@code AmqpTemplate.convertAndSend} with header mapping otherwise (the
     * correlation data is not used on the fallback path).
     */
    private void send(String exchangeName, String routingKey, Message<?> requestMessage, CorrelationData correlationData) {
        if (this.rabbitTemplate != null) {
            doRabbitSend(exchangeName, routingKey, requestMessage, correlationData, this.rabbitTemplate);
            return;
        }
        this.amqpTemplate.convertAndSend(exchangeName, routingKey, requestMessage.getPayload(), amqpMessage -> {
            getHeaderMapper().fromHeadersToRequest(requestMessage.getHeaders(), amqpMessage.getMessageProperties());
            return amqpMessage;
        });
    }

    /** Map the request to an AMQP message, apply the delay property and send it. */
    private void doRabbitSend(String exchangeName, String routingKey, Message<?> requestMessage, CorrelationData correlationData, RabbitTemplate template) {
        org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, template.getMessageConverter(), getHeaderMapper(), getDefaultDeliveryMode(), isHeadersMappedLast());
        addDelayProperty(requestMessage, amqpMessage);
        template.send(exchangeName, routingKey, amqpMessage, correlationData);
    }

    /**
     * Send the request and wait for a reply.
     *
     * @return a builder for the reply message, or {@code null} if the template returned no reply.
     * @throws IllegalStateException if the template is not a {@link RabbitTemplate}.
     */
    private AbstractIntegrationMessageBuilder<?> sendAndReceive(String exchangeName, String routingKey, Message<?> requestMessage, CorrelationData correlationData) {
        // This guard protects the request/reply path, so the message must not mention confirms.
        Assert.state(this.rabbitTemplate != null, "RabbitTemplate implementation is required for request/reply (expectReply)");
        MessageConverter converter = this.rabbitTemplate.getMessageConverter();
        org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, converter, getHeaderMapper(), getDefaultDeliveryMode(), isHeadersMappedLast());
        addDelayProperty(requestMessage, amqpMessage);
        org.springframework.amqp.core.Message reply = this.rabbitTemplate.sendAndReceive(exchangeName, routingKey, amqpMessage, correlationData);
        return reply == null ? null : buildReply(converter, reply);
    }

    /** Confirm callback: delegates to {@code handleConfirm}. */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        handleConfirm(correlationData, ack, cause);
    }

    /**
     * Returns callback: converts the returned message and sends it to the return channel.
     * This callback is only registered in {@code endpointInit()} after the template has
     * been asserted to be a {@link RabbitTemplate}.
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        MessageConverter converter = this.rabbitTemplate.getMessageConverter();
        getReturnChannel().send(buildReturnedMessage(returnedMessage, converter));
    }
}
