package io.awspring.cloud.sqs.listener.acknowledgement;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.QueueAttributes;
import io.awspring.cloud.sqs.listener.QueueAttributesAware;
import io.awspring.cloud.sqs.listener.SqsAsyncClientAware;
import io.awspring.cloud.sqs.listener.SqsHeaders;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.class */
public class SqsAcknowledgementExecutor<T> implements AcknowledgementExecutor<T>, SqsAsyncClientAware, QueueAttributesAware {
    private static final Logger logger = LoggerFactory.getLogger(SqsAcknowledgementExecutor.class);
    private SqsAsyncClient sqsAsyncClient;
    private String queueUrl;
    private String queueName;

    @Override // io.awspring.cloud.sqs.listener.QueueAttributesAware
    public void setQueueAttributes(QueueAttributes queueAttributes) {
        Assert.notNull(queueAttributes, "queueAttributes cannot be null");
        this.queueUrl = queueAttributes.getQueueUrl();
        this.queueName = queueAttributes.getQueueName();
    }

    @Override // io.awspring.cloud.sqs.listener.SqsAsyncClientAware
    public void setSqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
        Assert.notNull(sqsAsyncClient, "sqsAsyncClient cannot be null");
        this.sqsAsyncClient = sqsAsyncClient;
    }

    @Override // io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementExecutor
    public CompletableFuture<Void> execute(Collection<Message<T>> collection) {
        try {
            logger.debug("Executing acknowledgement for {} messages", Integer.valueOf(collection.size()));
            Assert.notEmpty(collection, () -> {
                return "empty collection sent to acknowledge in queue " + this.queueName;
            });
            return deleteMessages(collection);
        } catch (Exception e) {
            return CompletableFutures.failedFuture(createAcknowledgementException(collection, e));
        }
    }

    private SqsAcknowledgementException createAcknowledgementException(Collection<Message<T>> collection, Throwable th) {
        return new SqsAcknowledgementException("Error acknowledging messages " + MessageHeaderUtils.getId(collection), collection, this.queueUrl, th);
    }

    private CompletableFuture<Void> deleteMessages(Collection<Message<T>> collection) {
        logger.trace("Acknowledging messages for queue {}: {}", this.queueName, MessageHeaderUtils.getId(collection));
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        return CompletableFutures.exceptionallyCompose(this.sqsAsyncClient.deleteMessageBatch(createDeleteMessageBatchRequest(collection)).thenRun(() -> {
        }), th -> {
            return CompletableFutures.failedFuture(createAcknowledgementException(collection, th));
        }).whenComplete((BiConsumer) (r8, th2) -> {
            logAckResult(collection, th2, stopWatch);
        });
    }

    private DeleteMessageBatchRequest createDeleteMessageBatchRequest(Collection<Message<T>> collection) {
        return (DeleteMessageBatchRequest) DeleteMessageBatchRequest.builder().queueUrl(this.queueUrl).entries((Collection) collection.stream().map(this::toDeleteMessageEntry).collect(Collectors.toList())).build();
    }

    private DeleteMessageBatchRequestEntry toDeleteMessageEntry(Message<T> message) {
        return (DeleteMessageBatchRequestEntry) DeleteMessageBatchRequestEntry.builder().receiptHandle(MessageHeaderUtils.getHeaderAsString(message, SqsHeaders.SQS_RECEIPT_HANDLE_HEADER)).id(UUID.randomUUID().toString()).build();
    }

    private void logAckResult(Collection<Message<T>> collection, Throwable th, StopWatch stopWatch) {
        stopWatch.stop();
        long totalTimeMillis = stopWatch.getTotalTimeMillis();
        if (totalTimeMillis > 10000) {
            logger.warn("Acknowledgement operation took {} seconds to finish in queue {} for messages {}", new Object[]{Long.valueOf(totalTimeMillis), this.queueName, MessageHeaderUtils.getId(collection)});
        }
        if (th == null) {
            logger.trace("Done acknowledging in queue {} messages: {} in {}ms", new Object[]{this.queueName, MessageHeaderUtils.getId(collection), Long.valueOf(totalTimeMillis)});
            return;
        }
        Logger logger2 = logger;
        Object[] objArr = new Object[4];
        objArr[0] = this.queueName;
        objArr[1] = MessageHeaderUtils.getId(collection);
        objArr[2] = Long.valueOf(totalTimeMillis);
        objArr[3] = th instanceof CompletionException ? th.getCause() : th;
        logger2.error("Error acknowledging in queue {} messages {} in {}ms", objArr);
    }
}
