/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.listener.ConsumerQueueDetail;
import com.github.sonus21.rqueue.listener.MappingInformation;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.ThreadCount;
import com.github.sonus21.rqueue.utils.QueueInfo;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * Container that polls every queue known to the {@link RqueueMessageHandler} and hands each
 * popped message to the handler on a task executor.
 *
 * <p>Life cycle, as implemented below:
 * <ul>
 *   <li>{@link #afterPropertiesSet()} collects the queues declared by the handler's mapping
 *       information and creates the executors;</li>
 *   <li>{@link #start()} submits one polling task per registered queue;</li>
 *   <li>{@link #stop()} marks every queue as inactive and waits, for at most
 *       {@link #getMaxWorkerWaitTime()} milliseconds per queue, for the polling tasks to end;</li>
 *   <li>{@link #destroy()} stops the container and destroys the executors it created itself.</li>
 * </ul>
 *
 * <p>The {@code running} flag is guarded by an internal lock; per-queue state lives in
 * concurrent maps.
 */
public class RqueueMessageListenerContainer
        implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {
    private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RqueueMessageListenerContainer.class);
    private static final int DEFAULT_WORKER_COUNT_PER_QUEUE = 2;
    private static final int POOL_SIZE_FOR_MESSAGE_MOVER = 5;
    private static final int POOL_SIZE_FOR_MESSAGE_PROCESSING_MOVER = 2;
    private static final Logger logger = LoggerFactory.getLogger(RqueueMessageListenerContainer.class);

    /** Monitor guarding {@link #running} and the queue registration step. */
    private final Object lifecycleMgr = new Object();
    private final RqueueMessageHandler rqueueMessageHandler;
    private final RqueueMessageTemplate rqueueMessageTemplate;
    /** Per-queue "should keep running" flag, read by the polling and worker tasks. */
    private final Map<String, Boolean> queueRunningState = new ConcurrentHashMap<>();
    /** Future of the polling task submitted for each queue. */
    private final Map<String, Future<?>> scheduledFutureByQueue = new ConcurrentHashMap<>();
    /** Replaced by an unmodifiable view once {@link #afterPropertiesSet()} has run. */
    private Map<String, ConsumerQueueDetail> registeredQueues = new ConcurrentHashMap<>();
    private Integer maxNumWorkers;
    private String beanName;
    /** True when {@link #taskExecutor} was created by this container and must be destroyed by it. */
    private boolean defaultTaskExecutor = false;
    private AsyncTaskExecutor taskExecutor;
    /** Dedicated executor for polling tasks; only created when a task executor was supplied. */
    private AsyncTaskExecutor spinningTaskExecutor;
    private boolean autoStartup = true;
    private boolean running = false;
    /** Milliseconds a polling task sleeps after a failure. */
    private long backOffTime = 5000L;
    /** Milliseconds to wait for each polling task while stopping. */
    private long maxWorkerWaitTime = 200000L;
    /** Milliseconds a polling task sleeps when the queue had no message. */
    private long pollingInterval = 200L;
    private int phase = Integer.MAX_VALUE;
    private ApplicationContext applicationContext;
    @Autowired
    private RedisMessageListenerContainer rqueueRedisMessageListenerContainer;

    /**
     * Creates a container bound to the given handler and template.
     *
     * @param rqueueMessageHandler handler that declares the queues and processes messages
     * @param rqueueMessageTemplate template used to pop and move messages
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public RqueueMessageListenerContainer(RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) {
        Assert.notNull(rqueueMessageHandler, "rqueueMessageHandler can not be null");
        Assert.notNull(rqueueMessageTemplate, "rqueueMessageTemplate can not be null");
        this.rqueueMessageHandler = rqueueMessageHandler;
        this.rqueueMessageTemplate = rqueueMessageTemplate;
    }

    /** Sets how long, in milliseconds, {@link #stop()} waits for each polling task. */
    public void setMaxWorkerWaitTime(long stopTime) {
        this.maxWorkerWaitTime = stopTime;
    }

    /** Returns how long, in milliseconds, {@link #stop()} waits for each polling task. */
    public long getMaxWorkerWaitTime() {
        return this.maxWorkerWaitTime;
    }

    /** Returns the bean name, or {@code null} when none was set. */
    public String getBeanName() {
        return this.beanName;
    }

    /** Stores the bean name; it becomes the thread-name prefix of executors created here. */
    @Override
    public void setBeanName(String name) {
        this.beanName = name;
    }

    /** Returns the handler messages are dispatched to. */
    public RqueueMessageHandler getRqueueMessageHandler() {
        return this.rqueueMessageHandler;
    }

    /** Returns the configured worker count, or {@code null} when the default should be used. */
    public Integer getMaxNumWorkers() {
        return this.maxNumWorkers;
    }

    /** Sets the number of worker threads added on top of one polling thread per queue. */
    public void setMaxNumWorkers(int maxNumWorkers) {
        this.maxNumWorkers = maxNumWorkers;
    }

    /** Sets the sleep time, in milliseconds, applied after a polling failure. */
    public void setBackOffTime(long backOffTime) {
        this.backOffTime = backOffTime;
    }

    /** Returns the sleep time, in milliseconds, applied after a polling failure. */
    public long getBackoffTime() {
        return this.backOffTime;
    }

    /**
     * Supplies the executor used to run message handlers. When one is supplied, a separate
     * executor for the polling tasks is created in {@link #afterPropertiesSet()}.
     */
    public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Stops the container and then releases the executors via {@link #doDestroy()}, all while
     * holding the life-cycle lock.
     */
    @Override
    public void destroy() throws Exception {
        synchronized (this.lifecycleMgr) {
            this.stop();
            this.doDestroy();
        }
    }

    /**
     * Destroys the executors created by this container. A caller-supplied task executor is left
     * alone; the polling executor is always container-owned.
     */
    protected void doDestroy() {
        if (this.defaultTaskExecutor) {
            destroyExecutor(this.taskExecutor);
        }
        destroyExecutor(this.spinningTaskExecutor);
    }

    /**
     * Destroys the executor when it is a {@link ThreadPoolTaskExecutor}. The type check (which also
     * covers {@code null}) avoids a {@link ClassCastException} if an overridden
     * {@link #createDefaultTaskExecutor()} returned another executor type.
     */
    private static void destroyExecutor(AsyncTaskExecutor executor) {
        if (executor instanceof ThreadPoolTaskExecutor) {
            ((ThreadPoolTaskExecutor) executor).destroy();
        }
    }

    /** Returns the life-cycle phase; defaults to {@link Integer#MAX_VALUE}. */
    @Override
    public int getPhase() {
        return this.phase;
    }

    /** Sets the life-cycle phase. */
    public void setPhase(int phase) {
        this.phase = phase;
    }

    /** Returns whether the container asks to be started automatically; defaults to {@code true}. */
    @Override
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /** Sets whether the container asks to be started automatically. */
    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    /**
     * Registers every queue named by the handler's mapping information, freezes the registry,
     * creates the missing executors and marks all queues as not running.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        synchronized (this.lifecycleMgr) {
            for (MappingInformation mappingInformation : this.rqueueMessageHandler.getHandlerMethods().keySet()) {
                for (String queue : mappingInformation.getQueueNames()) {
                    this.registeredQueues.put(queue, this.getConsumerQueueDetail(queue, mappingInformation));
                }
            }
            this.registeredQueues = Collections.unmodifiableMap(this.registeredQueues);
            this.lifecycleMgr.notifyAll();
        }
        if (this.taskExecutor == null) {
            // No executor supplied: one container-owned pool serves both polling and workers.
            this.defaultTaskExecutor = true;
            this.taskExecutor = this.createDefaultTaskExecutor();
        } else {
            // Executor supplied by the caller: polling tasks get their own pool.
            this.spinningTaskExecutor = this.createSpinningTaskExecutor();
        }
        this.initializeRunningQueueState();
    }

    /** Returns the executor dedicated to polling tasks, or {@code null} when none was created. */
    protected AsyncTaskExecutor getSpinningTaskExecutor() {
        return this.spinningTaskExecutor;
    }

    private AsyncTaskExecutor createSpinningTaskExecutor() {
        return this.createTaskExecutor(true);
    }

    /** Returns the registered queues keyed by queue name. */
    public Map<String, ConsumerQueueDetail> getRegisteredQueues() {
        return this.registeredQueues;
    }

    /** Marks every registered queue as not running. */
    private void initializeRunningQueueState() {
        for (String queue : this.getRegisteredQueues().keySet()) {
            this.queueRunningState.put(queue, false);
        }
    }

    /**
     * Computes pool sizes from the number of registered queues.
     *
     * @param onlySpinning when {@code true}, size the pool for polling tasks only (one thread per
     *     queue); otherwise add worker threads on top
     * @return the core and maximum pool sizes
     */
    private ThreadCount getThreadCount(boolean onlySpinning) {
        int queueSize = this.getRegisteredQueues().size();
        if (onlySpinning) {
            return new ThreadCount(queueSize, queueSize);
        }
        Integer configuredWorkers = this.getMaxNumWorkers();
        int workers = configuredWorkers == null ? queueSize * DEFAULT_WORKER_COUNT_PER_QUEUE : configuredWorkers.intValue();
        int maxPoolSize = queueSize + workers;
        // A worker count below the queue count would otherwise give core > max, which
        // ThreadPoolExecutor rejects with IllegalArgumentException.
        int corePoolSize = Math.min(queueSize + queueSize, maxPoolSize);
        return new ThreadCount(corePoolSize, maxPoolSize);
    }

    /**
     * Builds and initializes a thread pool with no task queue (capacity 0).
     *
     * @param onlySpinningThread forwarded to {@link #getThreadCount(boolean)}
     */
    private AsyncTaskExecutor createTaskExecutor(boolean onlySpinningThread) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        String name = this.getBeanName();
        executor.setThreadNamePrefix(name != null ? name + "-" : DEFAULT_THREAD_NAME_PREFIX);
        ThreadCount threadCount = this.getThreadCount(onlySpinningThread);
        // With no registered queue the executor keeps its own default sizes.
        if (threadCount.getCorePoolSize() > 0) {
            executor.setCorePoolSize(threadCount.getCorePoolSize());
            executor.setMaxPoolSize(threadCount.getMaxPoolSize());
        }
        executor.setQueueCapacity(0);
        executor.afterPropertiesSet();
        return executor;
    }

    /** Creates the container-owned executor that serves both polling tasks and workers. */
    public AsyncTaskExecutor createDefaultTaskExecutor() {
        return this.createTaskExecutor(false);
    }

    private ConsumerQueueDetail getConsumerQueueDetail(String queue, MappingInformation mappingInformation) {
        return new ConsumerQueueDetail(queue, mappingInformation.getNumRetries(), mappingInformation.getDeadLetterQueueName(), mappingInformation.isDelayedQueue());
    }

    /** Marks the container as running and starts a polling task for every registered queue. */
    @Override
    public void start() {
        logger.info("Starting Rqueue Message container");
        synchronized (this.lifecycleMgr) {
            this.running = true;
            this.lifecycleMgr.notifyAll();
        }
        this.doStart();
    }

    /** Starts every registered queue. */
    protected void doStart() {
        for (Map.Entry<String, ConsumerQueueDetail> registeredQueue : this.getRegisteredQueues().entrySet()) {
            this.startQueue(registeredQueue.getKey(), registeredQueue.getValue());
        }
    }

    /**
     * Submits a polling task for the queue unless it is already marked as running.
     *
     * @param queueName name of the queue to poll
     * @param queueDetail configuration passed on to the workers
     */
    protected void startQueue(String queueName, ConsumerQueueDetail queueDetail) {
        // A single put both tests and sets the flag, so concurrent callers cannot both proceed.
        Boolean wasRunning = this.queueRunningState.put(queueName, Boolean.TRUE);
        if (Boolean.TRUE.equals(wasRunning)) {
            return;
        }
        AsynchronousMessageListener listener = new AsynchronousMessageListener(queueName, queueDetail);
        AsyncTaskExecutor executor = this.spinningTaskExecutor != null ? this.spinningTaskExecutor : this.getTaskExecutor();
        Future<?> future = executor.submit(listener);
        this.scheduledFutureByQueue.put(queueName, future);
    }

    /** Returns {@code true} when the queue is known and marked as running. */
    private boolean isQueueActive(String queueName) {
        return this.queueRunningState.getOrDefault(queueName, false);
    }

    /** Returns the executor that runs message handlers. */
    public AsyncTaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    /** Marks the container as stopped, deactivates all queues and waits for the polling tasks. */
    @Override
    public void stop() {
        logger.info("Stopping Rqueue Message container");
        synchronized (this.lifecycleMgr) {
            this.running = false;
            this.lifecycleMgr.notifyAll();
        }
        this.doStop();
    }

    /** Deactivates every running queue, then waits for the polling tasks to finish. */
    protected void doStop() {
        for (Map.Entry<String, Boolean> runningStateByQueue : this.queueRunningState.entrySet()) {
            if (runningStateByQueue.getValue().booleanValue()) {
                this.stopQueue(runningStateByQueue.getKey());
            }
        }
        this.waitForRunningQueuesToStop();
    }

    /**
     * Waits up to {@link #getMaxWorkerWaitTime()} milliseconds for each unfinished polling task.
     * Failures and timeouts are logged; an interrupt ends the wait with the flag restored.
     */
    private void waitForRunningQueuesToStop() {
        for (String queueName : this.queueRunningState.keySet()) {
            Future<?> listenerFuture = this.scheduledFutureByQueue.get(queueName);
            if (listenerFuture == null || listenerFuture.isDone() || listenerFuture.isCancelled()) {
                continue;
            }
            try {
                listenerFuture.get(this.getMaxWorkerWaitTime(), TimeUnit.MILLISECONDS);
            }
            catch (ExecutionException | TimeoutException e) {
                logger.warn("An exception occurred while stopping queue '{}'", queueName, e);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop blocking on further queues.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Marks the queue as not running; its polling task ends on its next loop check.
     *
     * @throws IllegalArgumentException if the queue is unknown
     */
    private void stopQueue(String queueName) {
        Assert.isTrue(this.queueRunningState.containsKey(queueName), "Queue with name '" + queueName + "' does not exist");
        this.queueRunningState.put(queueName, false);
    }

    /** Returns whether {@link #start()} was called more recently than {@link #stop()}. */
    @Override
    public boolean isRunning() {
        synchronized (this.lifecycleMgr) {
            return this.running;
        }
    }

    /** Returns the sleep time, in milliseconds, between polls of an empty queue. */
    public long getPollingInterval() {
        return this.pollingInterval;
    }

    /** Sets the sleep time, in milliseconds, between polls of an empty queue. */
    public void setPollingInterval(long pollingInterval) {
        this.pollingInterval = pollingInterval;
    }

    /**
     * Runs the handler for one popped message, retrying in place on failure, and then updates the
     * processing queue (and the dead letter queue, when configured) according to the outcome.
     */
    class MessageExecutor
            implements Runnable {
        private static final int MAX_RETRY_COUNT = Integer.MAX_VALUE;
        private static final long DELTA_BETWEEN_RE_ENQUEUE_TIME = 5000L;
        private final ConsumerQueueDetail queueDetail;
        private final Message<String> message;
        private final RqueueMessage rqueueMessage;

        /**
         * @param message the message popped from the queue
         * @param queueDetail configuration of the queue the message came from
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        MessageExecutor(RqueueMessage message, ConsumerQueueDetail queueDetail) {
            this.rqueueMessage = message;
            this.queueDetail = queueDetail;
            // Raw construction kept on purpose: the payload type returned by getMessage() is not
            // visible from this file. NOTE(review): switch to the diamond form once confirmed.
            this.message = new GenericMessage((Object)message.getMessage(), QueueInfo.getQueueHeaders(queueDetail.getQueueName()));
        }

        /**
         * Returns the retry limit: the message's own retry count when present, otherwise the
         * queue's. A limit of -1 on a queue without a dead letter queue means "retry forever".
         */
        private int getMaxRetryCount() {
            Integer messageRetryCount = this.rqueueMessage.getRetryCount();
            int maxRetryCount = messageRetryCount == null ? this.queueDetail.getNumRetries() : messageRetryCount.intValue();
            if (maxRetryCount == -1 && this.queueDetail.getDlqName().isEmpty()) {
                maxRetryCount = MAX_RETRY_COUNT;
            }
            return maxRetryCount;
        }

        /**
         * Returns the deadline compared against {@link System#currentTimeMillis()} in
         * {@link #run()}: the re-enqueue time minus a safety margin.
         */
        private long getMaxProcessingTime() {
            return QueueInfo.getMessageReEnqueueTime() - DELTA_BETWEEN_RE_ENQUEUE_TIME;
        }

        /**
         * Updates the queues after the handler ran. Does nothing when the queue was deactivated.
         *
         * @param executed whether the handler completed without throwing
         * @param currentFailureCount failures accumulated so far, including earlier deliveries
         * @param maxRetryCount retry limit from {@link #getMaxRetryCount()}
         */
        private void handlePostProcessing(boolean executed, int currentFailureCount, int maxRetryCount) {
            if (!RqueueMessageListenerContainer.this.isQueueActive(this.queueDetail.getQueueName())) {
                return;
            }
            try {
                String processingQueueName = QueueInfo.getProcessingQueueName(this.queueDetail.getQueueName());
                if (executed) {
                    RqueueMessageListenerContainer.this.rqueueMessageTemplate.removeFromZset(processingQueueName, this.rqueueMessage);
                } else if (!this.queueDetail.getDlqName().isEmpty()) {
                    // NOTE(review): the dead letter queue wins even when retries remain (for
                    // example when the loop ended on the deadline) - confirm this is intended.
                    RqueueMessage newMessage = this.rqueueMessage.clone();
                    newMessage.setFailureCount(currentFailureCount);
                    newMessage.updateReEnqueuedAt();
                    RqueueMessageListenerContainer.this.rqueueMessageTemplate.add(this.queueDetail.getDlqName(), newMessage);
                    RqueueMessageListenerContainer.this.rqueueMessageTemplate.removeFromZset(processingQueueName, this.rqueueMessage);
                } else if (currentFailureCount < maxRetryCount) {
                    // Swap in a copy that carries the updated failure count.
                    RqueueMessage newMessage = this.rqueueMessage.clone();
                    newMessage.setFailureCount(currentFailureCount);
                    newMessage.updateReEnqueuedAt();
                    RqueueMessageListenerContainer.this.rqueueMessageTemplate.replaceMessage(processingQueueName, this.rqueueMessage, newMessage);
                } else {
                    logger.warn("Message {} discarded due to retry limit", (Object)this.rqueueMessage);
                    RqueueMessageListenerContainer.this.rqueueMessageTemplate.removeFromZset(processingQueueName, this.rqueueMessage);
                }
            }
            catch (Exception e) {
                logger.error("Error occurred in post processing", (Throwable)e);
            }
        }

        /**
         * Invokes the handler until it succeeds, the retry limit is reached or the deadline
         * passes, then performs post-processing. Returns immediately, without post-processing,
         * when the queue is deactivated before an attempt.
         */
        @Override
        public void run() {
            boolean executed = false;
            Exception lastFailure = null;
            int currentFailureCount = this.rqueueMessage.getFailureCount();
            int maxRetryCount = this.getMaxRetryCount();
            long maxRetryTime = this.getMaxProcessingTime();
            do {
                if (!RqueueMessageListenerContainer.this.isQueueActive(this.queueDetail.getQueueName())) {
                    return;
                }
                try {
                    RqueueMessageListenerContainer.this.getRqueueMessageHandler().handleMessage(this.message);
                    executed = true;
                }
                catch (Exception e) {
                    // Remembered rather than logged here: this loop retries without delay.
                    lastFailure = e;
                    ++currentFailureCount;
                }
            } while (currentFailureCount < maxRetryCount && !executed && System.currentTimeMillis() < maxRetryTime);
            if (!executed && lastFailure != null) {
                logger.warn("Message {} was not processed, failure count is {}", this.rqueueMessage, currentFailureCount, lastFailure);
            }
            this.handlePostProcessing(executed, currentFailureCount, maxRetryCount);
        }
    }

    /**
     * Polling task for one queue: pops messages while the queue is active and submits a
     * {@link MessageExecutor} for each of them.
     */
    private class AsynchronousMessageListener
            implements Runnable {
        private final String queueName;
        private final ConsumerQueueDetail queueDetail;

        AsynchronousMessageListener(String queueName, ConsumerQueueDetail value) {
            this.queueName = queueName;
            this.queueDetail = value;
        }

        /** Pops the next message of this queue; {@code null} is treated as "queue empty". */
        private RqueueMessage getMessage() {
            return RqueueMessageListenerContainer.this.rqueueMessageTemplate.pop(this.queueName);
        }

        /**
         * Sleeps for the given time.
         *
         * @return {@code false} when the thread was interrupted; the interrupt flag is restored
         */
        private boolean pause(long millis) {
            try {
                Thread.sleep(millis);
                return true;
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                logger.info("Message listener for queue {} was interrupted", (Object)this.queueName);
                return false;
            }
        }

        /**
         * Polls until the queue is deactivated or the thread is interrupted. An empty queue
         * triggers a sleep of the polling interval; any failure triggers a sleep of the back-off
         * time before the next attempt.
         */
        @Override
        public void run() {
            while (RqueueMessageListenerContainer.this.isQueueActive(this.queueName)) {
                try {
                    RqueueMessage message = this.getMessage();
                    if (message != null) {
                        RqueueMessageListenerContainer.this.getTaskExecutor().execute(new MessageExecutor(message, this.queueDetail));
                    } else if (!this.pause(RqueueMessageListenerContainer.this.getPollingInterval())) {
                        // Interrupted: continuing would turn the sleeps into a busy loop.
                        return;
                    }
                }
                catch (Exception e) {
                    logger.warn("Message listener failed for queue {}, it will be retried in {} Ms", new Object[]{this.queueName, RqueueMessageListenerContainer.this.getBackoffTime(), e});
                    if (!this.pause(RqueueMessageListenerContainer.this.getBackoffTime())) {
                        return;
                    }
                }
            }
        }
    }
}

