package io.awspring.cloud.sqs.operations;

import io.awspring.cloud.sqs.ExceptionUtils;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import io.awspring.cloud.sqs.operations.SendResult;
import io.awspring.cloud.sqs.support.converter.ContextAwareMessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.MessageConversionContext;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.observation.AbstractTemplateObservation;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationConvention;
import io.micrometer.observation.ObservationRegistry;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

/* loaded from: input_file:io/awspring/cloud/sqs/operations/AbstractMessagingTemplate.class */
public abstract class AbstractMessagingTemplate<S> implements MessagingOperations, AsyncMessagingOperations {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingTemplate.class);

    // Defaults applied by AbstractMessagingTemplateOptions when the caller does not override them.
    private static final TemplateAcknowledgementMode DEFAULT_ACKNOWLEDGEMENT_MODE = TemplateAcknowledgementMode.ACKNOWLEDGE;
    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(10);
    private static final SendBatchFailureHandlingStrategy DEFAULT_SEND_BATCH_OPERATION_FAILURE_STRATEGY = SendBatchFailureHandlingStrategy.THROW;
    private static final ObservationRegistry DEFAULT_OBSERVATION_REGISTRY = ObservationRegistry.NOOP;
    private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;
    // Empty by default: getEndpointName rejects it, so an endpoint must come from the options or from the call.
    private static final String DEFAULT_ENDPOINT_NAME = "";
    // Maximum number of messages requested by the single-message receiveAsync variant.
    private static final int MAX_ONE_MESSAGE = 1;

    // Headers merged into every receive operation; per-call headers are applied on top of these.
    private final Map<String, Object> defaultAdditionalHeaders;
    // Fallbacks used when the corresponding per-call argument is null.
    private final Duration defaultPollTimeout;
    private final int defaultMaxNumberOfMessages;
    private final String defaultEndpointName;
    // Decides whether received messages are acknowledged via doAcknowledgeMessages.
    private final TemplateAcknowledgementMode acknowledgementMode;
    // Decides whether a batch send with failed entries completes exceptionally.
    private final SendBatchFailureHandlingStrategy sendBatchFailureHandlingStrategy;
    // Supplies the observation context, documentation and default convention for send operations.
    private final AbstractTemplateObservation.Specifics<?> observationSpecifics;
    private final ObservationRegistry observationRegistry;

    // Optional convention passed to the observation documentation ahead of the default one.
    @Nullable
    private final ObservationConvention<?> customObservationConvention;

    // Payload class used for conversion when a receive call does not specify one.
    @Nullable
    private final Class<?> defaultPayloadClass;
    // Converts between the native message type S and Spring Message instances.
    private final MessagingMessageConverter<S> messageConverter;

    /* loaded from: input_file:io/awspring/cloud/sqs/operations/AbstractMessagingTemplate$AbstractMessagingTemplateOptions.class */
    /**
     * Base class for the fluent options used to configure an {@link AbstractMessagingTemplate}.
     * Every setter validates its argument and returns the options instance typed as {@code O}
     * so that calls can be chained.
     *
     * @param <O> the concrete options type returned by the fluent setters
     */
    protected static abstract class AbstractMessagingTemplateOptions<O extends MessagingTemplateOptions<O>> implements MessagingTemplateOptions<O> {
        private Duration defaultPollTimeout = AbstractMessagingTemplate.DEFAULT_POLL_TIMEOUT;
        private int defaultMaxNumberOfMessages = AbstractMessagingTemplate.DEFAULT_MAX_NUMBER_OF_MESSAGES;
        private String defaultEndpointName = AbstractMessagingTemplate.DEFAULT_ENDPOINT_NAME;
        private TemplateAcknowledgementMode acknowledgementMode = AbstractMessagingTemplate.DEFAULT_ACKNOWLEDGEMENT_MODE;
        private SendBatchFailureHandlingStrategy sendBatchFailureHandlingStrategy = AbstractMessagingTemplate.DEFAULT_SEND_BATCH_OPERATION_FAILURE_STRATEGY;
        private final Map<String, Object> defaultAdditionalHeaders = new HashMap<>();
        private ObservationRegistry observationRegistry = AbstractMessagingTemplate.DEFAULT_OBSERVATION_REGISTRY;

        @Nullable
        private ObservationConvention<?> observationConvention;

        @Nullable
        private Class<?> defaultPayloadClass;

        /**
         * Sets the acknowledgement mode applied to received messages.
         *
         * @param templateAcknowledgementMode the mode, never {@code null}
         * @return these options
         */
        @Override
        public O acknowledgementMode(TemplateAcknowledgementMode templateAcknowledgementMode) {
            Assert.notNull(templateAcknowledgementMode, "defaultAcknowledgementMode must not be null");
            this.acknowledgementMode = templateAcknowledgementMode;
            return self();
        }

        /**
         * Sets how a batch send that contains failed messages is handled.
         *
         * @param sendBatchFailureHandlingStrategy the strategy, never {@code null}
         * @return these options
         */
        @Override
        public O sendBatchFailureHandlingStrategy(SendBatchFailureHandlingStrategy sendBatchFailureHandlingStrategy) {
            Assert.notNull(sendBatchFailureHandlingStrategy, "sendBatchFailureStrategy must not be null");
            this.sendBatchFailureHandlingStrategy = sendBatchFailureHandlingStrategy;
            return self();
        }

        /**
         * Sets the poll timeout used when a receive call does not provide one.
         *
         * @param duration the timeout, never {@code null}
         * @return these options
         */
        @Override
        public O defaultPollTimeout(Duration duration) {
            Assert.notNull(duration, "pollTimeout must not be null");
            this.defaultPollTimeout = duration;
            return self();
        }

        /**
         * Sets the maximum number of messages used when a receive call does not provide one.
         *
         * @param num the maximum, must be non-null and greater than zero
         * @return these options
         * @throws IllegalArgumentException if {@code num} is {@code null} or not positive
         */
        @Override
        public O defaultMaxNumberOfMessages(Integer num) {
            // Fail with the same exception type as the other setters instead of an NPE on unboxing.
            Assert.notNull(num, "defaultMaxNumberOfMessages must not be null");
            Assert.isTrue(num > 0, "defaultMaxNumberOfMessages must be greater than zero");
            this.defaultMaxNumberOfMessages = num;
            return self();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Sets the endpoint name used when an operation does not provide one.
         *
         * @param str the endpoint name, never {@code null}
         */
        public void defaultEndpointName(String str) {
            Assert.notNull(str, "defaultEndpointName must not be null");
            this.defaultEndpointName = str;
        }

        /**
         * Sets the payload class used for conversion when a receive call does not provide one.
         *
         * @param cls the payload class, never {@code null}
         * @return these options
         */
        @Override
        public O defaultPayloadClass(Class<?> cls) {
            Assert.notNull(cls, "defaultPayloadClass must not be null");
            this.defaultPayloadClass = cls;
            return self();
        }

        /**
         * Registers a single header to be applied on receive operations.
         *
         * @param str the header name, never {@code null}
         * @param obj the header value, never {@code null}
         * @return these options
         */
        @Override
        public O additionalHeaderForReceive(String str, Object obj) {
            Assert.notNull(str, "name must not be null");
            Assert.notNull(obj, "value must not be null");
            this.defaultAdditionalHeaders.put(str, obj);
            return self();
        }

        /**
         * Registers several headers to be applied on receive operations.
         *
         * @param map the headers to add, never {@code null}
         * @return these options
         */
        @Override
        public O additionalHeadersForReceive(Map<String, Object> map) {
            Assert.notNull(map, "defaultAdditionalHeaders must not be null");
            this.defaultAdditionalHeaders.putAll(map);
            return self();
        }

        /**
         * Sets the registry used to start observations.
         *
         * @param observationRegistry the registry, never {@code null}
         * @return these options
         */
        @Override
        public O observationRegistry(ObservationRegistry observationRegistry) {
            Assert.notNull(observationRegistry, "observationRegistry cannot be null");
            this.observationRegistry = observationRegistry;
            return self();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Sets a custom observation convention that is offered ahead of the default one.
         *
         * @param observationConvention the convention, never {@code null}
         * @return these options
         */
        public O observationConvention(ObservationConvention<?> observationConvention) {
            Assert.notNull(observationConvention, "observationConvention cannot be null");
            this.observationConvention = observationConvention;
            return self();
        }

        /**
         * Returns this instance typed as the concrete options type.
         *
         * @return these options
         */
        @SuppressWarnings("unchecked")
        public O self() {
            // Unchecked by design: subclasses bind O to their own type (self-type pattern).
            return (O) this;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractMessagingTemplate(MessagingMessageConverter<S> messagingMessageConverter, AbstractMessagingTemplateOptions<?> abstractMessagingTemplateOptions, AbstractTemplateObservation.Specifics<?> specifics) {
        Assert.notNull(specifics, "observationSpecifics must not be null");
        Assert.notNull(messagingMessageConverter, "messageConverter must not be null");
        Assert.notNull(abstractMessagingTemplateOptions, "options must not be null");
        this.messageConverter = messagingMessageConverter;
        this.observationSpecifics = specifics;
        this.defaultAdditionalHeaders = ((AbstractMessagingTemplateOptions) abstractMessagingTemplateOptions).defaultAdditionalHeaders;
        this.defaultMaxNumberOfMessages = ((AbstractMessagingTemplateOptions) abstractMessagingTemplateOptions).defaultMaxNumberOfMessages;
        this.defaultPollTimeout = ((AbstractMessagingTemplateOptions) abstractMessagingTemplateOptions).defaultPollTimeout;
        this.defaultPayloadClass = ((AbstractMessagingTemplateOptions) abstractMessagingTemplateOptions).defaultPayloadClass;
        this.defaultEndpointName = ((AbstractMessagingTemplateOptions) abstractMessagingTemplateOptions).defaultEndpointName;
        this.acknowledgementMode = ((AbstractMessagingTemplateOptions) abstractMessagingTemplateOptions).acknowledgementMode;
        this.sendBatchFailureHandlingStrategy = ((AbstractMessagingTemplateOptions) abstractMessagingTemplateOptions).sendBatchFailureHandlingStrategy;
        this.observationRegistry = ((AbstractMessagingTemplateOptions) abstractMessagingTemplateOptions).observationRegistry;
        this.customObservationConvention = ((AbstractMessagingTemplateOptions) abstractMessagingTemplateOptions).observationConvention;
    }

    /**
     * Blocking variant of {@link #receiveAsync()}.
     *
     * @return the received message, or an empty {@link Optional} if none was returned
     */
    @Override
    public Optional<Message<?>> receive() {
        CompletableFuture<Optional<Message<?>>> pending = receiveAsync();
        return unwrapCompletionException(pending);
    }

    /**
     * Blocking variant of {@link #receiveAsync(String, Class)}.
     *
     * @param str the queue to receive from
     * @param cls the payload class to convert to
     * @return the received message, or an empty {@link Optional} if none was returned
     */
    @Override
    public <T> Optional<Message<T>> receive(String str, Class<T> cls) {
        CompletableFuture<Optional<Message<T>>> pending = receiveAsync(str, cls);
        return unwrapCompletionException(pending);
    }

    /**
     * Blocking variant of {@link #receiveManyAsync()}.
     *
     * @return the received messages
     */
    @Override
    public Collection<Message<?>> receiveMany() {
        CompletableFuture<Collection<Message<?>>> pending = receiveManyAsync();
        return unwrapCompletionException(pending);
    }

    /**
     * Blocking variant of {@link #receiveManyAsync(String, Class)} that additionally
     * rejects {@code null} arguments.
     *
     * @param str the queue to receive from, never {@code null}
     * @param cls the payload class to convert to, never {@code null}
     * @return the received messages
     */
    @Override
    public <T> Collection<Message<T>> receiveMany(String str, Class<T> cls) {
        Assert.notNull(str, "queue cannot be null");
        Assert.notNull(cls, "payloadClass cannot be null");
        CompletableFuture<Collection<Message<T>>> pending = receiveManyAsync(str, cls);
        return unwrapCompletionException(pending);
    }

    /**
     * Receives at most one message using only the configured defaults: no endpoint,
     * payload class, poll timeout or extra headers are supplied by the caller.
     *
     * @return a future with the received message, or an empty {@link Optional}
     */
    @Override // io.awspring.cloud.sqs.operations.AsyncMessagingOperations
    public CompletableFuture<Optional<Message<?>>> receiveAsync() {
        return receiveAsync(null, null, null, null);
    }

    /**
     * Receives at most one message from the given queue, typed to the given payload class.
     *
     * @param str the queue to receive from, never {@code null}
     * @param cls the payload class to convert to, never {@code null}
     * @return a future with the received message, or an empty {@link Optional}
     */
    @Override
    public <T> CompletableFuture<Optional<Message<T>>> receiveAsync(String str, Class<T> cls) {
        Assert.notNull(str, "queue cannot be null");
        Assert.notNull(cls, "payloadClass cannot be null");
        CompletableFuture<Optional<Message<?>>> untyped = receiveAsync(str, cls, null, null);
        return untyped.thenApply(this::castFromOptional);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Re-types an optional wildcard message to the payload type expected by the caller.
     * No runtime check is performed on the payload.
     *
     * @param optional the optional holding an untyped message
     * @return the same content viewed as {@code Optional<Message<T>>}
     */
    @SuppressWarnings("unchecked")
    public <T> Optional<Message<T>> castFromOptional(Optional<Message<?>> optional) {
        return optional.map(untyped -> (Message<T>) untyped);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Re-types a collection of wildcard messages to the payload type expected by the caller.
     * No runtime check is performed on the payloads.
     *
     * @param collection the untyped messages
     * @return a list with the same messages viewed as {@code Message<T>}
     */
    @SuppressWarnings("unchecked")
    public <T> List<Message<T>> castFromCollection(Collection<Message<?>> collection) {
        return collection.stream()
                .map(untyped -> (Message<T>) untyped)
                .toList();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Receives at most one message by delegating to the multi-message receive with a
     * limit of one and taking the first element of the result.
     *
     * @param str the endpoint, or {@code null} to use the default
     * @param cls the payload class, or {@code null} to use the default
     * @param duration the poll timeout, or {@code null} to use the default
     * @param map additional headers for this call, may be {@code null}
     * @return a future with the first received message, or an empty {@link Optional}
     */
    public CompletableFuture<Optional<Message<?>>> receiveAsync(@Nullable String str, @Nullable Class<?> cls, @Nullable Duration duration, @Nullable Map<String, Object> map) {
        return receiveManyAsync(str, cls, duration, MAX_ONE_MESSAGE, map)
                .thenApply(received -> received.stream().findFirst());
    }

    /**
     * Receives messages using only the configured defaults: no endpoint, payload class,
     * poll timeout, message limit or extra headers are supplied by the caller.
     *
     * @return a future with the received messages
     */
    @Override // io.awspring.cloud.sqs.operations.AsyncMessagingOperations
    public CompletableFuture<Collection<Message<?>>> receiveManyAsync() {
        return receiveManyAsync(null, null, null, null, null);
    }

    /**
     * Receives messages from the given queue, typed to the given payload class, using the
     * default poll timeout and message limit.
     *
     * @param str the queue to receive from
     * @param cls the payload class to convert to
     * @return a future with the received messages
     */
    @Override
    public <T> CompletableFuture<Collection<Message<T>>> receiveManyAsync(String str, Class<T> cls) {
        CompletableFuture<Collection<Message<?>>> untyped = receiveManyAsync(str, cls, null, null, null);
        return untyped.thenApply(this::castFromCollection);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Core receive pipeline: resolves defaults, receives the native messages, converts them,
     * acknowledges them if configured, and logs the outcome.
     * <p>
     * Any failure is reported as a {@link MessagingOperationFailedException}. A
     * {@link CompletionException} is replaced by its cause when it has one; otherwise the
     * exception itself is kept so the cause is never lost.
     *
     * @param str the endpoint, or {@code null} to use the default
     * @param cls the payload class, or {@code null} to use the default
     * @param duration the poll timeout, or {@code null} to use the default
     * @param num the maximum number of messages, or {@code null} to use the default
     * @param map additional headers for this call, may be {@code null}
     * @return a future with the converted messages
     */
    public CompletableFuture<Collection<Message<?>>> receiveManyAsync(@Nullable String str, @Nullable Class<?> cls, @Nullable Duration duration, @Nullable Integer num, @Nullable Map<String, Object> map) {
        String endpointName = getEndpointName(str);
        logger.trace("Receiving messages from endpoint {}", endpointName);
        Map<String, Object> additionalHeadersToReceive = getAdditionalHeadersToReceive(endpointName, map);
        Duration pollTimeout = getOrDefault(duration, this.defaultPollTimeout, "pollTimeout");
        Integer maxNumberOfMessages = getOrDefault(num, Integer.valueOf(this.defaultMaxNumberOfMessages), "defaultMaxNumberOfMessages");
        return doReceiveAsync(endpointName, pollTimeout, maxNumberOfMessages, additionalHeadersToReceive)
                .thenApply(received -> convertReceivedMessages(endpointName, cls, received, additionalHeadersToReceive))
                .thenCompose(converted -> handleAcknowledgement(endpointName, converted))
                .exceptionallyCompose(failure -> CompletableFuture.failedFuture(new MessagingOperationFailedException(
                        "Message receive operation failed for endpoint %s".formatted(endpointName), endpointName,
                        // Shared helper keeps the CompletionException itself when it has no cause.
                        unwrapCompletionException(failure))))
                .whenComplete((messages, failure) -> logReceiveMessageResult(endpointName, messages, failure));
    }

    /**
     * Hook invoked before a receive operation with the resolved endpoint name and a fresh
     * map holding the default additional headers overlaid with the per-call headers.
     * The returned map is what the receive operation uses as its additional headers.
     *
     * @param str the resolved endpoint name
     * @param map the merged additional headers
     * @return the headers to use for the receive operation
     */
    protected abstract Map<String, Object> preProcessHeadersForReceive(String str, Map<String, Object> map);

    /**
     * Builds the additional headers for a receive call: defaults first, then the per-call
     * headers on top, and finally the subclass pre-processing hook.
     *
     * @param str the resolved endpoint name
     * @param map per-call headers, may be {@code null}
     * @return the headers returned by {@link #preProcessHeadersForReceive(String, Map)}
     */
    private Map<String, Object> getAdditionalHeadersToReceive(String str, @Nullable Map<String, Object> map) {
        Map<String, Object> merged = new HashMap<>(this.defaultAdditionalHeaders);
        Optional.ofNullable(map).ifPresent(merged::putAll);
        return preProcessHeadersForReceive(str, merged);
    }

    /**
     * Converts each native message to a {@link Message} and then applies the additional headers.
     *
     * @param str the endpoint the messages were received from
     * @param cls the requested payload class, or {@code null} to use the default payload class
     * @param collection the native messages
     * @param map the additional headers for this receive call
     * @return the converted messages
     */
    private Collection<Message<?>> convertReceivedMessages(String str, @Nullable Class<?> cls, Collection<S> collection, Map<String, Object> map) {
        Class<?> payloadClass = (cls == null) ? this.defaultPayloadClass : cls;
        return collection.stream()
                .map(source -> convertReceivedMessage(getEndpointName(str), source, payloadClass))
                .map(converted -> addAdditionalHeaders(converted, map))
                .collect(Collectors.toList());
    }

    /**
     * Adds the headers produced by {@link #handleAdditionalHeaders(Map)} to the message,
     * skipping the copy entirely when there is nothing to add.
     *
     * @param message the received message
     * @param map the additional headers for this receive call
     * @return the original message, or the result of adding the headers where absent
     */
    protected <T> Message<T> addAdditionalHeaders(Message<T> message, Map<String, Object> map) {
        Map<String, Object> headersToAdd = handleAdditionalHeaders(map);
        if (headersToAdd.isEmpty()) {
            return message;
        }
        return MessageHeaderUtils.addHeadersIfAbsent(message, headersToAdd);
    }

    /**
     * Hook that selects which of the additional receive headers are added to each
     * received message. Returning an empty map leaves the message unchanged.
     *
     * @param map the additional headers for the receive call
     * @return the headers to add to the message where absent, never {@code null}
     */
    protected abstract Map<String, Object> handleAdditionalHeaders(Map<String, Object> map);

    /**
     * Acknowledges the received messages when the template is in
     * {@link TemplateAcknowledgementMode#ACKNOWLEDGE} mode and there is something to acknowledge.
     *
     * @param str the endpoint the messages came from
     * @param collection the received messages
     * @return a future yielding the same messages once acknowledgement, if any, has completed
     */
    private CompletableFuture<Collection<Message<?>>> handleAcknowledgement(@Nullable String str, Collection<Message<?>> collection) {
        boolean shouldAcknowledge = TemplateAcknowledgementMode.ACKNOWLEDGE.equals(this.acknowledgementMode)
                && !collection.isEmpty();
        if (!shouldAcknowledge) {
            return CompletableFuture.completedFuture(collection);
        }
        return doAcknowledgeMessages(getEndpointName(str), collection).thenApply(ignored -> collection);
    }

    /**
     * Acknowledges the given messages on the given endpoint. Only invoked when the template
     * is in {@code ACKNOWLEDGE} mode and the collection is not empty.
     *
     * @param str the resolved endpoint name
     * @param collection the messages to acknowledge
     * @return a future that completes when the acknowledgement is done
     */
    protected abstract CompletableFuture<Void> doAcknowledgeMessages(String str, Collection<Message<?>> collection);

    /**
     * Resolves the endpoint to use, falling back to the configured default.
     *
     * @param str the endpoint given by the caller, may be {@code null}
     * @return a non-blank endpoint name
     * @throws IllegalArgumentException if the resolved name has no text
     */
    private String getEndpointName(@Nullable String str) {
        String resolved = getOrDefault(str, this.defaultEndpointName, "endpointName");
        Assert.hasText(resolved, "No endpoint name informed and no default value available");
        return resolved;
    }

    /**
     * Returns the given value, or the fallback when the value is {@code null}.
     *
     * @param v the preferred value, may be {@code null}
     * @param v2 the fallback value
     * @param str the property name used in the error message
     * @return the first non-null of {@code v} and {@code v2}
     * @throws NullPointerException if both are {@code null}
     */
    private <V> V getOrDefault(@Nullable V v, V v2, String str) {
        V candidate = (v == null) ? v2 : v;
        return Objects.requireNonNull(candidate, str + " not set and no default value provided");
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Converts one native message, passing a conversion context when the converter supports it.
     *
     * @param str the endpoint the message was received from
     * @param s the native message
     * @param cls the payload class to convert to, may be {@code null}
     * @return the converted message
     */
    private Message<?> convertReceivedMessage(String str, S s, @Nullable Class<?> cls) {
        if (this.messageConverter instanceof ContextAwareMessagingMessageConverter<S> contextAware) {
            return contextAware.toMessagingMessage(s, getReceiveMessageConversionContext(str, cls));
        }
        return this.messageConverter.toMessagingMessage(s);
    }

    /**
     * Logs the outcome of a receive operation: the message ids at TRACE level on success,
     * or the failure at ERROR level, including the endpoint and with any
     * {@link CompletionException} wrapper removed, as the send logs do.
     *
     * @param str the endpoint the receive was issued against
     * @param collection the received messages, {@code null} when the operation failed
     * @param th the failure, {@code null} when the operation succeeded
     */
    private void logReceiveMessageResult(String str, @Nullable Collection<Message<?>> collection, @Nullable Throwable th) {
        if (collection == null) {
            logger.error("Error receiving messages from endpoint {}", str, unwrapCompletionException(th));
            return;
        }
        // Collecting the ids walks every message; skip it when TRACE is disabled.
        if (logger.isTraceEnabled()) {
            logger.trace("Received messages {} from endpoint {}", MessageHeaderUtils.getId(addTypeToMessages(collection)), str);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Views a collection of wildcard messages as {@code Message<Object>} instances.
     *
     * @param collection the untyped messages
     * @return a new list containing the same messages
     */
    @SuppressWarnings("unchecked")
    public Collection<Message<Object>> addTypeToMessages(Collection<Message<?>> collection) {
        return collection.stream()
                .map(untyped -> (Message<Object>) untyped)
                .collect(Collectors.toList());
    }

    /**
     * Performs the actual receive against the messaging system. All arguments have already
     * been resolved against the template defaults by the caller.
     *
     * @param str the resolved endpoint name
     * @param duration the poll timeout
     * @param num the maximum number of messages to receive
     * @param map the additional headers after pre-processing
     * @return a future with the native messages received
     */
    protected abstract CompletableFuture<Collection<S>> doReceiveAsync(String str, Duration duration, Integer num, Map<String, Object> map);

    /**
     * Blocking variant of {@link #sendAsync(Object)}.
     *
     * @param t the payload to send
     * @return the result of the send operation
     */
    @Override
    public <T> SendResult<T> send(T t) {
        CompletableFuture<SendResult<T>> pending = sendAsync(t);
        return unwrapCompletionException(pending);
    }

    /**
     * Blocking variant of {@link #sendAsync(String, Object)}.
     * The payload is forwarded as-is; it is not required to be a {@link String}.
     *
     * @param str the endpoint, or {@code null} to use the default
     * @param t the payload to send
     * @return the result of the send operation
     */
    @Override
    public <T> SendResult<T> send(@Nullable String str, T t) {
        // No cast on the payload: a (String) cast here would fail for every non-String payload.
        CompletableFuture<SendResult<T>> pending = sendAsync(str, t);
        return unwrapCompletionException(pending);
    }

    /**
     * Blocking variant of {@link #sendAsync(String, Message)}.
     *
     * @param str the endpoint, or {@code null} to use the default
     * @param message the message to send
     * @return the result of the send operation
     */
    @Override
    public <T> SendResult<T> send(@Nullable String str, Message<T> message) {
        CompletableFuture<SendResult<T>> pending = sendAsync(str, message);
        return unwrapCompletionException(pending);
    }

    /**
     * Blocking variant of {@link #sendManyAsync(String, Collection)}.
     *
     * @param str the endpoint, or {@code null} to use the default
     * @param collection the messages to send
     * @return the batch result of the send operation
     */
    @Override
    public <T> SendResult.Batch<T> sendMany(@Nullable String str, Collection<Message<T>> collection) {
        CompletableFuture<SendResult.Batch<T>> pending = sendManyAsync(str, collection);
        return unwrapCompletionException(pending);
    }

    /**
     * Sends the payload to the default endpoint.
     * The payload is forwarded as-is; it is not required to be a {@link String}.
     *
     * @param t the payload to send
     * @return a future with the result of the send operation
     */
    @Override
    public <T> CompletableFuture<SendResult<T>> sendAsync(T t) {
        // No cast on the payload: a (String) cast here would fail for every non-String payload.
        return sendAsync((String) null, t);
    }

    /**
     * Sends the payload to the given endpoint. A payload that already is a {@link Message}
     * is sent unchanged; anything else is wrapped in a new message first.
     *
     * @param str the endpoint, or {@code null} to use the default
     * @param t the payload to send
     * @return a future with the result of the send operation
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String str, T t) {
        Message<T> outgoing;
        if (t instanceof Message<?> alreadyMessage) {
            outgoing = (Message<T>) alreadyMessage;
        } else {
            outgoing = MessageBuilder.withPayload(t).build();
        }
        return sendAsync(str, outgoing);
    }

    /**
     * Sends a message: pre-processes it, sends it under an observation, wraps any failure
     * in a {@link MessagingOperationFailedException} and logs the outcome.
     *
     * @param str the endpoint, or {@code null} to use the default
     * @param message the message to send
     * @return a future with the result of the send operation
     */
    @Override
    public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String str, Message<T> message) {
        String endpointName = getEndpointName(str);
        // Log the resolved endpoint; the raw argument is null whenever the default is used.
        logger.trace("Sending message {} to endpoint {}", MessageHeaderUtils.getId(message), endpointName);
        return preProcessMessageForSendAsync(endpointName, message)
                .thenCompose(preProcessed -> observeAndSendAsync(preProcessed, endpointName)
                        .exceptionallyCompose(failure -> CompletableFuture.failedFuture(new MessagingOperationFailedException(
                                "Message send operation failed for message %s to endpoint %s".formatted(MessageHeaderUtils.getId(message), endpointName),
                                endpointName, message, failure)))
                        .whenComplete((sendResult, failure) -> logSendMessageResult(endpointName, message, failure)));
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [io.awspring.cloud.sqs.support.observation.AbstractTemplateObservation$Context, io.micrometer.observation.Observation$Context] */
    private <T> CompletableFuture<SendResult<T>> observeAndSendAsync(Message<T> message, String str) {
        ?? createContext = this.observationSpecifics.createContext(message, str);
        Observation startObservation = startObservation(createContext);
        Message<T> addHeadersIfAbsent = MessageHeaderUtils.addHeadersIfAbsent(message, (Map<String, Object>) Objects.requireNonNull((Map) createContext.getCarrier(), "No carrier found in context."));
        return doSendAsync(str, convertMessageToSend(addHeadersIfAbsent), addHeadersIfAbsent).whenComplete((BiConsumer<? super SendResult<T>, ? super Throwable>) (sendResult, th) -> {
            completeObservation(sendResult, createContext, th, startObservation);
        });
    }

    /**
     * Finishes an observation: records the send result when there is one, reports the
     * unwrapped failure when there is one, and always stops the observation.
     *
     * @param sendResult the send result, {@code null} on failure
     * @param context the observation context
     * @param th the failure, {@code null} on success
     * @param observation the observation to stop
     */
    private void completeObservation(@Nullable SendResult<?> sendResult, AbstractTemplateObservation.Context context, @Nullable Throwable th, Observation observation) {
        if (sendResult != null) {
            context.setSendResult(sendResult);
        }
        if (th != null) {
            Throwable unwrapped = ExceptionUtils.unwrapException(th, CompletionException.class,
                    AsyncAdapterBlockingExecutionFailedException.class, ListenerExecutionFailedException.class);
            observation.error(unwrapped);
        }
        observation.stop();
    }

    /**
     * Starts an observation for the given context, offering the custom convention (if any)
     * ahead of the default convention from the observation specifics.
     *
     * @param context the observation context
     * @return the started observation
     */
    private <C extends Observation.Context> Observation startObservation(C context) {
        ObservationConvention<?> fallbackConvention = this.observationSpecifics.getDefaultConvention();
        return this.observationSpecifics.getDocumentation()
                .start(this.customObservationConvention, fallbackConvention, () -> context, this.observationRegistry);
    }

    /**
     * Hook applied to a message before it is sent. The default
     * {@code preProcessMessageForSendAsync} calls this synchronously and sends the returned message.
     *
     * @param str the resolved endpoint name
     * @param message the message about to be sent
     * @return the message to send
     */
    protected abstract <T> Message<T> preProcessMessageForSend(String str, Message<T> message);

    /**
     * Asynchronous pre-processing hook. This default runs
     * {@link #preProcessMessageForSend(String, Message)} on the calling thread and returns
     * an already completed future.
     *
     * @param str the resolved endpoint name
     * @param message the message about to be sent
     * @return a completed future with the message to send
     */
    protected <T> CompletableFuture<Message<T>> preProcessMessageForSendAsync(String str, Message<T> message) {
        Message<T> preProcessed = preProcessMessageForSend(str, message);
        return CompletableFuture.completedFuture(preProcessed);
    }

    /**
     * Sends a batch of messages: pre-processes them, converts and sends them, wraps a failed
     * send in a {@link MessagingOperationFailedException}, applies the batch failure strategy
     * to partially failed results, and logs the outcome.
     *
     * @param str the endpoint, or {@code null} to use the default
     * @param collection the messages to send
     * @return a future with the batch result
     */
    @Override
    public <T> CompletableFuture<SendResult.Batch<T>> sendManyAsync(@Nullable String str, Collection<Message<T>> collection) {
        String endpointName = getEndpointName(str);
        // Log the resolved endpoint; the raw argument is null whenever the default is used.
        logger.trace("Sending messages {} to endpoint {}", MessageHeaderUtils.getId(collection), endpointName);
        return preProcessMessagesForSendAsync(endpointName, collection)
                .thenCompose(preProcessed -> doSendBatchAsync(endpointName, convertMessagesToSend(preProcessed), preProcessed)
                        .exceptionallyCompose(failure -> wrapSendException(preProcessed, endpointName, failure))
                        .thenCompose(batch -> handleFailedMessages(endpointName, batch))
                        .whenComplete((batch, failure) -> logSendMessageBatchResult(endpointName, preProcessed, failure)));
    }

    /**
     * Hook applied to a batch of messages before it is sent. The default
     * {@code preProcessMessagesForSendAsync} calls this synchronously and sends the returned messages.
     *
     * @param str the resolved endpoint name
     * @param collection the messages about to be sent
     * @return the messages to send
     */
    protected abstract <T> Collection<Message<T>> preProcessMessagesForSend(String str, Collection<Message<T>> collection);

    /**
     * Asynchronous batch pre-processing hook. This default runs
     * {@link #preProcessMessagesForSend(String, Collection)} on the calling thread and
     * returns an already completed future.
     *
     * @param str the resolved endpoint name
     * @param collection the messages about to be sent
     * @return a completed future with the messages to send
     */
    protected <T> CompletableFuture<Collection<Message<T>>> preProcessMessagesForSendAsync(String str, Collection<Message<T>> collection) {
        Collection<Message<T>> preProcessed = preProcessMessagesForSend(str, collection);
        return CompletableFuture.completedFuture(preProcessed);
    }

    /**
     * Applies the batch failure strategy: when some messages failed and the strategy is
     * {@link SendBatchFailureHandlingStrategy#THROW}, the result becomes a failed future;
     * otherwise the batch result is returned as-is.
     *
     * @param str the resolved endpoint name
     * @param batch the batch send result
     * @return a future with the batch result, or a failed future
     */
    private <T> CompletableFuture<SendResult.Batch<T>> handleFailedMessages(String str, SendResult.Batch<T> batch) {
        boolean mustFail = !batch.failed().isEmpty()
                && SendBatchFailureHandlingStrategy.THROW.equals(this.sendBatchFailureHandlingStrategy);
        if (mustFail) {
            return handleFailedSendBatch(str, batch);
        }
        return CompletableFuture.completedFuture(batch);
    }

    /**
     * Turns a batch send failure into a failed future carrying a
     * {@link MessagingOperationFailedException} that references the messages and endpoint.
     *
     * @param collection the messages that were being sent
     * @param str the resolved endpoint name
     * @param th the original failure
     * @return a failed future
     */
    private <T> CompletableFuture<SendResult.Batch<T>> wrapSendException(Collection<Message<T>> collection, String str, Throwable th) {
        String description = "Message send operation failed for messages %s to endpoint %s"
                .formatted(MessageHeaderUtils.getId(collection), str);
        MessagingOperationFailedException wrapped = new MessagingOperationFailedException(description, str, collection, th);
        return CompletableFuture.failedFuture(wrapped);
    }

    /**
     * Creates a failed future for a batch in which some messages could not be sent.
     * The exception is given a descriptive message; previously the empty default endpoint
     * name constant was passed in that position.
     *
     * @param str the resolved endpoint name
     * @param batch the batch result containing the failed messages
     * @return a failed future carrying a {@link SendBatchOperationFailedException}
     */
    private <T> CompletableFuture<SendResult.Batch<T>> handleFailedSendBatch(String str, SendResult.Batch<T> batch) {
        // NOTE(review): assumes the first constructor argument is the exception message,
        // as it is for MessagingOperationFailedException - confirm against the exception class.
        String description = "Send batch operation failed for some messages to endpoint %s".formatted(str);
        return CompletableFuture.failedFuture(new SendBatchOperationFailedException(description, str, batch));
    }

    /**
     * Converts each message of a batch to the native message type.
     *
     * @param collection the messages to convert
     * @return the native messages, in encounter order
     */
    private <T> Collection<S> convertMessagesToSend(Collection<Message<T>> collection) {
        return collection.stream()
                .map(outgoing -> convertMessageToSend(outgoing))
                .collect(Collectors.toList());
    }

    /**
     * Converts one message to the native message type, passing a conversion context when
     * the converter supports it.
     *
     * @param message the message to convert
     * @return the native message
     */
    private <T> S convertMessageToSend(Message<T> message) {
        if (this.messageConverter instanceof ContextAwareMessagingMessageConverter<S> contextAware) {
            return contextAware.fromMessagingMessage(message, getSendMessageConversionContext(message));
        }
        return this.messageConverter.fromMessagingMessage(message);
    }

    /**
     * Performs the actual send of a single message against the messaging system.
     *
     * @param str the resolved endpoint name
     * @param s the message converted to the native type
     * @param message the message the native one was converted from
     * @return a future with the result of the send operation
     */
    protected abstract <T> CompletableFuture<SendResult<T>> doSendAsync(String str, S s, Message<T> message);

    /**
     * Performs the actual send of a batch of messages against the messaging system.
     *
     * @param str the resolved endpoint name
     * @param collection the messages converted to the native type
     * @param collection2 the pre-processed messages the native ones were converted from
     * @return a future with the batch result
     */
    protected abstract <T> CompletableFuture<SendResult.Batch<T>> doSendBatchAsync(String str, Collection<S> collection, Collection<Message<T>> collection2);

    /**
     * Supplies the conversion context handed to a context-aware converter when converting
     * a received message. This default returns {@code null}.
     *
     * @param str the endpoint the message was received from
     * @param cls the payload class to convert to, may be {@code null}
     * @return the conversion context, or {@code null}
     */
    @Nullable
    protected <T> MessageConversionContext getReceiveMessageConversionContext(String str, @Nullable Class<T> cls) {
        return null;
    }

    /**
     * Supplies the conversion context handed to a context-aware converter when converting
     * a message to be sent. This default returns {@code null}.
     *
     * @param message the message about to be converted
     * @return the conversion context, or {@code null}
     */
    @Nullable
    protected <T> MessageConversionContext getSendMessageConversionContext(Message<T> message) {
        return null;
    }

    /**
     * Logs the outcome of a single send: an ERROR entry with the unwrapped failure, or a
     * TRACE entry on success.
     *
     * @param str the resolved endpoint name
     * @param message the message that was sent
     * @param th the failure, {@code null} on success
     */
    private <T> void logSendMessageResult(String str, Message<T> message, @Nullable Throwable th) {
        if (th != null) {
            logger.error("Error sending message {} to endpoint {}", MessageHeaderUtils.getId(message), str, unwrapCompletionException(th));
            return;
        }
        logger.trace("Message {} successfully sent to endpoint {} with id {}", message, str, MessageHeaderUtils.getId(message));
    }

    /**
     * Strips a {@link CompletionException} wrapper when it carries a cause.
     *
     * @param th the throwable to inspect
     * @return the cause of a {@link CompletionException} that has one, otherwise {@code th} itself
     */
    private Throwable unwrapCompletionException(Throwable th) {
        if (th instanceof CompletionException && th.getCause() != null) {
            return th.getCause();
        }
        return th;
    }

    /**
     * Logs the outcome of a batch send: an ERROR entry with the unwrapped failure, or a
     * TRACE entry on success.
     *
     * @param str the resolved endpoint name
     * @param collection the messages that were sent
     * @param th the failure, {@code null} on success
     */
    private <T> void logSendMessageBatchResult(String str, Collection<Message<T>> collection, @Nullable Throwable th) {
        if (th != null) {
            logger.error("Error sending messages {} to endpoint {}", MessageHeaderUtils.getId(collection), str, unwrapCompletionException(th));
            return;
        }
        logger.trace("Messages {} successfully sent to endpoint {} with id {}", collection, str, MessageHeaderUtils.getId(collection));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Blocks until the future completes and returns its value. A failure caused by a
     * {@link RuntimeException} is rethrown as that exception; any other cause is reported
     * as a {@link RuntimeException} wrapping the {@link CompletionException}.
     *
     * @param completableFuture the future to wait for
     * @return the value the future completed with
     */
    public <V> V unwrapCompletionException(CompletableFuture<V> completableFuture) {
        try {
            return completableFuture.join();
        } catch (CompletionException wrapped) {
            if (wrapped.getCause() instanceof RuntimeException unchecked) {
                throw unchecked;
            }
            throw new RuntimeException("Unexpected exception", wrapped);
        }
    }
}
