package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.metrics.RqueueCounter;
import com.github.sonus21.rqueue.processor.MessageProcessor;
import com.github.sonus21.rqueue.utils.MessageUtils;
import com.github.sonus21.rqueue.utils.QueueUtils;
import java.lang.ref.WeakReference;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

/* loaded from: input_file:com/github/sonus21/rqueue/listener/MessageExecutor.class */
class MessageExecutor extends MessageContainerBase implements Runnable {
    private static final int MAX_RETRY_COUNT = Integer.MAX_VALUE;
    private final QueueDetail queueDetail;
    private final Message<String> message;
    private final RqueueMessage rqueueMessage;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageExecutor(RqueueMessage rqueueMessage, QueueDetail queueDetail, WeakReference<RqueueMessageListenerContainer> weakReference) {
        super(weakReference);
        this.rqueueMessage = rqueueMessage;
        this.queueDetail = queueDetail;
        this.message = new GenericMessage(rqueueMessage.getMessage(), QueueUtils.getQueueHeaders(queueDetail.getQueueName()));
    }

    private int getMaxRetryCount() {
        int numRetries = this.rqueueMessage.getRetryCount() == null ? this.queueDetail.getNumRetries() : this.rqueueMessage.getRetryCount().intValue();
        if (numRetries == -1 && !this.queueDetail.isDlqSet()) {
            numRetries = MAX_RETRY_COUNT;
        }
        return numRetries;
    }

    private Object getPayload() {
        return MessageUtils.convertMessageToObject(this.message, getMessageConverters());
    }

    private void callMessageProcessor(boolean z, RqueueMessage rqueueMessage) {
        MessageProcessor discardMessageProcessor = z ? this.container.get().getDiscardMessageProcessor() : this.container.get().getDlqMessageProcessor();
        try {
            getLogger().debug("Calling {} processor for {}", z ? "Discard Message Queue" : "Dead Letter Queue", rqueueMessage);
            discardMessageProcessor.process(getPayload());
        } catch (Exception e) {
            getLogger().error("Message processor call failed", e);
        }
    }

    private void updateCounter(boolean z) {
        RqueueCounter rqueueCounter = this.container.get().rqueueCounter;
        if (rqueueCounter == null) {
            return;
        }
        if (z) {
            rqueueCounter.updateFailureCount(this.queueDetail.getQueueName());
        } else {
            rqueueCounter.updateExecutionCount(this.queueDetail.getQueueName());
        }
    }

    private void handlePostProcessing(boolean z, int i, int i2) {
        if (isQueueActive(this.queueDetail.getQueueName())) {
            try {
                String processingQueueName = QueueUtils.getProcessingQueueName(this.queueDetail.getQueueName());
                if (z) {
                    getLogger().debug("Delete Queue: {} message: {}", processingQueueName, this.rqueueMessage);
                    getRqueueMessageTemplate().removeFromZset(processingQueueName, this.rqueueMessage);
                } else if (this.queueDetail.isDlqSet()) {
                    RqueueMessage m3clone = this.rqueueMessage.m3clone();
                    m3clone.setFailureCount(i);
                    m3clone.updateReEnqueuedAt();
                    callMessageProcessor(false, m3clone);
                    getRqueueMessageTemplate().add(this.queueDetail.getDlqName(), m3clone);
                    getRqueueMessageTemplate().removeFromZset(processingQueueName, this.rqueueMessage);
                } else if (i < i2) {
                    RqueueMessage m3clone2 = this.rqueueMessage.m3clone();
                    m3clone2.setFailureCount(i);
                    m3clone2.updateReEnqueuedAt();
                    getRqueueMessageTemplate().replaceMessage(processingQueueName, this.rqueueMessage, m3clone2);
                } else {
                    getLogger().warn("Message {} discarded due to retry limit queue: {}", getPayload(), this.queueDetail.getQueueName());
                    getRqueueMessageTemplate().removeFromZset(processingQueueName, this.rqueueMessage);
                    callMessageProcessor(true, this.rqueueMessage);
                }
            } catch (Exception e) {
                getLogger().error("Error occurred in post processing", e);
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        boolean z = false;
        int failureCount = this.rqueueMessage.getFailureCount();
        int maxRetryCount = getMaxRetryCount();
        long maxProcessingTime = getMaxProcessingTime();
        while (isQueueActive(this.queueDetail.getQueueName())) {
            try {
                updateCounter(false);
                getMessageHandler().handleMessage(this.message);
                z = true;
            } catch (Exception e) {
                updateCounter(true);
                failureCount++;
            }
            if (failureCount >= maxRetryCount || z || System.currentTimeMillis() >= maxProcessingTime) {
                handlePostProcessing(z, failureCount, maxRetryCount);
                return;
            }
        }
    }

    private long getMaxProcessingTime() {
        return (System.currentTimeMillis() + this.queueDetail.getMaxJobExecutionTime()) - 500;
    }
}
