package io.awspring.cloud.sqs.listener.acknowledgement;

import io.awspring.cloud.sqs.LifecycleHandler;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.TaskExecutorAware;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/acknowledgement/BatchingAcknowledgementProcessor.class */
public class BatchingAcknowledgementProcessor<T> extends AbstractOrderingAcknowledgementProcessor<T> implements TaskExecutorAware {
    private static final Logger logger = LoggerFactory.getLogger(BatchingAcknowledgementProcessor.class);
    private BufferingAcknowledgementProcessor<T> acknowledgementProcessor;
    private BlockingQueue<Message<T>> acks;
    private Integer ackThreshold;
    private Duration ackInterval;
    private TaskExecutor taskExecutor;
    private TaskScheduler taskScheduler;
    private Duration acknowledgementShutdownTimeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/awspring/cloud/sqs/listener/acknowledgement/BatchingAcknowledgementProcessor$AcknowledgementExecutionContext.class */
    public static class AcknowledgementExecutionContext<T> {
        private final String id;
        private final Lock ackLock;
        private final Supplier<Boolean> runningFunction;
        private final Map<String, BlockingQueue<Message<T>>> acksBuffer;
        private final Function<Collection<Message<T>>, CompletableFuture<Void>> executingFunction;
        private final Collection<CompletableFuture<Void>> runningAcks = Collections.synchronizedSet(new HashSet());
        private Instant lastAcknowledgement = Instant.now();

        public AcknowledgementExecutionContext(String str, Map<String, BlockingQueue<Message<T>>> map, Lock lock, Supplier<Boolean> supplier, Function<Collection<Message<T>>, CompletableFuture<Void>> function) {
            this.id = str;
            this.acksBuffer = map;
            this.ackLock = lock;
            this.runningFunction = supplier;
            this.executingFunction = function;
        }

        private List<CompletableFuture<Void>> executeAcksUpTo(int i, int i2) {
            verifyLock();
            List<CompletableFuture<Void>> list = (List) this.acksBuffer.entrySet().stream().filter(entry -> {
                return ((BlockingQueue) entry.getValue()).size() >= i;
            }).map(entry2 -> {
                return doExecute((String) entry2.getKey(), (BlockingQueue) entry2.getValue(), i2 == Integer.MAX_VALUE ? ((BlockingQueue) entry2.getValue()).size() : i2);
            }).collect(Collectors.toList());
            if (list.isEmpty()) {
                return Collections.emptyList();
            }
            purgeEmptyBuffers();
            return list;
        }

        private List<CompletableFuture<Void>> executeAllAcks() {
            verifyLock();
            List<CompletableFuture<Void>> list = (List) this.acksBuffer.entrySet().stream().filter(entry -> {
                return ((BlockingQueue) entry.getValue()).size() > 0;
            }).map(entry2 -> {
                return doExecute((String) entry2.getKey(), (BlockingQueue) entry2.getValue(), ((BlockingQueue) entry2.getValue()).size());
            }).collect(Collectors.toList());
            if (!list.isEmpty()) {
                purgeEmptyBuffers();
            }
            return list;
        }

        private void verifyLock() {
            if (this.ackLock instanceof ReentrantLock) {
                Assert.isTrue(((ReentrantLock) this.ackLock).isHeldByCurrentThread(), "no lock for executing acknowledgements");
            }
        }

        private CompletableFuture<Void> doExecute(String str, BlockingQueue<Message<T>> blockingQueue, int i) {
            BatchingAcknowledgementProcessor.logger.trace("Executing acknowledgement for up to {} messages {} of group {} in {}.", new Object[]{Integer.valueOf(i), MessageHeaderUtils.getId(blockingQueue), str, this.id});
            CompletableFuture<Void> manageFuture = manageFuture(execute(pollUpToThreshold(str, blockingQueue, i)));
            this.lastAcknowledgement = Instant.now();
            return manageFuture;
        }

        private List<Message<T>> pollUpToThreshold(String str, BlockingQueue<Message<T>> blockingQueue, int i) {
            return (List) IntStream.range(0, i).mapToObj(i2 -> {
                return pollMessage(str, blockingQueue);
            }).collect(Collectors.toList());
        }

        private Message<T> pollMessage(String str, BlockingQueue<Message<T>> blockingQueue) {
            Message<T> poll = blockingQueue.poll();
            Assert.notNull(poll, "poll should never return null");
            BatchingAcknowledgementProcessor.logger.trace("Retrieved message {} from the queue for group {}. Queue size: {}", new Object[]{MessageHeaderUtils.getId((Message<?>) poll), str, Integer.valueOf(blockingQueue.size())});
            return poll;
        }

