package org.springframework.integration.amqp.outbound;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.MappingUtils;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-amqp-6.0.2.jar:org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.class */
/**
 * Message handler that publishes messages through a {@link RabbitStreamOperations}.
 *
 * <p>Each handled message is converted to a {@code com.rabbitmq.stream.Message}
 * (unless the payload already is one) and passed to
 * {@link RabbitStreamOperations#send(com.rabbitmq.stream.Message)}. The future returned
 * by that call is observed: when it completes normally the original message is
 * forwarded to the send-success channel (if one is configured); when it completes
 * exceptionally an {@link ErrorMessage} is forwarded to the send-failure channel
 * (if one is configured). When {@code sync} is enabled the handling thread also
 * waits for the future, up to the confirm timeout.
 */
public class RabbitStreamMessageHandler extends AbstractMessageHandler {

    /** Default time, in milliseconds, to wait for the send future in sync mode. */
    private static final int DEFAULT_CONFIRM_TIMEOUT = 10000;

    private final RabbitStreamOperations streamOperations;

    private final MessagingTemplate messagingTemplate = new MessagingTemplate();

    private boolean sync;

    private long confirmTimeout = DEFAULT_CONFIRM_TIMEOUT;

    private MessageChannel sendFailureChannel;

    private String sendFailureChannelName;

    private MessageChannel sendSuccessChannel;

    private String sendSuccessChannelName;

    private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper();

    private boolean headersMappedLast;

    /**
     * Create a handler that sends through the given operations.
     *
     * @param rabbitStreamOperations the stream operations used to send; must not be {@code null}
     */
    public RabbitStreamMessageHandler(RabbitStreamOperations rabbitStreamOperations) {
        Assert.notNull(rabbitStreamOperations, "'streamOperations' cannot be null");
        this.streamOperations = rabbitStreamOperations;
    }

    /**
     * Set the channel that receives an {@link ErrorMessage} when a send future
     * completes exceptionally.
     *
     * @param messageChannel the failure channel
     */
    public void setSendFailureChannel(MessageChannel messageChannel) {
        this.sendFailureChannel = messageChannel;
    }

    /**
     * Set the name of the failure channel; it is resolved on first use and only
     * consulted when no failure channel instance has been set.
     *
     * @param str the failure channel name
     */
    public void setSendFailureChannelName(String str) {
        this.sendFailureChannelName = str;
    }

    /**
     * Set the channel that receives the original message when a send future
     * completes normally.
     *
     * @param messageChannel the success channel
     */
    public void setSendSuccessChannel(MessageChannel messageChannel) {
        this.sendSuccessChannel = messageChannel;
    }

    /**
     * Set the name of the success channel; it is resolved on first use and only
     * consulted when no success channel instance has been set.
     *
     * @param str the success channel name
     */
    public void setSendSuccessChannelName(String str) {
        this.sendSuccessChannelName = str;
    }

    /**
     * Set whether the handling thread waits for the send future to complete.
     *
     * @param z {@code true} to wait (bounded by the confirm timeout) after each send
     */
    public void setSync(boolean z) {
        this.sync = z;
    }

    /**
     * Set how long to wait for the send future when {@code sync} is enabled.
     *
     * @param j the timeout in milliseconds
     */
    public void setConfirmTimeout(long j) {
        this.confirmTimeout = j;
    }

    /**
     * Replace the header mapper used when converting messages.
     *
     * @param amqpHeaderMapper the mapper; must not be {@code null}
     */
    public void setHeaderMapper(AmqpHeaderMapper amqpHeaderMapper) {
        Assert.notNull(amqpHeaderMapper, "headerMapper must not be null");
        this.headerMapper = amqpHeaderMapper;
    }

    /**
     * Set the flag that is passed through to {@link MappingUtils} when mapping a message.
     *
     * @param z the headers-mapped-last flag
     */
    public void setHeadersMappedLast(boolean z) {
        this.headersMappedLast = z;
    }

