package io.awspring.cloud.sqs.listener.sink.adapter;

import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.SqsAsyncClientAware;
import io.awspring.cloud.sqs.listener.SqsHeaders;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityExtendingSinkAdapter.class */
public class MessageVisibilityExtendingSinkAdapter<T> extends AbstractDelegatingMessageListeningSinkAdapter<T> implements SqsAsyncClientAware {
    private static final Logger logger = LoggerFactory.getLogger(MessageVisibilityExtendingSinkAdapter.class);
    private static final Duration DEFAULT_VISIBILITY_TO_SET = Duration.ofSeconds(30);
    private int messageVisibility;
    private SqsAsyncClient sqsAsyncClient;

    /* loaded from: input_file:io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityExtendingSinkAdapter$OriginalBatchMessageVisibilityExtendingInterceptor.class */
    private class OriginalBatchMessageVisibilityExtendingInterceptor implements AsyncMessageInterceptor<T> {
        private final Collection<Message<T>> originalMessageBatchCopy;
        private final int initialBatchSize;

        private OriginalBatchMessageVisibilityExtendingInterceptor(Collection<Message<T>> collection) {
            this.originalMessageBatchCopy = Collections.synchronizedCollection(new ArrayList(collection));
            this.initialBatchSize = collection.size();
        }

        @Override // io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor
        public CompletableFuture<Message<T>> intercept(Message<T> message) {
            return this.originalMessageBatchCopy.size() == this.initialBatchSize ? CompletableFuture.completedFuture(message) : MessageVisibilityExtendingSinkAdapter.this.changeVisibility(this.originalMessageBatchCopy).thenApply(collection -> {
                return message;
            });
        }

        @Override // io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor
        public CompletableFuture<Collection<Message<T>>> intercept(Collection<Message<T>> collection) {
            return this.originalMessageBatchCopy.size() == this.initialBatchSize ? CompletableFuture.completedFuture(collection) : MessageVisibilityExtendingSinkAdapter.this.changeVisibility(this.originalMessageBatchCopy).thenApply(collection2 -> {
                return collection;
            });
        }

        @Override // io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor
        public CompletableFuture<Void> afterProcessing(Collection<Message<T>> collection, Throwable th) {
            this.originalMessageBatchCopy.removeAll(collection);
            return CompletableFuture.completedFuture(null);
        }

        @Override // io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor
        public CompletableFuture<Void> afterProcessing(Message<T> message, Throwable th) {
            this.originalMessageBatchCopy.remove(message);
            return CompletableFuture.completedFuture(null);
        }
    }

    public MessageVisibilityExtendingSinkAdapter(MessageSink<T> messageSink) {
        super(messageSink);
        this.messageVisibility = (int) DEFAULT_VISIBILITY_TO_SET.getSeconds();
    }

    public void setMessageVisibility(Duration duration) {
        Assert.notNull(duration, "visibilityDuration cannot be null");
        this.messageVisibility = (int) duration.getSeconds();
    }

    @Override // io.awspring.cloud.sqs.listener.sink.adapter.AbstractDelegatingMessageListeningSinkAdapter, io.awspring.cloud.sqs.listener.SqsAsyncClientAware
    public void setSqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
        Assert.notNull(sqsAsyncClient, "sqsAsyncClient cannot be null");
        super.setSqsAsyncClient(sqsAsyncClient);
        this.sqsAsyncClient = sqsAsyncClient;
    }

    @Override // io.awspring.cloud.sqs.listener.sink.MessageSink
    public CompletableFuture<Void> emit(Collection<Message<T>> collection, MessageProcessingContext<T> messageProcessingContext) {
        logger.trace("Adding visibility interceptor for messages {}", MessageHeaderUtils.getId(collection));
        return getDelegate().emit(collection, messageProcessingContext.addInterceptor(new OriginalBatchMessageVisibilityExtendingInterceptor(collection)));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<Collection<Message<T>>> changeVisibility(Collection<Message<T>> collection) {
        logger.trace("Changing visibility of messages {} to {} seconds", MessageHeaderUtils.getId(collection), Integer.valueOf(this.messageVisibility));
        return (CompletableFuture<Collection<Message<T>>>) this.sqsAsyncClient.changeMessageVisibilityBatch(builder -> {
        }).whenComplete((BiConsumer) (changeMessageVisibilityBatchResponse, th) -> {
            logResult(collection, th);
        }).thenApply((Function) changeMessageVisibilityBatchResponse2 -> {
            return collection;
        });
    }

    private String getQueueUrl(Collection<Message<T>> collection) {
        return (String) collection.iterator().next().getHeaders().get(SqsHeaders.SQS_QUEUE_URL_HEADER, String.class);
    }

    private Collection<ChangeMessageVisibilityBatchRequestEntry> getEntries(Collection<Message<T>> collection) {
        return (Collection) MessageHeaderUtils.getHeader(collection, SqsHeaders.SQS_RECEIPT_HANDLE_HEADER, String.class).stream().map(str -> {
            return (ChangeMessageVisibilityBatchRequestEntry) ChangeMessageVisibilityBatchRequestEntry.builder().receiptHandle(str).id(UUID.randomUUID().toString()).visibilityTimeout(Integer.valueOf(this.messageVisibility)).build();
        }).collect(Collectors.toList());
    }

    private void logResult(Collection<Message<T>> collection, Throwable th) {
        if (th == null) {
            logger.trace("Finished changing visibility for messages {}", MessageHeaderUtils.getId(collection));
        } else {
            logger.error("Error changing visibility for messages {}", MessageHeaderUtils.getId(collection));
        }
    }
}
