package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.metrics.RqueueCounter;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.TaskStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.MessageUtils;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

/* loaded from: input_file:com/github/sonus21/rqueue/listener/RqueueExecutor.class */
class RqueueExecutor extends MessageContainerBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueExecutor.class);
    private final QueueDetail queueDetail;
    private final Message<String> message;
    private final RqueueMessage rqueueMessage;
    private final RqueueMessageHandler rqueueMessageHandler;
    private final RqueueMessageMetadataService rqueueMessageMetadataService;
    private final String messageMetadataId;
    private final Semaphore semaphore;
    private final int retryPerPoll;
    private final TaskExecutionBackOff taskExecutionBackoff;
    private MessageMetadata messageMetadata;
    private Object userMessage;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.github.sonus21.rqueue.listener.RqueueExecutor$1, reason: invalid class name */
    /* loaded from: input_file:com/github/sonus21/rqueue/listener/RqueueExecutor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$github$sonus21$rqueue$models$db$TaskStatus = new int[TaskStatus.values().length];

        static {
            try {
                $SwitchMap$com$github$sonus21$rqueue$models$db$TaskStatus[TaskStatus.DELETED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$github$sonus21$rqueue$models$db$TaskStatus[TaskStatus.MOVED_TO_DLQ.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$github$sonus21$rqueue$models$db$TaskStatus[TaskStatus.DISCARDED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$github$sonus21$rqueue$models$db$TaskStatus[TaskStatus.SUCCESSFUL.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RqueueExecutor(RqueueMessage rqueueMessage, QueueDetail queueDetail, Semaphore semaphore, WeakReference<RqueueMessageListenerContainer> weakReference, RqueueMessageHandler rqueueMessageHandler, int i, TaskExecutionBackOff taskExecutionBackOff) {
        super(log, queueDetail.getName(), weakReference);
        this.rqueueMessage = rqueueMessage;
        this.queueDetail = queueDetail;
        this.semaphore = semaphore;
        this.rqueueMessageHandler = rqueueMessageHandler;
        this.messageMetadataId = MessageUtils.getMessageMetaId(rqueueMessage.getId());
        this.retryPerPoll = i;
        this.taskExecutionBackoff = taskExecutionBackOff;
        this.message = new GenericMessage(rqueueMessage.getMessage(), MessageUtils.getMessageHeader(queueDetail.getName()));
        try {
            this.userMessage = MessageUtils.convertMessageToObject(this.message, rqueueMessageHandler.getMessageConverters());
        } catch (Exception e) {
            log(Level.ERROR, "Unable to convert message {}", e, rqueueMessage.getMessage());
        }
        this.rqueueMessageMetadataService = ((RqueueMessageListenerContainer) Objects.requireNonNull(weakReference.get())).getRqueueMessageMetadataService();
    }

    private int getMaxRetryCount() {
        return this.rqueueMessage.getRetryCount() == null ? this.queueDetail.getNumRetry() : this.rqueueMessage.getRetryCount().intValue();
    }

    private void callMessageProcessor(TaskStatus taskStatus, RqueueMessage rqueueMessage) {
        MessageProcessor messageProcessor = null;
        switch (AnonymousClass1.$SwitchMap$com$github$sonus21$rqueue$models$db$TaskStatus[taskStatus.ordinal()]) {
            case 1:
                messageProcessor = ((RqueueMessageListenerContainer) Objects.requireNonNull(this.container.get())).getManualDeletionMessageProcessor();
                break;
            case Constants.DEFAULT_WORKER_COUNT_PER_QUEUE /* 2 */:
                messageProcessor = ((RqueueMessageListenerContainer) Objects.requireNonNull(this.container.get())).getDeadLetterQueueMessageProcessor();
                break;
            case Constants.DEFAULT_RETRY_DEAD_LETTER_QUEUE /* 3 */:
                messageProcessor = ((RqueueMessageListenerContainer) Objects.requireNonNull(this.container.get())).getDiscardMessageProcessor();
                break;
            case 4:
                messageProcessor = ((RqueueMessageListenerContainer) Objects.requireNonNull(this.container.get())).getPostExecutionMessageProcessor();
                break;
        }
        if (messageProcessor != null) {
            try {
                log(Level.DEBUG, "Calling {} processor for {}", null, taskStatus, rqueueMessage);
                messageProcessor.process(this.userMessage);
            } catch (Exception e) {
                log(Level.ERROR, "Message processor {} call failed", e, taskStatus);
            }
        }
    }

    private void updateCounter(boolean z) {
        RqueueCounter rqueueCounter = this.container.get().getRqueueCounter();
        if (rqueueCounter == null) {
            return;
        }
        if (z) {
            rqueueCounter.updateFailureCount(this.queueDetail.getName());
        } else {
            rqueueCounter.updateExecutionCount(this.queueDetail.getName());
        }
    }

    private void publishEvent(TaskStatus taskStatus, long j) {
        if (((RqueueMessageListenerContainer) Objects.requireNonNull(this.container.get())).getRqueueWebConfig().isCollectListenerStats()) {
            addOrDeleteMetadata(j, false);
            ((RqueueMessageListenerContainer) Objects.requireNonNull(this.container.get())).getApplicationEventPublisher().publishEvent(new RqueueExecutionEvent(this.queueDetail, this.rqueueMessage, taskStatus, this.messageMetadata));
        }
    }

    private void addOrDeleteMetadata(long j, boolean z) {
        if (this.messageMetadata == null) {
            this.messageMetadata = this.rqueueMessageMetadataService.get(this.messageMetadataId);
        }
        if (this.messageMetadata == null) {
            this.messageMetadata = new MessageMetadata(this.messageMetadataId, this.rqueueMessage.getId());
            if (!z) {
                this.messageMetadata.addExecutionTime(j);
                return;
            }
        }
        this.messageMetadata.addExecutionTime(j);
        if (z) {
            ((RqueueMessageListenerContainer) Objects.requireNonNull(this.container.get())).getRqueueMessageMetadataService().save(this.messageMetadata, Duration.ofSeconds(604800L));
        } else {
            this.rqueueMessageMetadataService.delete(this.messageMetadataId);
        }
    }

    private void deleteMessage(TaskStatus taskStatus, int i, long j) {
        getRqueueMessageTemplate().removeElementFromZset(this.queueDetail.getProcessingQueueName(), this.rqueueMessage);
        this.rqueueMessage.setFailureCount(i);
        callMessageProcessor(taskStatus, this.rqueueMessage);
        publishEvent(taskStatus, j);
    }

    private void moveMessageToDlq(int i, long j) throws CloneNotSupportedException {
        if (isWarningEnabled()) {
            log(Level.WARN, "Message {} Moved to dead letter queue: {}, dead letter queue: {}", null, this.userMessage, this.queueDetail.getName(), this.queueDetail.getDeadLetterQueueName());
        }
        RqueueMessage m6clone = this.rqueueMessage.m6clone();
        m6clone.setFailureCount(i);
        m6clone.updateReEnqueuedAt();
        callMessageProcessor(TaskStatus.MOVED_TO_DLQ, m6clone);
        RedisUtils.executePipeLine(getRqueueMessageTemplate().getTemplate(), (redisConnection, stringRedisSerializer, rqueueRedisSerializer) -> {
            byte[] serialize = rqueueRedisSerializer.serialize(m6clone);
            byte[] serialize2 = rqueueRedisSerializer.serialize(this.rqueueMessage);
            byte[] serialize3 = stringRedisSerializer.serialize(this.queueDetail.getProcessingQueueName());
            redisConnection.rPush(stringRedisSerializer.serialize(this.queueDetail.getDeadLetterQueueName()), (byte[][]) new byte[]{serialize});
            redisConnection.zRem(serialize3, (byte[][]) new byte[]{serialize2});
        });
        publishEvent(TaskStatus.MOVED_TO_DLQ, j);
    }

    private void parkMessageForRetry(int i, long j, long j2) throws CloneNotSupportedException {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} will be retried in {}Ms, queue: {}, Redis Queue: {}", null, this.userMessage, Long.valueOf(j2), this.queueDetail.getName(), this.queueDetail.getQueueName());
        }
        RqueueMessage m6clone = this.rqueueMessage.m6clone();
        m6clone.setFailureCount(i);
        m6clone.updateReEnqueuedAt();
        getRqueueMessageTemplate().moveMessage(this.queueDetail.getProcessingQueueName(), this.queueDetail.getDelayedQueueName(), this.rqueueMessage, m6clone, j2);
        addOrDeleteMetadata(j, true);
    }

    private void discardMessage(int i, long j) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} discarded due to retry limit exhaust queue: {}", null, this.userMessage, this.queueDetail.getName());
        }
        deleteMessage(TaskStatus.DISCARDED, i, j);
    }

    private void handleManualDeletion(int i, long j) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message Deleted manually {} successfully, Queue: {}", null, this.rqueueMessage, this.queueDetail.getName());
        }
        deleteMessage(TaskStatus.DELETED, i, j);
    }

    private void handleSuccessFullExecution(int i, long j) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message consumed {} successfully, Queue: {}", null, this.rqueueMessage, this.queueDetail.getName());
        }
        deleteMessage(TaskStatus.SUCCESSFUL, i, j);
    }

    private void handleLimitExceededMessage(int i, long j) throws CloneNotSupportedException {
        if (this.queueDetail.isDlqSet()) {
            moveMessageToDlq(i, j);
        } else {
            discardMessage(i, j);
        }
    }

    private void handleFailure(int i, int i2, long j) throws CloneNotSupportedException {
        if (i >= i2) {
            handleLimitExceededMessage(i, j);
            return;
        }
        long nextBackOff = this.taskExecutionBackoff.nextBackOff(this.userMessage, this.rqueueMessage, i);
        if (nextBackOff == -1) {
            handleLimitExceededMessage(i, j);
        } else {
            parkMessageForRetry(i, j, nextBackOff);
        }
    }

    private void handlePostProcessing(boolean z, boolean z2, boolean z3, int i, int i2, long j) {
        if (isQueueActive(this.queueDetail.getName())) {
            try {
                if (z3) {
                    handleIgnoredMessage(i, j);
                } else if (z2) {
                    handleManualDeletion(i, j);
                } else if (z) {
                    handleSuccessFullExecution(i, j);
                } else {
                    handleFailure(i, i2, j);
                }
            } catch (Exception e) {
                log(Level.ERROR, "Error occurred in post processing", e, new Object[0]);
            }
        }
    }

    private void handleIgnoredMessage(int i, long j) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} ignored, Queue: {}", null, this.rqueueMessage, this.queueDetail.getName());
        }
        deleteMessage(TaskStatus.IGNORED, i, j);
    }

    private long getMaxProcessingTime() {
        return (System.currentTimeMillis() + this.queueDetail.getVisibilityTimeout()) - 1000;
    }

    private boolean isMessageDeleted(String str) {
        Assert.notNull(str, "Message id must be present");
        this.messageMetadata = this.rqueueMessageMetadataService.get(this.messageMetadataId);
        if (this.messageMetadata == null) {
            return false;
        }
        return this.messageMetadata.isDeleted();
    }

    private boolean shouldProcess() {
        return ((RqueueMessageListenerContainer) Objects.requireNonNull(this.container.get())).getPreExecutionMessageProcessor().process(this.userMessage);
    }

    private int getRetryCount() {
        int maxRetryCount = getMaxRetryCount();
        return this.retryPerPoll == -1 ? maxRetryCount : Math.min(this.retryPerPoll, maxRetryCount);
    }

    @Override // com.github.sonus21.rqueue.listener.MessageContainerBase
    void start() {
        boolean z = false;
        int failureCount = this.rqueueMessage.getFailureCount();
        int maxRetryCount = getMaxRetryCount();
        long maxProcessingTime = getMaxProcessingTime();
        long currentTimeMillis = System.currentTimeMillis();
        boolean z2 = false;
        boolean z3 = false;
        int retryCount = getRetryCount();
        while (isQueueActive(this.queueDetail.getName())) {
            try {
                if (!shouldProcess()) {
                    z3 = true;
                } else if (isMessageDeleted(this.rqueueMessage.getId())) {
                    z2 = true;
                }
                if (!z3 && !z2) {
                    try {
                        updateCounter(false);
                        this.rqueueMessageHandler.handleMessage(this.message);
                        z = true;
                    } catch (Exception e) {
                        log(Level.ERROR, "Message consumer failed", e, new Object[0]);
                        updateCounter(true);
                        failureCount++;
                    }
                    retryCount--;
                    if (failureCount < maxRetryCount && retryCount > 0 && !z && System.currentTimeMillis() < maxProcessingTime) {
                    }
                }
                handlePostProcessing(z, z2, z3, failureCount, maxRetryCount, currentTimeMillis);
                this.semaphore.release();
                return;
            } finally {
                this.semaphore.release();
            }
        }
    }
}