        private CompletableFuture<Void> execute(Collection<Message<T>> collection) {
            Assert.notEmpty(collection, "empty collection sent for acknowledgement");
            BatchingAcknowledgementProcessor.logger.trace("Executing {} acknowledgements for {}", Integer.valueOf(collection.size()), this.id);
            return this.executingFunction.apply(collection);
        }

        private CompletableFuture<Void> manageFuture(CompletableFuture<Void> completableFuture) {
            this.runningAcks.add(completableFuture);
            completableFuture.whenComplete((r5, th) -> {
                if (isRunning()) {
                    this.runningAcks.remove(completableFuture);
                }
            });
            return completableFuture;
        }

        private boolean isRunning() {
            return this.runningFunction.get().booleanValue();
        }

        private void purgeEmptyBuffers() {
            lock();
            try {
                List list = (List) this.acksBuffer.entrySet().stream().filter(entry -> {
                    return ((BlockingQueue) entry.getValue()).isEmpty();
                }).map((v0) -> {
                    return v0.getKey();
                }).collect(Collectors.toList());
                BatchingAcknowledgementProcessor.logger.trace("Removing groups {} from buffer in {}", list, this.id);
                Map<String, BlockingQueue<Message<T>>> map = this.acksBuffer;
                Objects.requireNonNull(map);
                list.forEach((v1) -> {
                    r1.remove(v1);
                });
            } finally {
                unlock();
            }
        }

        private void lock() {
            this.ackLock.lock();
        }

