package io.awspring.cloud.sqs.listener.sink;

import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.TaskExecutorAware;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/sink/AbstractMessageProcessingPipelineSink.class */
public abstract class AbstractMessageProcessingPipelineSink<T> implements MessageProcessingPipelineSink<T>, TaskExecutorAware {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessageProcessingPipelineSink.class);
    private final Object lifecycleMonitor = new Object();
    private volatile boolean running;
    private Executor taskExecutor;
    private MessageProcessingPipeline<T> messageProcessingPipeline;
    private String id;

    @Override // io.awspring.cloud.sqs.listener.sink.MessageProcessingPipelineSink
    public void setMessagePipeline(MessageProcessingPipeline<T> messageProcessingPipeline) {
        Assert.notNull(messageProcessingPipeline, "messageProcessingPipeline must not be null.");
        this.messageProcessingPipeline = messageProcessingPipeline;
    }

    @Override // io.awspring.cloud.sqs.listener.TaskExecutorAware
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        Assert.notNull(taskExecutor, "executor cannot be null");
        this.taskExecutor = taskExecutor;
    }

    @Override // io.awspring.cloud.sqs.listener.sink.MessageSink
    public CompletableFuture<Void> emit(Collection<Message<T>> collection, MessageProcessingContext<T> messageProcessingContext) {
        Assert.notNull(collection, "messages cannot be null");
        if (!isRunning()) {
            logger.debug("{} {} not running, returning", getClass().getSimpleName(), this.id);
            return CompletableFuture.completedFuture(null);
        }
        if (collection.size() != 0) {
            return doEmit(collection, messageProcessingContext);
        }
        logger.debug("No messages provided for {} {}, returning.", getClass().getSimpleName(), this.id);
        return CompletableFuture.completedFuture(null);
    }

    protected abstract CompletableFuture<Void> doEmit(Collection<Message<T>> collection, MessageProcessingContext<T> messageProcessingContext);

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> execute(Message<T> message, MessageProcessingContext<T> messageProcessingContext) {
        logger.trace("Executing message {}", MessageHeaderUtils.getId((Message<?>) message));
        StopWatch startedWatch = getStartedWatch();
        return doExecute(() -> {
            return this.messageProcessingPipeline.process(message, messageProcessingContext);
        }).whenComplete((r3, th) -> {
            messageProcessingContext.runBackPressureReleaseCallback();
        }).whenComplete((r7, th2) -> {
            measureExecution(startedWatch, Collections.singletonList(message));
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> execute(Collection<Message<T>> collection, MessageProcessingContext<T> messageProcessingContext) {
        StopWatch startedWatch = getStartedWatch();
        return doExecute(() -> {
            return this.messageProcessingPipeline.process(collection, messageProcessingContext);
        }).whenComplete((r5, th) -> {
            collection.forEach(message -> {
                messageProcessingContext.runBackPressureReleaseCallback();
            });
        }).whenComplete((r7, th2) -> {
            measureExecution(startedWatch, collection);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Void logError(Throwable th, Message<T> message) {
        logger.error("Error processing message {}.", MessageHeaderUtils.getId((Message<?>) message), th);
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Void logError(Throwable th, Collection<Message<T>> collection) {
        logger.error("Error processing message {}.", MessageHeaderUtils.getId(collection), th);
        return null;
    }

    private StopWatch getStartedWatch() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        return stopWatch;
    }

    private void measureExecution(StopWatch stopWatch, Collection<Message<T>> collection) {
        stopWatch.stop();
        if (logger.isTraceEnabled()) {
            logger.trace("Messages {} processed in {}ms", MessageHeaderUtils.getId(collection), Long.valueOf(stopWatch.getTotalTimeMillis()));
        }
    }

    private CompletableFuture<Void> doExecute(Supplier<CompletableFuture<?>> supplier) {
        return CompletableFuture.supplyAsync(supplier, this.taskExecutor).thenCompose(completableFuture -> {
            return completableFuture;
        }).thenRun(() -> {
        });
    }

    public void start() {
        if (isRunning()) {
            logger.debug("{} {} already running", getClass().getSimpleName(), this.id);
            return;
        }
        synchronized (this.lifecycleMonitor) {
            Assert.notNull(this.messageProcessingPipeline, "messageListener not set");
            Assert.notNull(this.taskExecutor, "taskExecutor not set");
            this.id = getOrCreateId();
            logger.debug("Starting {} {}", getClass().getSimpleName(), this.id);
            this.running = true;
        }
    }

    private String getOrCreateId() {
        return this.taskExecutor instanceof ThreadPoolTaskExecutor ? this.taskExecutor.getThreadNamePrefix() : UUID.randomUUID().toString();
    }

    public void stop() {
        if (!isRunning()) {
            logger.debug("{} {} already stopped", getClass().getSimpleName(), this.id);
            return;
        }
        synchronized (this.lifecycleMonitor) {
            logger.debug("Stopping {} {}", getClass().getSimpleName(), this.id);
            this.running = false;
        }
    }

    public boolean isRunning() {
        return this.running;
    }
}
