package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.utils.QueueInfo;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.class */
public class RqueueMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {
    // Sizing constants; none of these three is referenced anywhere in this class.
    private static final int DEFAULT_WORKER_COUNT_PER_QUEUE = 2;
    private static final int POOL_SIZE_FOR_MESSAGE_MOVER = 5;
    private static final int POOL_SIZE_FOR_MESSAGE_PROCESSING_MOVER = 2;
    /** Optional worker count; when {@code null}, getThreadCount falls back to twice the queue count. */
    private Integer maxNumWorkers;
    /** Bean name supplied through setBeanName; used as the thread-name prefix when non-null. */
    private String beanName;
    private RqueueMessageHandler rqueueMessageHandler;
    /** Executor that runs message handlers (and the pollers too when no spinning executor exists). */
    private AsyncTaskExecutor taskExecutor;
    /** Dedicated poller executor; only created when the caller supplied its own task executor. */
    private AsyncTaskExecutor spinningTaskExecutor;
    // Not read or written by any code in this class.
    private ApplicationContext applicationContext;
    private RqueueMessageTemplate rqueueMessageTemplate;

    // Injected by Spring; not otherwise referenced in this class.
    @Autowired
    private RedisMessageListenerContainer rqueueRedisMessageListenerContainer;
    private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RqueueMessageListenerContainer.class);
    private static final Logger logger = LoggerFactory.getLogger(RqueueMessageListenerContainer.class);
    /** Monitor guarding {@link #running} and queue registration. */
    private final Object lifecycleMgr = new Object();
    /** True when {@link #taskExecutor} was created by this container and must be destroyed by it. */
    private boolean defaultTaskExecutor = false;
    private boolean autoStartup = true;
    /** Queue name to queue detail; replaced by an unmodifiable view in afterPropertiesSet. */
    private Map<String, ConsumerQueueDetail> registeredQueues = new ConcurrentHashMap<>();
    /** Queue name to "is currently active" flag; polled by the listener and executor loops. */
    private final Map<String, Boolean> queueRunningState = new ConcurrentHashMap<>();
    /** Queue name to the future of its polling task, used to wait for pollers on stop. */
    private final ConcurrentHashMap<String, Future<?>> scheduledFutureByQueue = new ConcurrentHashMap<>();
    private boolean running = false;
    /** Milliseconds a poller sleeps after a failed poll. */
    private long backOffTime = 5000;
    /** Milliseconds to wait for each poller to finish while stopping. */
    private long maxWorkerWaitTime = 200000;
    /** Milliseconds a poller sleeps when the queue returned no message. */
    private long pollingInterval = 200;
    private int phase = Integer.MAX_VALUE;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer$AsynchronousMessageListener.class */
    /**
     * Polling task for a single queue. While the queue is marked active it pops messages and hands
     * each one to the container's task executor wrapped in a {@link MessageExecutor}.
     *
     * <p>When the queue is empty the task sleeps for the polling interval; when polling or
     * dispatching throws, it logs a warning and sleeps for the back-off time. If the thread is
     * interrupted while sleeping, the interrupt flag is restored and the task exits instead of
     * silently continuing to poll.
     */
    public class AsynchronousMessageListener implements Runnable {
        private final String queueName;
        private final ConsumerQueueDetail queueDetail;

        AsynchronousMessageListener(String str, ConsumerQueueDetail consumerQueueDetail) {
            this.queueName = str;
            this.queueDetail = consumerQueueDetail;
        }

        /** Pops the next message for this queue from the message template; may return {@code null}. */
        private RqueueMessage getMessage() {
            return rqueueMessageTemplate.pop(queueName);
        }

        /**
         * Sleeps for the given number of milliseconds.
         *
         * @return {@code true} if the sleep completed, {@code false} if the thread was interrupted
         *     (the interrupt flag is restored before returning)
         */
        private boolean pause(long millis) {
            try {
                Thread.sleep(millis);
                return true;
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can observe the interruption.
                Thread.currentThread().interrupt();
                logger.info("Message listener for queue {} interrupted, stopping", queueName);
                return false;
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (isQueueActive(queueName)) {
                try {
                    RqueueMessage message = getMessage();
                    if (message != null) {
                        getTaskExecutor().execute(new MessageExecutor(message, queueDetail));
                    } else if (!pause(getPollingInterval())) {
                        return;
                    }
                } catch (Exception e) {
                    // The trailing throwable is logged as the exception by SLF4J, not as a placeholder value.
                    logger.warn("Message listener failed for queue {}, it will be retried in {} Ms", queueName, getBackoffTime(), e);
                    if (!pause(getBackoffTime())) {
                        return;
                    }
                }
            }
        }
    }

    /* loaded from: input_file:com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer$MessageExecutor.class */
    class MessageExecutor implements Runnable {
        private final ConsumerQueueDetail queueDetail;
        private final Message<String> message;
        private final RqueueMessage rqueueMessage;
        private static final int MAX_RETRY_COUNT = Integer.MAX_VALUE;
        private static final long DELTA_BETWEEN_RE_ENQUEUE_TIME = 5000;

        MessageExecutor(RqueueMessage rqueueMessage, ConsumerQueueDetail consumerQueueDetail) {
            this.rqueueMessage = rqueueMessage;
            this.queueDetail = consumerQueueDetail;
            this.message = new GenericMessage(rqueueMessage.getMessage(), QueueInfo.getQueueHeaders(consumerQueueDetail.getQueueName()));
        }

        private int getMaxRetryCount() {
            int numRetries = this.rqueueMessage.getRetryCount() == null ? this.queueDetail.getNumRetries() : this.rqueueMessage.getRetryCount().intValue();
            if (numRetries == -1 && this.queueDetail.getDlqName().isEmpty()) {
                numRetries = MAX_RETRY_COUNT;
            }
            return numRetries;
        }

        private long getMaxProcessingTime() {
            return QueueInfo.getMessageReEnqueueTime() - DELTA_BETWEEN_RE_ENQUEUE_TIME;
        }

        private void handlePostProcessing(boolean z, int i, int i2) {
            if (RqueueMessageListenerContainer.this.isQueueActive(this.queueDetail.getQueueName())) {
                try {
                    String processingQueueName = QueueInfo.getProcessingQueueName(this.queueDetail.getQueueName());
                    if (z) {
                        RqueueMessageListenerContainer.this.rqueueMessageTemplate.removeFromZset(processingQueueName, this.rqueueMessage);
                    } else if (!this.queueDetail.getDlqName().isEmpty()) {
                        RqueueMessage m1clone = this.rqueueMessage.m1clone();
                        m1clone.setFailureCount(i);
                        m1clone.updateReEnqueuedAt();
                        RqueueMessageListenerContainer.this.rqueueMessageTemplate.add(this.queueDetail.getDlqName(), m1clone);
                        RqueueMessageListenerContainer.this.rqueueMessageTemplate.removeFromZset(processingQueueName, this.rqueueMessage);
                    } else if (i < i2) {
                        RqueueMessage m1clone2 = this.rqueueMessage.m1clone();
                        m1clone2.setFailureCount(i);
                        m1clone2.updateReEnqueuedAt();
                        RqueueMessageListenerContainer.this.rqueueMessageTemplate.replaceMessage(processingQueueName, this.rqueueMessage, m1clone2);
                    } else {
                        RqueueMessageListenerContainer.logger.warn("Message {} discarded due to retry limit", this.rqueueMessage);
                        RqueueMessageListenerContainer.this.rqueueMessageTemplate.removeFromZset(processingQueueName, this.rqueueMessage);
                    }
                } catch (Exception e) {
                    RqueueMessageListenerContainer.logger.error("Error occurred in post processing", e);
                }
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = false;
            int failureCount = this.rqueueMessage.getFailureCount();
            int maxRetryCount = getMaxRetryCount();
            long maxProcessingTime = getMaxProcessingTime();
            while (RqueueMessageListenerContainer.this.isQueueActive(this.queueDetail.getQueueName())) {
                try {
                    RqueueMessageListenerContainer.this.getRqueueMessageHandler().handleMessage(this.message);
                    z = true;
                } catch (Exception e) {
                    failureCount++;
                }
                if (failureCount >= maxRetryCount || z || System.currentTimeMillis() >= maxProcessingTime) {
                    handlePostProcessing(z, failureCount, maxRetryCount);
                    return;
                }
            }
        }
    }

    /**
     * Creates a container bound to the given handler and message template.
     *
     * @param rqueueMessageHandler handler that supplies the queue mappings and processes messages
     * @param rqueueMessageTemplate template used to pop and settle messages
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public RqueueMessageListenerContainer(RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) {
        // Validate both collaborators before touching any state.
        Assert.notNull(rqueueMessageHandler, "rqueueMessageHandler can not be null");
        Assert.notNull(rqueueMessageTemplate, "rqueueMessageTemplate can not be null");
        this.rqueueMessageTemplate = rqueueMessageTemplate;
        this.rqueueMessageHandler = rqueueMessageHandler;
    }

    /**
     * Sets how long, in milliseconds, stopping waits for each queue poller to finish.
     *
     * @param j wait time in milliseconds
     */
    public void setMaxWorkerWaitTime(long j) {
        maxWorkerWaitTime = j;
    }

    /** @return the per-poller wait time, in milliseconds, used while stopping */
    public long getMaxWorkerWaitTime() {
        return maxWorkerWaitTime;
    }

    /** @return the bean name assigned to this container, or {@code null} if none was set */
    public String getBeanName() {
        return beanName;
    }

    /**
     * Records the bean name; it is later used as the thread-name prefix of created executors.
     *
     * @param str the bean name
     */
    @Override
    public void setBeanName(String str) {
        beanName = str;
    }

    /** @return the message handler this container dispatches messages to */
    public RqueueMessageHandler getRqueueMessageHandler() {
        return rqueueMessageHandler;
    }

    /** @return the configured worker count, or {@code null} if none was configured */
    public Integer getMaxNumWorkers() {
        return maxNumWorkers;
    }

    /**
     * Sets the worker count used when sizing the default task executor.
     *
     * @param i number of workers
     */
    public void setMaxNumWorkers(int i) {
        // Autoboxing performs the same Integer.valueOf conversion.
        maxNumWorkers = i;
    }

    /**
     * Sets how long, in milliseconds, a poller sleeps after a failed poll.
     *
     * @param j back-off time in milliseconds
     */
    public void setBackOffTime(long j) {
        backOffTime = j;
    }

    /** @return the back-off time, in milliseconds, applied after a failed poll */
    public long getBackoffTime() {
        return backOffTime;
    }

    /**
     * Supplies the executor used to run message handlers. When one is set before
     * afterPropertiesSet, the container does not create (or later destroy) a default executor.
     *
     * @param asyncTaskExecutor executor to use
     */
    public void setTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
        taskExecutor = asyncTaskExecutor;
    }

    /**
     * Stops the container and then releases its executors, all while holding the lifecycle monitor.
     *
     * @throws Exception declared by the bean destruction contract
     */
    @Override
    public void destroy() throws Exception {
        synchronized (lifecycleMgr) {
            // Order matters: pollers must be asked to stop before their executors go away.
            stop();
            doDestroy();
        }
    }

    /**
     * Releases the executors owned by this container: the task executor only when the container
     * created it itself, and the spinning executor whenever one exists.
     *
     * <p>{@code AsyncTaskExecutor} declares no {@code destroy()} method, so each executor is shut
     * down through {@code ThreadPoolTaskExecutor}, the type createTaskExecutor produces.
     */
    protected void doDestroy() {
        if (defaultTaskExecutor) {
            destroyExecutor(taskExecutor);
        }
        destroyExecutor(spinningTaskExecutor);
    }

    /** Destroys the executor if it is a ThreadPoolTaskExecutor; null and other types are ignored. */
    private static void destroyExecutor(AsyncTaskExecutor executor) {
        // instanceof is false for null, so no separate null check is needed.
        if (executor instanceof ThreadPoolTaskExecutor) {
            ((ThreadPoolTaskExecutor) executor).destroy();
        }
    }

    /** @return the lifecycle phase of this container */
    @Override
    public int getPhase() {
        return phase;
    }

    /**
     * Sets the lifecycle phase of this container.
     *
     * @param i the phase value
     */
    public void setPhase(int i) {
        phase = i;
    }

    /** @return whether this container should be started automatically */
    @Override
    public boolean isAutoStartup() {
        return autoStartup;
    }

    /**
     * Sets whether this container should be started automatically.
     *
     * @param z {@code true} to start automatically
     */
    public void setAutoStartup(boolean z) {
        autoStartup = z;
    }

    /**
     * Registers every queue declared by the handler's mappings, freezes the registry, creates the
     * executors and marks every registered queue as not running.
     *
     * <p>If no task executor was supplied, a default one is created and flagged as owned by this
     * container. Otherwise the supplied executor is kept and a separate spinning executor is
     * created for the pollers.
     *
     * @throws Exception declared by the bean initialization contract
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        synchronized (lifecycleMgr) {
            for (MappingInformation mappingInformation : rqueueMessageHandler.getHandlerMethods().keySet()) {
                for (String queueName : mappingInformation.getQueueNames()) {
                    registeredQueues.put(queueName, getConsumerQueueDetail(queueName, mappingInformation));
                }
            }
            // From here on the registry is read-only for all callers.
            registeredQueues = Collections.unmodifiableMap(registeredQueues);
            lifecycleMgr.notifyAll();
        }
        if (taskExecutor == null) {
            defaultTaskExecutor = true;
            taskExecutor = createDefaultTaskExecutor();
        } else {
            spinningTaskExecutor = createSpinningTaskExecutor();
        }
        initializeRunningQueueState();
    }

    /** @return the dedicated poller executor, or {@code null} when none was created */
    protected AsyncTaskExecutor getSpinningTaskExecutor() {
        return spinningTaskExecutor;
    }

    /** Builds the executor used only for queue pollers (spinning sizing). */
    private AsyncTaskExecutor createSpinningTaskExecutor() {
        final boolean spinning = true;
        return createTaskExecutor(spinning);
    }

    /** @return the registry of queue name to queue detail held by this container */
    public Map<String, ConsumerQueueDetail> getRegisteredQueues() {
        return registeredQueues;
    }

    /** Marks every registered queue as not running. */
    private void initializeRunningQueueState() {
        for (String queueName : getRegisteredQueues().keySet()) {
            queueRunningState.put(queueName, Boolean.FALSE);
        }
    }

    /**
     * Computes the two pool-size values for an executor from the number of registered queues.
     *
     * <p>For the spinning executor both values equal the queue count. Otherwise the first value is
     * twice the queue count and the second is the queue count plus the configured worker count
     * (or plus twice the queue count when no worker count is configured).
     *
     * @param z {@code true} to size the spinning (poller-only) executor
     * @return the sizing pair; presumably (core, max) given how createTaskExecutor reads it
     */
    private ThreadCount getThreadCount(boolean z) {
        int queueCount = getRegisteredQueues().size();
        if (z) {
            return new ThreadCount(queueCount, queueCount);
        }
        Integer configuredWorkers = getMaxNumWorkers();
        int workers = (configuredWorkers != null) ? configuredWorkers.intValue() : queueCount * 2;
        return new ThreadCount(queueCount + queueCount, queueCount + workers);
    }

    /**
     * Builds and initializes a thread pool executor with no task queue (capacity 0).
     *
     * <p>Threads are named after the bean name followed by "-", or after the class short name when
     * no bean name is set. Pool sizes are applied only when the computed core size is positive;
     * otherwise the executor keeps its own defaults.
     *
     * @param z {@code true} to size the pool for pollers only
     * @return the initialized executor
     */
    private AsyncTaskExecutor createTaskExecutor(boolean z) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        String name = getBeanName();
        String prefix = (name == null) ? DEFAULT_THREAD_NAME_PREFIX : name + "-";
        executor.setThreadNamePrefix(prefix);

        ThreadCount sizing = getThreadCount(z);
        int corePoolSize = sizing.getCorePoolSize();
        if (corePoolSize > 0) {
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(sizing.getMaxPoolSize());
        }

        executor.setQueueCapacity(0);
        executor.afterPropertiesSet();
        return executor;
    }

    /**
     * Builds the executor used when the caller supplied none (non-spinning sizing).
     *
     * @return a newly created, initialized executor
     */
    public AsyncTaskExecutor createDefaultTaskExecutor() {
        final boolean spinning = false;
        return createTaskExecutor(spinning);
    }

    /**
     * Builds the queue detail for one queue name from its mapping: retry count, dead letter queue
     * name and the delayed-queue flag are copied from the mapping.
     */
    private ConsumerQueueDetail getConsumerQueueDetail(String str, MappingInformation mappingInformation) {
        return new ConsumerQueueDetail(
                str,
                mappingInformation.getNumRetries(),
                mappingInformation.getDeadLetterQueueName(),
                mappingInformation.isDelayedQueue());
    }

    /** Marks the container as running and starts a poller for every registered queue. */
    @Override
    public void start() {
        logger.info("Starting Rqueue Message container");
        synchronized (lifecycleMgr) {
            running = true;
            lifecycleMgr.notifyAll();
        }
        // Pollers are launched outside the monitor.
        doStart();
    }

    /** Starts a poller for each registered queue. */
    protected void doStart() {
        getRegisteredQueues().forEach(this::startQueue);
    }

    /**
     * Starts the poller for one queue unless it is already marked as running. The poller is
     * submitted to the spinning executor when one exists, otherwise to the task executor, and the
     * resulting future is remembered so that stopping can wait for it.
     *
     * @param str queue name
     * @param consumerQueueDetail detail of the queue to poll
     */
    protected void startQueue(String str, ConsumerQueueDetail consumerQueueDetail) {
        if (Boolean.TRUE.equals(queueRunningState.get(str))) {
            return;
        }
        queueRunningState.put(str, Boolean.TRUE);
        Runnable poller = new AsynchronousMessageListener(str, consumerQueueDetail);
        AsyncTaskExecutor executor = (spinningTaskExecutor != null) ? spinningTaskExecutor : getTaskExecutor();
        scheduledFutureByQueue.put(str, executor.submit(poller));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether the given queue is currently marked as running.
     *
     * @param str queue name
     * @return {@code true} only if the queue is known and flagged as running
     */
    public boolean isQueueActive(String str) {
        Boolean active = queueRunningState.get(str);
        return active != null && active.booleanValue();
    }

    /** @return the executor that runs message handlers */
    public AsyncTaskExecutor getTaskExecutor() {
        return taskExecutor;
    }

    /** Marks the container as stopped, then stops every running queue and waits for its poller. */
    @Override
    public void stop() {
        logger.info("Stopping Rqueue Message container");
        synchronized (lifecycleMgr) {
            running = false;
            lifecycleMgr.notifyAll();
        }
        // Queue shutdown and the wait for pollers happen outside the monitor.
        doStop();
    }

    /** Flags every running queue as stopped, then waits for the pollers to finish. */
    protected void doStop() {
        queueRunningState.forEach((queueName, active) -> {
            if (active.booleanValue()) {
                stopQueue(queueName);
            }
        });
        waitForRunningQueuesToStop();
    }

    /**
     * Waits for the poller of every known queue to finish, for at most the maximum worker wait
     * time per queue. Execution failures and timeouts are logged and the next queue is tried.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored and waiting stops,
     * because any further timed wait would fail immediately with the flag set.
     */
    private void waitForRunningQueuesToStop() {
        for (String queueName : queueRunningState.keySet()) {
            Future<?> future = scheduledFutureByQueue.get(queueName);
            // isDone() is also true for cancelled futures, so no separate isCancelled() check.
            if (future == null || future.isDone()) {
                continue;
            }
            try {
                future.get(getMaxWorkerWaitTime(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting for queue '{}' to stop", queueName);
                return;
            } catch (ExecutionException | TimeoutException e) {
                logger.warn("An exception occurred while stopping queue '{}'", queueName, e);
            }
        }
    }

    /**
     * Flags one queue as stopped so its poller and executors leave their loops.
     *
     * @throws IllegalArgumentException if the queue name is unknown
     */
    private void stopQueue(String str) {
        boolean known = queueRunningState.containsKey(str);
        Assert.isTrue(known, "Queue with name '" + str + "' does not exist");
        queueRunningState.put(str, Boolean.FALSE);
    }

    /** @return whether the container is currently marked as running */
    @Override
    public boolean isRunning() {
        synchronized (lifecycleMgr) {
            return running;
        }
    }

    /** @return how long, in milliseconds, a poller sleeps when its queue is empty */
    public long getPollingInterval() {
        return pollingInterval;
    }

    /**
     * Sets how long, in milliseconds, a poller sleeps when its queue is empty.
     *
     * @param j polling interval in milliseconds
     */
    public void setPollingInterval(long j) {
        pollingInterval = j;
    }
}
