package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.converter.GenericMessageConverter;
import com.github.sonus21.rqueue.converter.RqueueRedisSerializer;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.models.enums.PubSubType;
import com.github.sonus21.rqueue.models.event.RqueuePubSubEvent;
import com.github.sonus21.rqueue.models.request.PauseUnpauseQueueRequest;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.SerializationUtils;
import com.github.sonus21.rqueue.utils.StringUtils;
import java.time.Duration;
import java.util.UUID;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;

/* loaded from: input_file:com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.class */
public class RqueueInternalPubSubChannel implements InitializingBean {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueInternalPubSubChannel.class);
    private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
    private final RqueueMessageListenerContainer rqueueMessageListenerContainer;
    private final RqueueConfig rqueueConfig;
    private final RqueueRedisTemplate<String> stringRqueueRedisTemplate;
    private final RqueueRedisSerializer rqueueRedisSerializer = new RqueueRedisSerializer();
    private final RqueueBeanProvider rqueueBeanProvider;
    private GenericMessageConverter.SmartMessageSerDes smartMessageSerDes;

    /* loaded from: input_file:com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel$InternalMessageListener.class */
    class InternalMessageListener implements MessageListener {
        InternalMessageListener() {
        }

        public void onMessage(Message message, byte[] bArr) {
            byte[] body = message.getBody();
            if (SerializationUtils.isEmpty(body)) {
                RqueueInternalPubSubChannel.log.error("Empty message received on channel: {}, pattern: {}", new String(message.getChannel()), new String(bArr));
            } else {
                processEvent(body);
            }
        }

        private void processEvent(byte[] bArr) {
            RqueueInternalPubSubChannel.log.debug("Message on internal channel {}", new String(bArr));
            RqueuePubSubEvent rqueuePubSubEvent = (RqueuePubSubEvent) RqueueInternalPubSubChannel.this.smartMessageSerDes.deserialize(bArr, RqueuePubSubEvent.class);
            if (rqueuePubSubEvent == null) {
                RqueueInternalPubSubChannel.log.error("Invalid message on pub-sub channel {}", new String(bArr));
                return;
            }
            switch (rqueuePubSubEvent.getType()) {
                case PAUSE_QUEUE:
                    handlePauseEvent((PauseUnpauseQueueRequest) rqueuePubSubEvent.messageAs(RqueueInternalPubSubChannel.this.smartMessageSerDes, PauseUnpauseQueueRequest.class));
                    return;
                case QUEUE_CRUD:
                    RqueueInternalPubSubChannel.this.rqueueBeanProvider.getRqueueSystemConfigDao().clearCacheByName((String) rqueuePubSubEvent.messageAs(RqueueInternalPubSubChannel.this.smartMessageSerDes, String.class));
                    return;
                default:
                    RqueueInternalPubSubChannel.log.error("Unknown event type {}", rqueuePubSubEvent);
                    return;
            }
        }

        private void handlePauseEvent(PauseUnpauseQueueRequest pauseUnpauseQueueRequest) {
            if (pauseUnpauseQueueRequest == null || StringUtils.isEmpty(pauseUnpauseQueueRequest.getName())) {
                RqueueInternalPubSubChannel.log.error("Invalid message payload {}", pauseUnpauseQueueRequest);
                return;
            }
            String queueCrudLockKey = Constants.getQueueCrudLockKey(RqueueInternalPubSubChannel.this.rqueueConfig, pauseUnpauseQueueRequest.getName());
            String uuid = UUID.randomUUID().toString();
            try {
                if (RqueueInternalPubSubChannel.this.rqueueBeanProvider.getRqueueLockManager().acquireLock(queueCrudLockKey, uuid, Duration.ofMillis(100L))) {
                    RqueueInternalPubSubChannel.this.rqueueMessageListenerContainer.pauseUnpauseQueue(pauseUnpauseQueueRequest.getName(), pauseUnpauseQueueRequest.isPause());
                }
            } finally {
                RqueueInternalPubSubChannel.this.rqueueBeanProvider.getRqueueLockManager().releaseLock(queueCrudLockKey, uuid);
            }
        }
    }

    public RqueueInternalPubSubChannel(RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, RqueueMessageListenerContainer rqueueMessageListenerContainer, RqueueConfig rqueueConfig, RqueueRedisTemplate<String> rqueueRedisTemplate, RqueueBeanProvider rqueueBeanProvider) {
        this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory;
        this.rqueueMessageListenerContainer = rqueueMessageListenerContainer;
        this.rqueueConfig = rqueueConfig;
        this.stringRqueueRedisTemplate = rqueueRedisTemplate;
        this.rqueueBeanProvider = rqueueBeanProvider;
    }

    public void afterPropertiesSet() throws Exception {
        String internalCommChannelName = this.rqueueConfig.getInternalCommChannelName();
        this.rqueueRedisListenerContainerFactory.addMessageListener(new InternalMessageListener(), new ChannelTopic(internalCommChannelName));
        this.smartMessageSerDes = new GenericMessageConverter.SmartMessageSerDes(SerializationUtils.createObjectMapper());
    }

    public void emitPauseUnpauseQueueEvent(PauseUnpauseQueueRequest pauseUnpauseQueueRequest) {
        publish(PubSubType.PAUSE_QUEUE, pauseUnpauseQueueRequest);
    }

    private void publish(PubSubType pubSubType, Object obj) {
        this.stringRqueueRedisTemplate.getRedisTemplate().convertAndSend(this.rqueueConfig.getInternalCommChannelName(), new RqueuePubSubEvent(pubSubType, RqueueConfig.getBrokerId(), new String(this.rqueueRedisSerializer.serialize(obj))));
    }

    public void emitQueueConfigUpdateEvent(PauseUnpauseQueueRequest pauseUnpauseQueueRequest) {
        publish(PubSubType.QUEUE_CRUD, pauseUnpauseQueueRequest.getName());
    }
}