    /**
     * Return the stream operations supplied at construction.
     *
     * @return the stream operations
     */
    public RabbitStreamOperations getStreamOperations() {
        return this.streamOperations;
    }

    /**
     * Return the failure channel, resolving it from the configured name on first use.
     *
     * @return the failure channel, or {@code null} when neither a channel nor a name is set
     */
    protected MessageChannel getSendFailureChannel() {
        if (this.sendFailureChannel == null && this.sendFailureChannelName != null) {
            // Cache the resolved channel so the lookup happens only once.
            this.sendFailureChannel = getChannelResolver().resolveDestination(this.sendFailureChannelName);
        }
        return this.sendFailureChannel;
    }

    /**
     * Return the success channel, resolving it from the configured name on first use.
     *
     * @return the success channel, or {@code null} when neither a channel nor a name is set
     */
    protected MessageChannel getSendSuccessChannel() {
        if (this.sendSuccessChannel == null && this.sendSuccessChannelName != null) {
            // Cache the resolved channel so the lookup happens only once.
            this.sendSuccessChannel = getChannelResolver().resolveDestination(this.sendSuccessChannelName);
        }
        return this.sendSuccessChannel;
    }

    /**
     * Convert the message to a stream message (unless its payload already is one),
     * send it, and register the confirm handling for the returned future.
     *
     * @param message the message to send
     */
    @Override
    protected void handleMessageInternal(Message<?> message) {
        com.rabbitmq.stream.Message streamMessage;
        Object payload = message.getPayload();
        if (payload instanceof com.rabbitmq.stream.Message) {
            // Already a native stream message: send as-is, no conversion or header mapping.
            streamMessage = (com.rabbitmq.stream.Message) payload;
        }
        else {
            org.springframework.amqp.core.Message amqpMessage = mapMessage(message,
                    this.streamOperations.messageConverter(), this.headerMapper, this.headersMappedLast);
            streamMessage = this.streamOperations.streamMessageConverter().fromMessage(amqpMessage);
        }
        handleConfirms(message, this.streamOperations.send(streamMessage));
    }

    /**
     * Route the outcome of a send to the success or failure channel and, in sync
     * mode, wait for the outcome on the calling thread.
     *
     * @param message the message that was sent
     * @param confirmation the future returned by the send
     * @throws MessageHandlingException in sync mode, when the wait is interrupted,
     * times out, or the future completed exceptionally
     */
    private void handleConfirms(Message<?> message, CompletableFuture<Boolean> confirmation) {
        confirmation.whenComplete((confirmed, failure) -> {
            if (failure != null) {
                MessageChannel failures = getSendFailureChannel();
                if (failures != null) {
                    // The destination is the channel itself; it must not be cast to MessagingTemplate.
                    this.messagingTemplate.send(failures, new ErrorMessage(failure, message));
                }
            }
            else {
                MessageChannel successes = getSendSuccessChannel();
                if (successes != null) {
                    this.messagingTemplate.send(successes, message);
                }
            }
        });
        if (this.sync) {
            try {
                confirmation.get(this.confirmTimeout, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag before surfacing the failure to the caller.
                Thread.currentThread().interrupt();
                throw new MessageHandlingException(message, e);
            }
            catch (ExecutionException | TimeoutException e) {
                throw new MessageHandlingException(message, e);
            }
        }
    }

    /**
     * Map a messaging message to an AMQP message backed by fresh {@link StreamMessageProperties}.
     *
     * @param message the message to map
     * @param messageConverter the converter handed to {@link MappingUtils}
     * @param amqpHeaderMapper the header mapper handed to {@link MappingUtils}
     * @param headersMappedLast passed to {@link MappingUtils} for both of its boolean arguments
     * @return the mapped AMQP message
     */
    private static org.springframework.amqp.core.Message mapMessage(Message<?> message,
            MessageConverter messageConverter, AmqpHeaderMapper amqpHeaderMapper, boolean headersMappedLast) {
        return MappingUtils.mapMessage(message, messageConverter, amqpHeaderMapper, headersMappedLast,
                headersMappedLast, new StreamMessageProperties());
    }
}
