package io.awspring.cloud.sqs.listener.sink;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/sink/OrderedMessageSink.class */
/**
 * Message sink that processes the messages of a batch one after another, in the
 * iteration order of the supplied collection.
 *
 * <p>Each message's execution is chained onto the future of the previous one. Once
 * a link of the chain has failed, {@code execute} is not invoked for the remaining
 * messages: for each of them the back-pressure release callback is run instead and
 * the original failure is propagated down the chain.
 *
 * @param <T> the payload type of the messages handled by this sink
 */
public class OrderedMessageSink<T> extends AbstractMessageProcessingPipelineSink<T> {

    private static final Logger logger = LoggerFactory.getLogger(OrderedMessageSink.class);

    /**
     * Chains the execution of every message in {@code collection} sequentially.
     *
     * @param collection the messages to emit, processed in iteration order
     * @param messageProcessingContext the context passed to each execution and used to
     *        release back-pressure for messages skipped after a failure
     * @return a future for the whole chain; it never completes exceptionally, because a
     *         failure anywhere in the chain is mapped to a {@code null} result
     */
    @Override
    protected CompletableFuture<Void> doEmit(Collection<Message<T>> collection, MessageProcessingContext<T> messageProcessingContext) {
        logger.trace("Emitting messages {}", MessageHeaderUtils.getId(collection));
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Message<T> message : collection) {
            // NOTE(review): handleCompose is assumed to call the callback with the previous
            // link's result or failure once that link completes - confirm in CompletableFutures.
            chain = CompletableFutures.handleCompose(chain, (ignored, failure) -> {
                if (failure != null) {
                    // A previous link failed: this message is skipped, so release its
                    // back-pressure and keep propagating the same failure.
                    messageProcessingContext.runBackPressureReleaseCallback();
                    return CompletableFutures.failedFuture(failure);
                }
                return execute(message, messageProcessingContext).whenComplete(logIfError(message));
            });
        }
        // Failures were already logged per message; the returned future completes normally.
        return chain.exceptionally(failure -> null);
    }

    /**
     * Creates a completion callback that logs a failed execution of {@code message}.
     *
     * @param message the message whose execution outcome is being observed
     * @return a callback that calls {@code logError} when given a non-null throwable and
     *         does nothing otherwise
     */
    private BiConsumer<Void, Throwable> logIfError(Message<T> message) {
        return (ignored, failure) -> {
            if (failure != null) {
                logError(failure, message);
            }
        };
    }
}
