package io.awspring.cloud.sqs.listener.pipeline;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementHandler;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/pipeline/AcknowledgementHandlerExecutionStage.class */
public class AcknowledgementHandlerExecutionStage<T> implements MessageProcessingPipeline<T> {
    private static final Logger logger = LoggerFactory.getLogger(AcknowledgementHandlerExecutionStage.class);
    private final AcknowledgementHandler<T> acknowledgementHandler;

    public AcknowledgementHandlerExecutionStage(MessageProcessingConfiguration<T> messageProcessingConfiguration) {
        this.acknowledgementHandler = messageProcessingConfiguration.getAckHandler();
    }

    @Override // io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline
    public CompletableFuture<Message<T>> process(CompletableFuture<Message<T>> completableFuture, MessageProcessingContext<T> messageProcessingContext) {
        return CompletableFutures.handleCompose(completableFuture, (message, th) -> {
            return th == null ? this.acknowledgementHandler.onSuccess(message, messageProcessingContext.getAcknowledgmentCallback()).thenApply(r3 -> {
                return message;
            }) : this.acknowledgementHandler.onError(ListenerExecutionFailedException.unwrapMessage(th), th, messageProcessingContext.getAcknowledgmentCallback()).thenCompose(r32 -> {
                return CompletableFutures.failedFuture(th);
            });
        });
    }

    @Override // io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline
    public CompletableFuture<Collection<Message<T>>> processMany(CompletableFuture<Collection<Message<T>>> completableFuture, MessageProcessingContext<T> messageProcessingContext) {
        return CompletableFutures.handleCompose(completableFuture, (collection, th) -> {
            return th == null ? this.acknowledgementHandler.onSuccess(collection, messageProcessingContext.getAcknowledgmentCallback()).thenApply(r3 -> {
                return collection;
            }) : this.acknowledgementHandler.onError(ListenerExecutionFailedException.unwrapMessages(th), th, messageProcessingContext.getAcknowledgmentCallback()).thenCompose(r32 -> {
                return CompletableFutures.failedFuture(th);
            });
        });
    }
}