        private void unlock() {
            this.ackLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/awspring/cloud/sqs/listener/acknowledgement/BatchingAcknowledgementProcessor$BufferingAcknowledgementProcessor.class */
    public static class BufferingAcknowledgementProcessor<T> implements Runnable {
        private final BlockingQueue<Message<T>> acks;
        private final Integer ackThreshold;
        private final BatchingAcknowledgementProcessor<T> parent;
        private final Map<String, BlockingQueue<Message<T>>> acksBuffer = new ConcurrentHashMap();
        private final Duration ackShutdownTimeout;
        private final AcknowledgementExecutionContext<T> context;
        private final ScheduledAcknowledgementExecution<T> scheduledExecution;
        private final ThresholdAcknowledgementExecutor<T> thresholdAcknowledgementExecution;
        private final Function<Message<T>, String> messageGroupingFunction;

        private BufferingAcknowledgementProcessor(BatchingAcknowledgementProcessor<T> batchingAcknowledgementProcessor) {
            this.acks = ((BatchingAcknowledgementProcessor) batchingAcknowledgementProcessor).acks;
            this.ackThreshold = ((BatchingAcknowledgementProcessor) batchingAcknowledgementProcessor).ackThreshold;
            this.ackShutdownTimeout = ((BatchingAcknowledgementProcessor) batchingAcknowledgementProcessor).acknowledgementShutdownTimeout;
            this.parent = batchingAcknowledgementProcessor;
            this.messageGroupingFunction = batchingAcknowledgementProcessor.getMessageGroupingFunction();
            String id = batchingAcknowledgementProcessor.getId();
            Map<String, BlockingQueue<Message<T>>> map = this.acksBuffer;
            ReentrantLock reentrantLock = new ReentrantLock();
            Objects.requireNonNull(batchingAcknowledgementProcessor);
            Supplier supplier = batchingAcknowledgementProcessor::isRunning;
            Objects.requireNonNull(batchingAcknowledgementProcessor);
            this.context = new AcknowledgementExecutionContext<>(id, map, reentrantLock, supplier, batchingAcknowledgementProcessor::sendToExecutor);
            this.scheduledExecution = new ScheduledAcknowledgementExecution<>(((BatchingAcknowledgementProcessor) batchingAcknowledgementProcessor).ackInterval, ((BatchingAcknowledgementProcessor) batchingAcknowledgementProcessor).taskScheduler, this.context);
            this.thresholdAcknowledgementExecution = new ThresholdAcknowledgementExecutor<>(((BatchingAcknowledgementProcessor) batchingAcknowledgementProcessor).ackThreshold.intValue(), this.context);
        }

        @Override // java.lang.Runnable
        public void run() {
            BatchingAcknowledgementProcessor.logger.debug("Starting acknowledgement processor thread with batchSize: {}", this.ackThreshold);
            this.scheduledExecution.start();
            while (this.parent.isRunning()) {
                try {
                    Message<T> poll = this.acks.poll(1L, TimeUnit.SECONDS);
                    if (poll != null) {
                        this.acksBuffer.computeIfAbsent(this.messageGroupingFunction.apply(poll), str -> {
                            return new LinkedBlockingQueue();
                        }).add(poll);
                        this.thresholdAcknowledgementExecution.checkAndExecute();
                    }
                } catch (Exception e) {
                    BatchingAcknowledgementProcessor.logger.error("Error while handling acknowledgements for {}, resuming.", this.parent.getId(), e);
                }
            }
            BatchingAcknowledgementProcessor.logger.debug("Acknowledgement processor thread stopped");
        }

        public void waitAcknowledgementsToFinish() {
            try {
                CompletableFuture.allOf((CompletableFuture[]) ((AcknowledgementExecutionContext) this.context).runningAcks.toArray(new CompletableFuture[0])).get(this.ackShutdownTimeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for acknowledgements to finish");
            } catch (TimeoutException e2) {
                BatchingAcknowledgementProcessor.logger.warn("Acknowledgements did not finish in {} ms. Proceeding with shutdown.", Long.valueOf(this.ackShutdownTimeout.toMillis()));
            } catch (Exception e3) {
                BatchingAcknowledgementProcessor.logger.warn("Error thrown when waiting for acknowledgement tasks to finish in {}. Continuing with shutdown.", this.parent.getId(), e3);
            }
            if (((AcknowledgementExecutionContext) this.context).runningAcks.isEmpty()) {
                return;
            }
            ((AcknowledgementExecutionContext) this.context).runningAcks.forEach(completableFuture -> {
                completableFuture.cancel(true);
            });
        }
    }

    /* loaded from: input_file:io/awspring/cloud/sqs/listener/acknowledgement/BatchingAcknowledgementProcessor$ScheduledAcknowledgementExecution.class */
    private static class ScheduledAcknowledgementExecution<T> {
        private final AcknowledgementExecutionContext<T> context;
        private final TaskScheduler taskScheduler;
        private final Duration ackInterval;

        public ScheduledAcknowledgementExecution(Duration duration, TaskScheduler taskScheduler, AcknowledgementExecutionContext<T> acknowledgementExecutionContext) {
            this.ackInterval = duration;
            this.taskScheduler = taskScheduler;
            this.context = acknowledgementExecutionContext;
        }

        private void start() {
            if (this.ackInterval != Duration.ZERO) {
                BatchingAcknowledgementProcessor.logger.debug("Starting scheduled thread with interval of {}ms for {}", Long.valueOf(this.ackInterval.toMillis()), ((AcknowledgementExecutionContext) this.context).id);
                scheduleNextExecution(Instant.now().plus((TemporalAmount) this.ackInterval));
            }
        }

        private void scheduleNextExecution(Instant instant) {
            if (!this.context.isRunning()) {
                BatchingAcknowledgementProcessor.logger.debug("AcknowledgementProcessor {} stopped, not scheduling next acknowledgement execution.", ((AcknowledgementExecutionContext) this.context).id);
                return;
            }
            try {
                BatchingAcknowledgementProcessor.logger.trace("Scheduling next acknowledgement execution in {}ms", Long.valueOf(instant.toEpochMilli() - Instant.now().toEpochMilli()));
                this.taskScheduler.schedule(this::executeScheduledAcknowledgement, instant);
            } catch (Exception e) {
                if (this.context.isRunning()) {
                    BatchingAcknowledgementProcessor.logger.warn("Error thrown when scheduling next execution in {}. Resuming.", ((AcknowledgementExecutionContext) this.context).id, e);
                }
                scheduleNextExecution(((AcknowledgementExecutionContext) this.context).lastAcknowledgement.plus((TemporalAmount) this.ackInterval));
            }
        }

        private void executeScheduledAcknowledgement() {
            this.context.lock();
            try {
                pollAndExecuteScheduled();
                scheduleNextExecution(((AcknowledgementExecutionContext) this.context).lastAcknowledgement.plus((TemporalAmount) this.ackInterval));
            } catch (Exception e) {
                BatchingAcknowledgementProcessor.logger.error("Error executing scheduled acknowledgement in {}. Resuming.", ((AcknowledgementExecutionContext) this.context).id, e);
                scheduleNextExecution(((AcknowledgementExecutionContext) this.context).lastAcknowledgement.plus((TemporalAmount) this.ackInterval));
            } finally {
                this.context.unlock();
            }
        }

        private void pollAndExecuteScheduled() {
            if (Instant.now().isAfter(((AcknowledgementExecutionContext) this.context).lastAcknowledgement.plus((TemporalAmount) this.ackInterval)) && this.context.executeAllAcks().isEmpty()) {
                ((AcknowledgementExecutionContext) this.context).lastAcknowledgement = Instant.now();
            }
        }
    }

    /* loaded from: input_file:io/awspring/cloud/sqs/listener/acknowledgement/BatchingAcknowledgementProcessor$ThresholdAcknowledgementExecutor.class */
    private static class ThresholdAcknowledgementExecutor<T> {
        private final AcknowledgementExecutionContext<T> context;
        private final int ackThreshold;

        public ThresholdAcknowledgementExecutor(int i, AcknowledgementExecutionContext<T> acknowledgementExecutionContext) {
            this.context = acknowledgementExecutionContext;
            this.ackThreshold = i;
        }

        private void checkAndExecute() {
            if (this.ackThreshold == 0) {
                return;
            }
            do {
            } while (!executeThresholdAcks().isEmpty());
        }

        private List<CompletableFuture<Void>> executeThresholdAcks() {
            this.context.lock();
            try {
                BatchingAcknowledgementProcessor.logger.trace("Executing acknowledgement for threshold in {}.", ((AcknowledgementExecutionContext) this.context).id);
                return this.context.executeAcksUpTo(this.ackThreshold, this.ackThreshold);
            } finally {
                this.context.unlock();
            }
        }
    }

    @Override // io.awspring.cloud.sqs.listener.acknowledgement.AbstractOrderingAcknowledgementProcessor
    protected void doConfigure(ContainerOptions<?, ?> containerOptions) {
        this.ackInterval = containerOptions.getAcknowledgementInterval();
        this.ackThreshold = containerOptions.getAcknowledgementThreshold();
        this.acknowledgementShutdownTimeout = containerOptions.getAcknowledgementShutdownTimeout();
    }

    @Override // io.awspring.cloud.sqs.listener.TaskExecutorAware
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        Assert.notNull(taskExecutor, "taskExecutor cannot be null");
        this.taskExecutor = taskExecutor;
    }

    @Override // io.awspring.cloud.sqs.listener.acknowledgement.AbstractOrderingAcknowledgementProcessor
    protected CompletableFuture<Void> doOnAcknowledge(Message<T> message) {
        if (!this.acks.offer(message)) {
            logger.warn("Acknowledgement queue full, dropping acknowledgement for message {}", MessageHeaderUtils.getId((Message<?>) message));
        }
        logger.trace("Received message {} to ack in {}.", MessageHeaderUtils.getId((Message<?>) message), getId());
        return CompletableFuture.completedFuture(null);
    }

    @Override // io.awspring.cloud.sqs.listener.acknowledgement.AbstractOrderingAcknowledgementProcessor
    protected CompletableFuture<Void> doOnAcknowledge(Collection<Message<T>> collection) {
        collection.forEach(this::onAcknowledge);
        return CompletableFuture.completedFuture(null);
    }

    @Override // io.awspring.cloud.sqs.listener.acknowledgement.AbstractOrderingAcknowledgementProcessor
    public void doStart() {
        Assert.notNull(this.ackInterval, "ackInterval not set");
        Assert.notNull(this.ackThreshold, "ackThreshold not set");
        Assert.notNull(this.taskExecutor, "executor not set");
        Assert.notNull(this.acknowledgementShutdownTimeout, "timeout not set");
        Assert.state(this.ackInterval != Duration.ZERO || this.ackThreshold.intValue() > 0, () -> {
            return getClass().getSimpleName() + " cannot be used with Duration.ZERO and acknowledgement threshold 0.Consider using a " + ImmediateAcknowledgementProcessor.class + "instead";
        });
        this.acks = new LinkedBlockingQueue();
        this.taskScheduler = createTaskScheduler();
        this.acknowledgementProcessor = createAcknowledgementProcessor();
        this.taskExecutor.execute(this.acknowledgementProcessor);
    }

    protected TaskScheduler createTaskScheduler() {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setThreadNamePrefix(getId() + "-");
        threadPoolTaskScheduler.initialize();
        return threadPoolTaskScheduler;
    }

    protected BufferingAcknowledgementProcessor<T> createAcknowledgementProcessor() {
        return new BufferingAcknowledgementProcessor<>(this);
    }

    @Override // io.awspring.cloud.sqs.listener.acknowledgement.AbstractOrderingAcknowledgementProcessor
    public void doStop() {
        this.acknowledgementProcessor.waitAcknowledgementsToFinish();
        LifecycleHandler.get().dispose(this.taskScheduler);
    }
}
