package io.awspring.cloud.sqs.listener.pipeline;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/pipeline/ErrorHandlerExecutionStage.class */
public class ErrorHandlerExecutionStage<T> implements MessageProcessingPipeline<T> {
    private static final Logger logger = LoggerFactory.getLogger(ErrorHandlerExecutionStage.class);
    private final AsyncErrorHandler<T> errorHandler;

    public ErrorHandlerExecutionStage(MessageProcessingConfiguration<T> messageProcessingConfiguration) {
        this.errorHandler = messageProcessingConfiguration.getErrorHandler();
    }

    @Override // io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline
    public CompletableFuture<Message<T>> process(CompletableFuture<Message<T>> completableFuture, MessageProcessingContext<T> messageProcessingContext) {
        return this.errorHandler == null ? completableFuture : CompletableFutures.exceptionallyCompose(completableFuture, th -> {
            return handleError(ListenerExecutionFailedException.unwrapMessage(th), th);
        });
    }

    private CompletableFuture<Message<T>> handleError(Message<T> message, Throwable th) {
        logger.debug("Handling error {} for message {}", th, MessageHeaderUtils.getId((Message<?>) message));
        return CompletableFutures.exceptionallyCompose(this.errorHandler.handle(message, th).thenApply(r3 -> {
            return message;
        }), th2 -> {
            return CompletableFutures.failedFuture(maybeWrap(message, th2));
        });
    }

    private Throwable maybeWrap(Message<T> message, Throwable th) {
        return ListenerExecutionFailedException.hasListenerException(th) ? th : new ListenerExecutionFailedException("Error handler returned an exception", th, (Message<?>) message);
    }

    @Override // io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline
    public CompletableFuture<Collection<Message<T>>> processMany(CompletableFuture<Collection<Message<T>>> completableFuture, MessageProcessingContext<T> messageProcessingContext) {
        return this.errorHandler == null ? completableFuture : CompletableFutures.exceptionallyCompose(completableFuture, th -> {
            return handleErrors(ListenerExecutionFailedException.unwrapMessages(th), th);
        });
    }

    private CompletableFuture<Collection<Message<T>>> handleErrors(Collection<Message<T>> collection, Throwable th) {
        logger.debug("Handling error {} for message {}", th, MessageHeaderUtils.getId(collection));
        return CompletableFutures.exceptionallyCompose(this.errorHandler.handle(collection, th).thenApply(r3 -> {
            return collection;
        }), th2 -> {
            return CompletableFutures.failedFuture(maybeWrap(collection, th2));
        });
    }

    private Throwable maybeWrap(Collection<Message<T>> collection, Throwable th) {
        return ListenerExecutionFailedException.hasListenerException(th) ? th : new ListenerExecutionFailedException("Error handler returned an exception", th, collection);
    }
}
