package org.springframework.data.redis.listener;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.ConnectionUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.listener.SynchronizingMessageListener;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ObjectUtils;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

/* loaded from: input_file:org/springframework/data/redis/listener/RedisMessageListenerContainer.class */
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {
    /** Default thread name prefix: the short name of this class followed by {@code "-"}. */
    public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class) + "-";

    /** Interval handed to the default {@link FixedBackOff} (presumably milliseconds; confirm against FixedBackOff). */
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000;

    /** Initial value of {@link #maxSubscriptionRegistrationWaitingTime}. */
    public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME = 2000;

    /** Executor handed to the subscriber; defaults to {@link #taskExecutor} in {@link #afterPropertiesSet()}. */
    @Nullable
    private Executor subscriptionExecutor;

    /** Task executor; created by {@link #createDefaultTaskExecutor()} when none was configured. */
    @Nullable
    private Executor taskExecutor;

    /** Factory used to create the subscriber; must be set before {@link #afterPropertiesSet()}. */
    @Nullable
    private RedisConnectionFactory connectionFactory;

    /** Bean name; used as thread name prefix for the default task executor when present. */
    @Nullable
    private String beanName;

    @Nullable
    private ErrorHandler errorHandler;

    /** Subscriber created in {@link #afterPropertiesSet()}. */
    @Nullable
    private Subscriber subscriber;
    protected final Log logger = LogFactory.getLog(getClass());

    /** Lifecycle flag toggled by {@link #start()} and {@link #stop(Runnable)}. */
    private final AtomicBoolean started = new AtomicBoolean();

    /** Current listening state; transitions are performed with compare-and-set on the instance identity. */
    private final AtomicReference<State> state = new AtomicReference<>(State.notListening());
    private boolean afterPropertiesSet = false;

    /** {@code true} when the task executor was created by this container (see {@link #destroy()}). */
    private boolean manageExecutor = false;

    /** Listeners registered per serialized pattern. */
    private final Map<ByteArrayWrapper, Collection<MessageListener>> patternMapping = new ConcurrentHashMap<>();

    /** Listeners registered per serialized channel name. */
    private final Map<ByteArrayWrapper, Collection<MessageListener>> channelMapping = new ConcurrentHashMap<>();

    /** Reverse lookup: the topics each listener is registered for. */
    private final Map<MessageListener, Set<Topic>> listenerTopics = new ConcurrentHashMap<>();
    private volatile RedisSerializer<String> serializer = RedisSerializer.string();
    private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE);
    private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;

    /** Completed once the subscription attempt started by {@code doSubscribe} finishes; replaced after failure or unsubscribe. */
    private volatile CompletableFuture<Void> listenFuture = new CompletableFuture<>();

    /** Completed by the subscriber once unsubscription finished; replaced after each unsubscribe cycle. */
    private volatile CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/data/redis/listener/RedisMessageListenerContainer$BlockingSubscriber.class */
    /**
     * {@link Subscriber} that runs the initial subscribe call on a dedicated {@link Executor}.
     * <p>
     * The executor task closes the connection and completes the container's unsubscribe future once
     * {@code doSubscribe} returns, which suggests the underlying subscribe call blocks until the
     * subscription ends (NOTE(review): driver behaviour is not visible here - confirm).
     */
    public class BlockingSubscriber extends Subscriber {
        private final Executor executor;

        /**
         * @param redisConnectionFactory factory used to obtain the subscription connection
         * @param executor executor that runs the subscribe call
         */
        BlockingSubscriber(RedisConnectionFactory redisConnectionFactory, Executor executor) {
            super(redisConnectionFactory);
            this.executor = executor;
        }

        /**
         * Only closes the subscription; the connection is closed by the executor task started in
         * {@link #eventuallyPerformSubscription} after {@code doSubscribe} returns.
         */
        @Override
        void doUnsubscribe(RedisConnection redisConnection) {
            closeSubscription(redisConnection);
        }

        /**
         * Registers the synchronizations that signal subscription completion and submits the subscribe
         * call to the executor.
         *
         * @param redisConnection connection to subscribe on
         * @param backOffExecution back-off state forwarded to the failure handler
         * @param completableFuture future completed once all patterns and channels are subscribed
         * @param collection patterns to subscribe to
         * @param collection2 channels to subscribe to
         */
        @Override
        protected void eventuallyPerformSubscription(RedisConnection redisConnection, BackOffExecution backOffExecution, CompletableFuture<Void> completableFuture, Collection<byte[]> collection, Collection<byte[]> collection2) {
            Collection<byte[]> initialChannels;
            if (collection.isEmpty() || collection2.isEmpty()) {
                initialChannels = collection2;
            } else {
                // Both kinds requested: subscribe to patterns first and add the channels only after the
                // pattern subscription has been confirmed.
                initialChannels = Collections.emptySet();
                addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(collection, Collections.emptySet(), () -> {
                    try {
                        subscribeChannel(collection2.toArray(new byte[0][]));
                    } catch (Exception e) {
                        RedisMessageListenerContainer.this.handleSubscriptionException(completableFuture, backOffExecution, e);
                    }
                }));
            }
            addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(collection, collection2, () -> completableFuture.complete(null)));
            this.executor.execute(() -> {
                try {
                    doSubscribe(redisConnection, collection, initialChannels);
                    closeConnection();
                    RedisMessageListenerContainer.this.unsubscribeFuture.complete(null);
                } catch (Throwable th) {
                    RedisMessageListenerContainer.this.handleSubscriptionException(completableFuture, backOffExecution, th);
                }
            });
        }
    }

    /* loaded from: input_file:org/springframework/data/redis/listener/RedisMessageListenerContainer$DispatchMessageListener.class */
    /**
     * Listener installed on the Redis subscription. It looks up the container's registered listeners
     * for the affected channel or pattern and forwards messages and subscription notifications to them.
     */
    private class DispatchMessageListener implements MessageListener, SubscriptionListener {
        private DispatchMessageListener() {
        }

        /**
         * Dispatches a message to the listeners of the matching pattern, or to the listeners of the
         * message's channel when no (non-empty) pattern is supplied. Nothing happens if no listener is
         * registered.
         *
         * @param message received message
         * @param bArr pattern that matched, or {@code null}/empty for a plain channel message
         */
        @Override
        public void onMessage(Message message, @Nullable byte[] bArr) {
            boolean patternMessage = bArr != null && bArr.length > 0;
            Collection<MessageListener> listeners;
            if (patternMessage) {
                listeners = patternMapping.get(new ByteArrayWrapper(bArr));
            } else {
                listeners = channelMapping.get(new ByteArrayWrapper(message.getChannel()));
            }
            if (!CollectionUtils.isEmpty(listeners)) {
                // An empty pattern is normalised to null before dispatching.
                dispatchMessage(listeners, message, patternMessage ? bArr : null);
            }
        }

        @Override
        public void onChannelSubscribed(byte[] bArr, long j) {
            dispatchSubscriptionNotification(registeredFor(channelMapping, bArr), bArr, j, SubscriptionListener::onChannelSubscribed);
        }

        @Override
        public void onChannelUnsubscribed(byte[] bArr, long j) {
            dispatchSubscriptionNotification(registeredFor(channelMapping, bArr), bArr, j, SubscriptionListener::onChannelUnsubscribed);
        }

        @Override
        public void onPatternSubscribed(byte[] bArr, long j) {
            dispatchSubscriptionNotification(registeredFor(patternMapping, bArr), bArr, j, SubscriptionListener::onPatternSubscribed);
        }

        @Override
        public void onPatternUnsubscribed(byte[] bArr, long j) {
            dispatchSubscriptionNotification(registeredFor(patternMapping, bArr), bArr, j, SubscriptionListener::onPatternUnsubscribed);
        }

        /** Returns the listeners registered under the given raw topic, or an empty list when there are none. */
        private Collection<MessageListener> registeredFor(Map<ByteArrayWrapper, Collection<MessageListener>> mapping, byte[] topic) {
            return mapping.getOrDefault(new ByteArrayWrapper(topic), Collections.emptyList());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/data/redis/listener/RedisMessageListenerContainer$State.class */
    /**
     * Immutable two-flag snapshot of the container's listening state. Instances are only created via
     * the static factories:
     * <ul>
     * <li>{@link #notListening()}: neither flag set</li>
     * <li>{@link #prepareListening()}: only {@code prepareListening} set</li>
     * <li>{@link #listening()}: both flags set</li>
     * <li>{@link #prepareUnsubscribe()}: only {@code listening} set</li>
     * </ul>
     */
    public static class State {
        private final boolean prepareListening;
        private final boolean listening;

        private State(boolean z, boolean z2) {
            this.prepareListening = z;
            this.listening = z2;
        }

        /** @return state with both flags cleared */
        static State notListening() {
            return new State(false, false);
        }

        /** @return state with only the prepare flag set */
        static State prepareListening() {
            return new State(true, false);
        }

        /** @return state with both flags set */
        static State listening() {
            return new State(true, true);
        }

        /** @return state with only the listening flag set */
        static State prepareUnsubscribe() {
            return new State(false, true);
        }

        /** @return {@code true} if at least one of the two flags is set */
        private boolean isListenerActivated() {
            if (isListening()) {
                return true;
            }
            return isPrepareListening();
        }

        /** @return the listening flag */
        public boolean isListening() {
            return listening;
        }

        /** @return the prepare-listening flag */
        public boolean isPrepareListening() {
            return prepareListening;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/data/redis/listener/RedisMessageListenerContainer$Subscriber.class */
    public class Subscriber {

        @Nullable
        private volatile RedisConnection connection;
        private final RedisConnectionFactory connectionFactory;
        private final Object localMonitor = new Object();
        private final DispatchMessageListener delegateListener;
        private final SynchronizingMessageListener synchronizingMessageListener;

        Subscriber(RedisConnectionFactory redisConnectionFactory) {
            this.delegateListener = new DispatchMessageListener();
            this.synchronizingMessageListener = new SynchronizingMessageListener(this.delegateListener, this.delegateListener);
            this.connectionFactory = redisConnectionFactory;
        }

        public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Collection<byte[]> collection, Collection<byte[]> collection2) {
            RedisConnection connection;
            synchronized (this.localMonitor) {
                CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                try {
                    connection = this.connectionFactory.getConnection();
                    this.connection = connection;
                } catch (RuntimeException e) {
                    completableFuture.completeExceptionally(e);
                }
                if (connection.isSubscribed()) {
                    completableFuture.completeExceptionally(new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
                    return completableFuture;
                }
                try {
                    eventuallyPerformSubscription(connection, backOffExecution, completableFuture, collection, collection2);
                } catch (Throwable th) {
                    RedisMessageListenerContainer.this.handleSubscriptionException(completableFuture, backOffExecution, th);
                }
                return completableFuture;
            }
        }

        void eventuallyPerformSubscription(RedisConnection redisConnection, BackOffExecution backOffExecution, CompletableFuture<Void> completableFuture, Collection<byte[]> collection, Collection<byte[]> collection2) {
            addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(collection, collection2, () -> {
                completableFuture.complete(null);
            }));
            doSubscribe(redisConnection, collection, collection2);
        }

        void doSubscribe(RedisConnection redisConnection, Collection<byte[]> collection, Collection<byte[]> collection2) {
            if (!collection.isEmpty()) {
                redisConnection.pSubscribe(this.synchronizingMessageListener, (byte[][]) collection.toArray((Object[]) new byte[0]));
            }
            if (collection2.isEmpty()) {
                return;
            }
            if (collection.isEmpty()) {
                redisConnection.subscribe(this.synchronizingMessageListener, (byte[][]) collection2.toArray((Object[]) new byte[0]));
            } else {
                subscribeChannel((byte[][]) collection2.toArray((Object[]) new byte[0]));
            }
        }

        void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronization subscriptionSynchronization) {
            this.synchronizingMessageListener.addSynchronization(subscriptionSynchronization);
        }

        public void unsubscribeAll() {
            synchronized (this.localMonitor) {
                RedisConnection redisConnection = this.connection;
                if (redisConnection == null) {
                    return;
                }
                doUnsubscribe(redisConnection);
            }
        }

        void doUnsubscribe(RedisConnection redisConnection) {
            closeSubscription(redisConnection);
            closeConnection();
            RedisMessageListenerContainer.this.unsubscribeFuture.complete(null);
        }

        public void cancel() {
            synchronized (this.localMonitor) {
                RedisConnection redisConnection = this.connection;
                if (redisConnection == null) {
                    return;
                }
                doCancel(redisConnection);
            }
        }

        void doCancel(RedisConnection redisConnection) {
            closeSubscription(redisConnection);
            closeConnection();
        }

        void closeSubscription(RedisConnection redisConnection) {
            if (RedisMessageListenerContainer.this.logger.isTraceEnabled()) {
                RedisMessageListenerContainer.this.logger.trace("Cancelling Redis subscription...");
            }
            Subscription subscription = redisConnection.getSubscription();
            if (subscription != null) {
                if (RedisMessageListenerContainer.this.logger.isTraceEnabled()) {
                    RedisMessageListenerContainer.this.logger.trace("Unsubscribing from all channels");
                }
                try {
                    subscription.close();
                } catch (Exception e) {
                    RedisMessageListenerContainer.this.logger.warn("Unable to unsubscribe from subscriptions", e);
                }
            }
        }

        public void closeConnection() {
            synchronized (this.localMonitor) {
                RedisConnection redisConnection = this.connection;
                this.connection = null;
                if (redisConnection != null) {
                    RedisMessageListenerContainer.this.logger.trace("Closing connection");
                    try {
                        redisConnection.close();
                    } catch (Exception e) {
                        RedisMessageListenerContainer.this.logger.warn("Error closing subscription connection", e);
                    }
                }
            }
        }

        public void subscribeChannel(byte[]... bArr) {
            doWithSubscription(bArr, (v0, v1) -> {
                v0.subscribe(v1);
            });
        }

        public void subscribePattern(byte[]... bArr) {
            doWithSubscription(bArr, (v0, v1) -> {
                v0.pSubscribe(v1);
            });
        }

        public void unsubscribeChannel(byte[]... bArr) {
            doWithSubscription(bArr, (v0, v1) -> {
                v0.unsubscribe(v1);
            });
        }

        public void unsubscribePattern(byte[]... bArr) {
            doWithSubscription(bArr, (v0, v1) -> {
                v0.pUnsubscribe(v1);
            });
        }

        private void doWithSubscription(byte[][] bArr, BiConsumer<Subscription, byte[][]> biConsumer) {
            Subscription subscription;
            if (ObjectUtils.isEmpty(bArr)) {
                return;
            }
            synchronized (this.localMonitor) {
                RedisConnection redisConnection = this.connection;
                if (redisConnection != null && (subscription = redisConnection.getSubscription()) != null) {
                    biConsumer.accept(subscription, bArr);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/data/redis/listener/RedisMessageListenerContainer$SubscriptionConsumer.class */
    /**
     * Callback that delivers one subscription notification to a {@link SubscriptionListener}. The
     * {@link SubscriptionListener} notification methods used by {@code DispatchMessageListener} share
     * this parameter shape.
     */
    public interface SubscriptionConsumer {
        /**
         * @param subscriptionListener listener to notify
         * @param bArr raw channel or pattern the notification refers to
         * @param j count value passed through from the notification
         */
        void accept(SubscriptionListener subscriptionListener, byte[] bArr, long j);
    }

    /**
     * Validates the configuration and creates the subscriber.
     * <p>
     * A missing task executor is replaced by {@link #createDefaultTaskExecutor()} and flagged as
     * container-managed; a missing subscription executor falls back to the task executor.
     *
     * @throws IllegalStateException if the container was already initialized
     * @throws IllegalArgumentException if no connection factory has been set
     */
    public void afterPropertiesSet() {
        Assert.state(!afterPropertiesSet, "Container already initialized");

        if (connectionFactory == null) {
            throw new IllegalArgumentException("RedisConnectionFactory is not set");
        }

        boolean executorMissing = (taskExecutor == null);
        if (executorMissing) {
            manageExecutor = true;
            taskExecutor = createDefaultTaskExecutor();
        }

        if (subscriptionExecutor == null) {
            subscriptionExecutor = taskExecutor;
        }

        subscriber = createSubscriber(connectionFactory, subscriptionExecutor);
        afterPropertiesSet = true;
    }

    /**
     * Creates the executor used when none was configured: a {@link SimpleAsyncTaskExecutor} whose
     * thread name prefix is the bean name plus {@code "-"}, or {@link #DEFAULT_THREAD_NAME_PREFIX}
     * when no bean name is set.
     *
     * @return a new task executor
     */
    protected TaskExecutor createDefaultTaskExecutor() {
        String threadNamePrefix;
        if (this.beanName == null) {
            threadNamePrefix = DEFAULT_THREAD_NAME_PREFIX;
        } else {
            threadNamePrefix = this.beanName + "-";
        }
        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    /**
     * Marks the container as uninitialized, stops it and, if the task executor was created by this
     * container and is a {@link DisposableBean}, destroys that executor.
     *
     * @throws Exception if destroying the managed executor fails
     */
    public void destroy() throws Exception {
        this.afterPropertiesSet = false;
        stop();
        if (this.manageExecutor && this.taskExecutor instanceof DisposableBean) {
            // taskExecutor is declared as Executor, so destroy() is only reachable through the cast.
            ((DisposableBean) this.taskExecutor).destroy();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Stopped internally-managed task executor");
            }
        }
    }

    /**
     * Starts the container. Only the call that flips the started flag from {@code false} to
     * {@code true} triggers the subscription via {@code lazyListen()}; further calls are no-ops.
     */
    public void start() {
        boolean firstStart = started.compareAndSet(false, true);
        if (!firstStart) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Starting RedisMessageListenerContainer...");
        }
        lazyListen();
    }

    /**
     * Waits, for at most the configured registration waiting time, until the subscription is
     * registered. If a subscription attempt is already being prepared, the current listen future is
     * awaited; otherwise a new attempt is started with a fresh back-off execution.
     *
     * @throws RedisListenerExecutionFailedException if the attempt failed with a {@link DataAccessException}
     * @throws CompletionException if the attempt failed with any other cause
     * @throws IllegalStateException if the waiting time elapsed
     */
    private void lazyListen() {
        try {
            CompletableFuture<Void> registration;
            if (this.state.get().isPrepareListening()) {
                registration = this.listenFuture;
            } else {
                registration = lazyListen(this.backOff.start());
            }
            registration.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt for the caller instead of failing the start.
            Thread.currentThread().interrupt();
        } catch (ExecutionException failed) {
            Throwable cause = failed.getCause();
            if (cause instanceof DataAccessException) {
                throw new RedisListenerExecutionFailedException(failed.getMessage(), cause);
            }
            throw new CompletionException(cause);
        } catch (TimeoutException timedOut) {
            throw new IllegalStateException("Subscription registration timeout exceeded", timedOut);
        }
    }

    /**
     * Starts a subscription attempt if any topics are registered.
     * <p>
     * {@code doSubscribe} is retried until it wins the state transition; the listen future is
     * re-read before every retry so the returned future belongs to the successful attempt.
     *
     * @param backOffExecution back-off state forwarded to the subscription attempt
     * @return an already completed future when there are no topics, otherwise the listen future
     *         captured before the successful attempt
     */
    private CompletableFuture<Void> lazyListen(BackOffExecution backOffExecution) {
        if (!hasTopics()) {
            this.logger.debug("Postpone listening for Redis messages until actual listeners are added");
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> observedListenFuture = this.listenFuture;
        while (!doSubscribe(backOffExecution)) {
            observedListenFuture = this.listenFuture;
        }
        return observedListenFuture;
    }

    /**
     * Attempts a single transition into the prepare-listening state and, on success, asks the
     * subscriber to subscribe to all registered patterns and channels.
     * <p>
     * If the observed state is the prepare-unsubscribe state, the pending unsubscribe is awaited
     * first. When the subscriber's future completes, the state becomes listening or not-listening,
     * the outcome is propagated to the listen future captured on entry and, after a failure, a fresh
     * listen future is installed.
     *
     * @param backOffExecution back-off state forwarded to the subscriber
     * @return {@code true} if this call performed the state transition and triggered the
     *         subscription, {@code false} if the state changed concurrently and the caller should retry
     */
    private boolean doSubscribe(BackOffExecution backOffExecution) {
        CompletableFuture<Void> capturedListenFuture = this.listenFuture;
        CompletableFuture<Void> capturedUnsubscribeFuture = this.unsubscribeFuture;
        State observed = this.state.get();

        boolean unsubscribeInProgress = observed.isListening() && !observed.isPrepareListening();
        if (unsubscribeInProgress) {
            capturedUnsubscribeFuture.join();
        }

        if (!this.state.compareAndSet(observed, State.prepareListening())) {
            return false;
        }

        CompletableFuture<Void> initialization = getRequiredSubscriber().initialize(backOffExecution,
                this.patternMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList()),
                this.channelMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList()));

        initialization.whenComplete((unused, failure) -> {
            boolean failed = failure != null;
            if (failed) {
                this.logger.debug("Failed to start RedisMessageListenerContainer listeners", failure);
                this.state.set(State.notListening());
            } else {
                this.logger.debug("RedisMessageListenerContainer listeners registered successfully");
                this.state.set(State.listening());
            }
            propagate(unused, failure, capturedListenFuture);
            if (failed) {
                // The old future is now completed exceptionally; a later attempt needs a new one.
                this.listenFuture = new CompletableFuture<>();
            }
        });

        this.logger.debug("Subscribing to topics for RedisMessageListenerContainer");
        return true;
    }

    /**
     * Stops the container by delegating to {@link #stop(Runnable)} with a callback that does nothing.
     */
    public void stop() {
        stop(() -> {
        });
    }

    /**
     * Stops the container and then runs the given callback.
     * <p>
     * Unsubscribing only happens for the call that flips the started flag from {@code true} to
     * {@code false}. The callback is invoked in either case: the {@link SmartLifecycle} contract
     * requires it to run once the component is stopped, and a caller waiting on it would otherwise
     * hang when the container was stopped concurrently or never started.
     *
     * @param runnable callback to run once the container is stopped
     */
    public void stop(Runnable runnable) {
        if (this.started.compareAndSet(true, false)) {
            stopListening();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Stopped RedisMessageListenerContainer");
            }
        }
        runnable.run();
    }

    /**
     * Calls {@code doUnsubscribe()} repeatedly until it reports success. A {@code false} result means
     * the state changed concurrently, so the attempt is simply repeated.
     */
    private void stopListening() {
        boolean unsubscribed = doUnsubscribe();
        while (!unsubscribed) {
            unsubscribed = doUnsubscribe();
        }
    }

    /**
     * Attempts a single transition into the prepare-unsubscribe state and, on success, unsubscribes
     * and resets the container to not-listening with fresh listen and unsubscribe futures.
     * <p>
     * Before the transition, the pending listen future is awaited for at most the registration
     * waiting time; the same bounded wait is applied to the unsubscribe future afterwards.
     *
     * @return {@code true} if nothing was active or unsubscription was performed, {@code false} if
     *         the state changed concurrently and the caller should retry
     */
    private boolean doUnsubscribe() {
        CompletableFuture<Void> pendingListen = this.listenFuture;
        State observed = this.state.get();

        if (!observed.isListenerActivated()) {
            return true;
        }

        awaitRegistrationTime(pendingListen);

        if (this.state.compareAndSet(observed, State.prepareUnsubscribe())) {
            getRequiredSubscriber().unsubscribeAll();
            awaitRegistrationTime(this.unsubscribeFuture);

            this.state.set(State.notListening());
            this.listenFuture = new CompletableFuture<>();
            this.unsubscribeFuture = new CompletableFuture<>();

            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Stopped listening");
            }
            return true;
        }

        return false;
    }

    /**
     * Waits for the future for at most the configured registration waiting time. This is a
     * best-effort wait: failure of the future and timeouts are ignored, and an interrupt is restored
     * on the current thread.
     *
     * @param completableFuture future to wait for
     */
    private void awaitRegistrationTime(CompletableFuture<Void> completableFuture) {
        long timeoutMillis = getMaxSubscriptionRegistrationWaitingTime();
        try {
            completableFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException ignored) {
            // Deliberately ignored: callers only need a bounded wait, not the outcome.
        }
    }

    /**
     * @return {@code true} between a successful {@link #start()} and the next {@link #stop(Runnable)},
     *         as tracked by the started flag
     */
    public boolean isRunning() {
        return this.started.get();
    }

    /**
     * @return the listening flag of the current {@link State}; per the State factories this flag is
     *         set in the listening state and in the prepare-unsubscribe state
     */
    public boolean isListening() {
        return this.state.get().isListening();
    }

    /**
     * @return {@code true} once {@link #afterPropertiesSet()} has completed and until
     *         {@link #destroy()} is called
     */
    public final boolean isActive() {
        return this.afterPropertiesSet;
    }

    /**
     * @return the configured connection factory, or {@code null} if none has been set yet
     */
    @Nullable
    public RedisConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Sets the connection factory that {@link #afterPropertiesSet()} passes on when creating the
     * subscriber.
     *
     * @param redisConnectionFactory connection factory, must not be {@code null}
     * @throws IllegalArgumentException if the factory is {@code null}
     */
    public void setConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        Assert.notNull(redisConnectionFactory, "ConnectionFactory must not be null");
        this.connectionFactory = redisConnectionFactory;
    }

    /**
     * Stores the bean name. {@link #createDefaultTaskExecutor()} uses it, followed by {@code "-"}, as
     * thread name prefix.
     *
     * @param str bean name
     */
    public void setBeanName(String str) {
        this.beanName = str;
    }

    /**
     * Sets the task executor. If it is still {@code null} when {@link #afterPropertiesSet()} runs, a
     * default executor is created and treated as container-managed.
     *
     * @param executor task executor to use
     */
    public void setTaskExecutor(Executor executor) {
        this.taskExecutor = executor;
    }

    /**
     * Sets the executor passed on when the subscriber is created. If it is still {@code null} when
     * {@link #afterPropertiesSet()} runs, the task executor is used instead.
     *
     * @param executor subscription executor to use
     */
    public void setSubscriptionExecutor(Executor executor) {
        this.subscriptionExecutor = executor;
    }

    /**
     * Replaces the string serializer held by this container (initially {@code RedisSerializer.string()}).
     * NOTE(review): presumably used to convert topic names to bytes; the usage is outside this view.
     *
     * @param redisSerializer serializer to use
     */
    public void setTopicSerializer(RedisSerializer<String> redisSerializer) {
        this.serializer = redisSerializer;
    }

    /**
     * Stores the error handler. NOTE(review): where it is invoked is outside this view.
     *
     * @param errorHandler error handler to store
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Replaces all registered listeners with the given ones. Delegates to {@code initMapping}, which
     * stops a running container, clears the existing registrations, registers the new entries and
     * starts the container again if it has been initialized.
     *
     * @param map listeners and the topics each should be registered for
     */
    public void setMessageListeners(Map<? extends MessageListener, Collection<? extends Topic>> map) {
        initMapping(map);
    }

    /**
     * Registers the listener for the given topics by delegating to {@code addListener}.
     *
     * @param messageListener listener to register, must not be {@code null}
     * @param collection topics to register for, must not be empty
     */
    public void addMessageListener(MessageListener messageListener, Collection<? extends Topic> collection) {
        addListener(messageListener, collection);
    }

    /**
     * Registers the listener for a single topic.
     *
     * @param messageListener listener to register
     * @param topic topic to register for
     */
    public void addMessageListener(MessageListener messageListener, Topic topic) {
        addMessageListener(messageListener, Collections.singleton(topic));
    }

    /**
     * Removes the listener from the given topics by delegating to {@code removeListener}.
     *
     * @param messageListener listener to remove, may be {@code null}
     * @param collection topics to remove the listener from, must not be {@code null}
     */
    public void removeMessageListener(@Nullable MessageListener messageListener, Collection<? extends Topic> collection) {
        removeListener(messageListener, collection);
    }

    /**
     * Removes the listener from a single topic.
     *
     * @param messageListener listener to remove, may be {@code null}
     * @param topic topic to remove the listener from
     */
    public void removeMessageListener(@Nullable MessageListener messageListener, Topic topic) {
        removeMessageListener(messageListener, Collections.singleton(topic));
    }

    /**
     * Removes the listener from all topics it is registered for. An empty topic collection is passed
     * on, which {@code removeListener} resolves to the listener's registered topics.
     *
     * @param messageListener listener to remove, must not be {@code null}
     * @throws IllegalArgumentException if the listener is {@code null}
     */
    public void removeMessageListener(MessageListener messageListener) {
        Assert.notNull(messageListener, "MessageListener must not be null");
        removeMessageListener(messageListener, Collections.emptySet());
    }

    /**
     * Replaces all listener registrations. A running container is stopped first, the three
     * registration maps are cleared, every entry of the given map is registered again, and the
     * container is started if it has been initialized.
     *
     * @param map listeners and their topics; {@code null} or empty leaves the container without
     *        registrations
     */
    private void initMapping(Map<? extends MessageListener, Collection<? extends Topic>> map) {
        if (isRunning()) {
            stop();
        }

        patternMapping.clear();
        channelMapping.clear();
        listenerTopics.clear();

        boolean nothingToRegister = CollectionUtils.isEmpty(map);
        if (!nothingToRegister) {
            for (Map.Entry<? extends MessageListener, Collection<? extends Topic>> registration : map.entrySet()) {
                MessageListener listener = registration.getKey();
                Collection<? extends Topic> topics = registration.getValue();
                addListener(listener, topics);
            }
        }

        if (afterPropertiesSet) {
            start();
        }
    }

    /**
     * Registers the listener for the given topics and, if the container is running, subscribes to
     * them.
     * <p>
     * When the container was already listening before this call, the new channels and patterns are
     * added to the active subscription and the method blocks until the subscription has been
     * confirmed. Otherwise {@code lazyListen()} takes care of the initial subscription.
     *
     * @param messageListener listener to register, must not be {@code null}
     * @param collection topics to register for, must not be empty
     * @throws IllegalArgumentException if an argument is invalid or a topic is neither a
     *         {@link ChannelTopic} nor a {@link PatternTopic}
     * @throws RedisListenerExecutionFailedException if subscribing failed with a {@link DataAccessException}
     */
    private void addListener(MessageListener messageListener, Collection<? extends Topic> collection) {
        Assert.notNull(messageListener, "a valid listener is required");
        Assert.notEmpty(collection, "at least one topic is required");

        List<byte[]> channels = new ArrayList<>(collection.size());
        List<byte[]> patterns = new ArrayList<>(collection.size());
        boolean traceEnabled = this.logger.isTraceEnabled();

        // computeIfAbsent keeps the create-if-missing step atomic on the concurrent maps.
        this.listenerTopics.computeIfAbsent(messageListener, key -> new CopyOnWriteArraySet<>()).addAll(collection);

        for (Topic topic : collection) {
            ByteArrayWrapper serializedTopic = new ByteArrayWrapper(serialize(topic));
            if (topic instanceof ChannelTopic) {
                this.channelMapping.computeIfAbsent(serializedTopic, key -> new CopyOnWriteArraySet<>()).add(messageListener);
                channels.add(serializedTopic.getArray());
                if (traceEnabled) {
                    this.logger.trace("Adding listener '" + messageListener + "' on channel '" + topic.getTopic() + "'");
                }
            } else if (topic instanceof PatternTopic) {
                this.patternMapping.computeIfAbsent(serializedTopic, key -> new CopyOnWriteArraySet<>()).add(messageListener);
                patterns.add(serializedTopic.getArray());
                if (traceEnabled) {
                    this.logger.trace("Adding listener '" + messageListener + "' for pattern '" + topic.getTopic() + "'");
                }
            } else {
                throw new IllegalArgumentException("Unknown topic type '" + topic.getClass() + "'");
            }
        }

        // Capture the state before lazyListen(): only an already active subscription needs the
        // incremental subscribe below.
        boolean wasListening = isListening();
        if (!isRunning()) {
            return;
        }
        lazyListen();
        if (!wasListening) {
            return;
        }

        CompletableFuture<Void> subscribed = new CompletableFuture<>();
        getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> subscribed.complete(null)));
        getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[0][]));
        getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[0][]));
        try {
            subscribed.join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof DataAccessException) {
                throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause());
            }
            throw e;
        }
    }

    /**
     * Removes the listener from the given topics and unsubscribes from topics that no longer have
     * any listener.
     * <p>
     * A listener that is not registered is ignored. An empty topic collection is resolved to all
     * topics the listener is registered for; if that yields nothing, listening is stopped. When no
     * registrations remain afterwards, listening is stopped altogether; otherwise, if the container
     * is listening, only the now unused channels and patterns are unsubscribed.
     *
     * @param messageListener listener to remove, may be {@code null}
     * @param collection topics to remove the listener from, must not be {@code null}
     * @throws IllegalArgumentException if the topic collection is {@code null}
     */
    private void removeListener(@Nullable MessageListener messageListener, Collection<? extends Topic> collection) {
        Assert.notNull(collection, "Topics must not be null");

        if (messageListener != null && this.listenerTopics.get(messageListener) == null) {
            // Listener is not registered; nothing to remove.
            return;
        }

        Collection<? extends Topic> topics = collection;
        if (topics.isEmpty()) {
            // NOTE(review): with a null listener this lookup hits ConcurrentHashMap.get(null) and throws
            // NullPointerException, as before; the intended semantics for "null listener, no topics"
            // are not visible here - confirm before changing.
            topics = this.listenerTopics.get(messageListener);
        }

        if (CollectionUtils.isEmpty(topics)) {
            stopListening();
            return;
        }

        boolean traceEnabled = this.logger.isTraceEnabled();
        List<byte[]> channelsToRemove = new ArrayList<>();
        List<byte[]> patternsToRemove = new ArrayList<>();

        for (Topic topic : topics) {
            ByteArrayWrapper serializedTopic = new ByteArrayWrapper(serialize(topic));
            if (topic instanceof ChannelTopic) {
                remove(messageListener, topic, serializedTopic, this.channelMapping, channelsToRemove);
                if (traceEnabled) {
                    this.logger.trace("Removing listener '" + messageListener + "' from channel '" + topic.getTopic() + "'");
                }
            } else if (topic instanceof PatternTopic) {
                remove(messageListener, topic, serializedTopic, this.patternMapping, patternsToRemove);
                if (traceEnabled) {
                    this.logger.trace("Removing listener '" + messageListener + "' from pattern '" + topic.getTopic() + "'");
                }
            }
        }

        if (this.listenerTopics.isEmpty()) {
            // No registrations left: drop the subscription entirely.
            stopListening();
        } else if (isListening()) {
            getRequiredSubscriber().unsubscribeChannel(channelsToRemove.toArray(new byte[0][]));
            getRequiredSubscriber().unsubscribePattern(patternsToRemove.toArray(new byte[0][]));
        }
    }

    /**
     * Removes one listener from the listener collection stored under {@code byteArrayWrapper} in
     * {@code map} and keeps {@code listenerTopics} in sync.
     *
     * <p>Does nothing when {@code map} has no entry for the key. When the listener collection
     * becomes empty, the key is dropped from {@code map} and its raw bytes are appended to
     * {@code list} so the caller can unsubscribe from it.
     *
     * @param messageListener the listener to remove
     * @param topic the topic the listener is removed from
     * @param byteArrayWrapper the wrapped, serialized topic used as the key into {@code map}
     * @param map the channel or pattern mapping to update
     * @param list collects the serialized topics that no longer have any listener
     */
    private void remove(MessageListener messageListener, Topic topic, ByteArrayWrapper byteArrayWrapper, Map<ByteArrayWrapper, Collection<MessageListener>> map, List<byte[]> list) {
        Collection<MessageListener> registered = map.get(byteArrayWrapper);
        if (registered == null) {
            return;
        }

        registered.remove(messageListener);

        // Forget the topic on the listener side, and forget the listener once it has no topics left.
        Set<Topic> topicsOfListener = this.listenerTopics.get(messageListener);
        if (topicsOfListener != null) {
            topicsOfListener.remove(topic);
        }
        if (CollectionUtils.isEmpty(topicsOfListener)) {
            this.listenerTopics.remove(messageListener);
        }

        if (registered.isEmpty()) {
            map.remove(byteArrayWrapper);
            list.add(byteArrayWrapper.getArray());
        }
    }

    /**
     * Convenience setter that installs a {@link FixedBackOff} built from the given interval and
     * {@code Long.MAX_VALUE} as its second constructor argument.
     *
     * @param j the interval handed to {@link FixedBackOff}; presumably milliseconds, since the
     *     back-off values are slept/scheduled as milliseconds during recovery
     */
    public void setRecoveryInterval(long j) {
        BackOff fixedInterval = new FixedBackOff(j, Long.MAX_VALUE);
        setRecoveryBackoff(fixedInterval);
    }

    /**
     * Sets the {@link BackOff} stored in the container's {@code backOff} field.
     *
     * @param backOff the back-off to use; must not be {@code null}
     */
    public void setRecoveryBackoff(BackOff backOff) {
        // Reject null up front so later recovery code never sees a missing back-off.
        Assert.notNull(backOff, "Recovery interval must not be null");

        this.backOff = backOff;
    }

    /**
     * Returns the configured maximum subscription registration waiting time.
     *
     * @return the current value of {@code maxSubscriptionRegistrationWaitingTime}
     */
    public long getMaxSubscriptionRegistrationWaitingTime() {
        return maxSubscriptionRegistrationWaitingTime;
    }

    /**
     * Sets the maximum subscription registration waiting time.
     *
     * @param j the new value for {@code maxSubscriptionRegistrationWaitingTime}
     */
    public void setMaxSubscriptionRegistrationWaitingTime(long j) {
        maxSubscriptionRegistrationWaitingTime = j;
    }

    /**
     * Creates the subscriber variant that matches the connection factory.
     *
     * @param redisConnectionFactory the factory the subscriber is created for
     * @param executor the executor handed to the {@code BlockingSubscriber}; unused for the plain
     *     {@code Subscriber}
     * @return a plain {@code Subscriber} when {@link ConnectionUtils#isAsync} reports the factory
     *     as asynchronous, otherwise a {@code BlockingSubscriber}
     */
    private Subscriber createSubscriber(RedisConnectionFactory redisConnectionFactory, Executor executor) {
        if (ConnectionUtils.isAsync(redisConnectionFactory)) {
            return new Subscriber(redisConnectionFactory);
        }
        return new BlockingSubscriber(redisConnectionFactory, executor);
    }

    /**
     * Delivers a single message to a single listener.
     *
     * <p>Anything the listener throws is caught here and routed to
     * {@link #handleListenerException(Throwable)} rather than propagated to the caller.
     *
     * @param messageListener the listener to invoke
     * @param message the message to deliver
     * @param bArr the second argument passed on to {@code onMessage}
     */
    protected void processMessage(MessageListener messageListener, Message message, byte[] bArr) {
        try {
            messageListener.onMessage(message, bArr);
        } catch (Throwable listenerFailure) {
            // Deliberately broad: a failing listener must not take down the dispatching thread.
            handleListenerException(listenerFailure);
        }
    }

    /**
     * Handles an exception raised while a listener was processing a message.
     *
     * <p>While the container is active the exception goes to
     * {@link #invokeErrorHandler(Throwable)}; otherwise it is only logged at debug level.
     *
     * @param th the exception thrown by the listener
     */
    protected void handleListenerException(Throwable th) {
        if (!isActive()) {
            this.logger.debug("Listener exception after container shutdown", th);
            return;
        }
        invokeErrorHandler(th);
    }

    /**
     * Passes the exception to the configured error handler, or logs a warning when none is set.
     *
     * @param th the exception to report
     */
    protected void invokeErrorHandler(Throwable th) {
        if (this.errorHandler == null) {
            // No handler configured: the warning log is the only trace of the failure.
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Execution of message listener failed, and no ErrorHandler has been set", th);
            }
            return;
        }
        this.errorHandler.handleError(th);
    }

    /**
     * Reacts to a failed subscription attempt.
     *
     * <p>The subscriber connection is always closed first. A
     * {@link RedisConnectionFailureException} raised while the container is running triggers a
     * recovery attempt via {@code potentiallyRecover}. If recovery is not possible, the future is
     * completed with an {@link IllegalStateException}. Any other failure completes the future
     * with the original exception.
     *
     * @param completableFuture the future that reports the outcome of the subscription
     * @param backOffExecution supplies the delay before the next attempt
     * @param th the failure that aborted the subscription
     */
    protected void handleSubscriptionException(CompletableFuture<Void> completableFuture, BackOffExecution backOffExecution, Throwable th) {
        getRequiredSubscriber().closeConnection();

        if (th instanceof RedisConnectionFailureException && isRunning()) {
            // Wraps the real back-off so that every granted retry is logged together with its delay.
            BackOffExecution loggingBackOff = () -> {
                long delay = backOffExecution.nextBackOff();
                if (delay != -1) {
                    String text = String.format("Connection failure occurred: %s; Restarting subscription task after %s ms", th, Long.valueOf(delay));
                    this.logger.error(text, th);
                }
                return delay;
            };
            Runnable resubscribe = () -> {
                lazyListen(backOffExecution).whenComplete(propagate(completableFuture));
            };

            if (potentiallyRecover(loggingBackOff, resubscribe)) {
                return;
            }

            this.logger.error("SubscriptionTask aborted with exception:", th);
            completableFuture.completeExceptionally(new IllegalStateException("Subscription attempts exceeded", th));
            return;
        }

        // Only log while running; the failure is still reported through the future either way.
        if (isRunning()) {
            this.logger.error("SubscriptionTask aborted with exception:", th);
        }
        completableFuture.completeExceptionally(th);
    }

    /**
     * Runs the recovery action after the delay dictated by the back-off, if one is granted.
     *
     * <p>When {@code subscriptionExecutor} is a {@link ScheduledExecutorService} the action is
     * scheduled on it. Otherwise the calling thread sleeps for the delay and then runs the action
     * itself.
     *
     * @param backOffExecution supplies the delay in milliseconds; {@code -1} means no retry
     * @param runnable the recovery action
     * @return {@code true} if the action was scheduled or run; {@code false} if the back-off
     *     returned {@code -1} or the thread was interrupted while sleeping
     */
    private boolean potentiallyRecover(BackOffExecution backOffExecution, Runnable runnable) {
        long delayMillis = backOffExecution.nextBackOff();
        if (delayMillis == -1) {
            return false;
        }

        if (this.subscriptionExecutor instanceof ScheduledExecutorService) {
            ScheduledExecutorService scheduler = (ScheduledExecutorService) this.subscriptionExecutor;
            scheduler.schedule(runnable, delayMillis, TimeUnit.MILLISECONDS);
            return true;
        }

        try {
            Thread.sleep(delayMillis);
            runnable.run();
            return true;
        } catch (InterruptedException interrupted) {
            this.logger.debug("Thread interrupted while sleeping the recovery interval");
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Builds a callback that forwards a result or failure to {@code completableFuture}.
     *
     * @param completableFuture the future to complete when the callback is invoked
     * @param <T> the result type
     * @return a consumer that delegates to the three-argument {@code propagate}
     */
    private <T> BiConsumer<? super T, ? super Throwable> propagate(CompletableFuture<T> completableFuture) {
        return (result, failure) -> propagate(result, failure, completableFuture);
    }

    /**
     * Completes {@code completableFuture} with the given outcome.
     *
     * @param t the value to complete with when there is no failure
     * @param th the failure; when non-null the future is completed exceptionally and {@code t} is
     *     ignored
     * @param completableFuture the future to complete
     * @param <T> the result type
     */
    private <T> void propagate(@Nullable T t, @Nullable Throwable th, CompletableFuture<T> completableFuture) {
        if (th == null) {
            completableFuture.complete(t);
            return;
        }
        completableFuture.completeExceptionally(th);
    }

    /**
     * Notifies every {@link SubscriptionListener} in {@code collection} through the task executor.
     *
     * <p>Listeners that do not implement {@link SubscriptionListener} are skipped. Each callback
     * receives a clone of {@code bArr}, taken once before dispatching.
     *
     * @param collection the candidate listeners; nothing happens when it is empty or {@code null}
     * @param bArr the bytes passed on to the callback, cloned before use
     * @param j the long value passed on to the callback
     * @param subscriptionConsumer the callback invoked once per matching listener
     */
    private void dispatchSubscriptionNotification(Collection<MessageListener> collection, byte[] bArr, long j, SubscriptionConsumer subscriptionConsumer) {
        if (CollectionUtils.isEmpty(collection)) {
            return;
        }

        byte[] payloadCopy = bArr.clone();
        Executor executor = getRequiredTaskExecutor();

        for (MessageListener candidate : collection) {
            if (!(candidate instanceof SubscriptionListener)) {
                continue;
            }
            SubscriptionListener target = (SubscriptionListener) candidate;
            executor.execute(() -> subscriptionConsumer.accept(target, payloadCopy, j));
        }
    }

    /**
     * Hands {@code message} to every listener in {@code collection}, one executor task per
     * listener.
     *
     * @param collection the listeners to notify
     * @param message the message to deliver
     * @param bArr passed to the listeners as a clone when non-null; otherwise the message's own
     *     channel is passed instead
     */
    private void dispatchMessage(Collection<MessageListener> collection, Message message, @Nullable byte[] bArr) {
        final byte[] source;
        if (bArr == null) {
            source = message.getChannel();
        } else {
            source = bArr.clone();
        }

        Executor executor = getRequiredTaskExecutor();
        for (MessageListener target : collection) {
            executor.execute(() -> processMessage(target, message, source));
        }
    }

    /**
     * Tells whether at least one channel or pattern mapping is registered.
     *
     * @return {@code true} unless both {@code channelMapping} and {@code patternMapping} are empty
     */
    private boolean hasTopics() {
        return !this.channelMapping.isEmpty() || !this.patternMapping.isEmpty();
    }

    /**
     * Returns the subscriber, failing fast when none has been created yet.
     *
     * @return the non-null {@code subscriber}
     * @throws IllegalStateException if {@code subscriber} is {@code null}
     */
    private Subscriber getRequiredSubscriber() {
        Subscriber current = this.subscriber;
        if (current != null) {
            return current;
        }
        throw new IllegalStateException("Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
    }

    /**
     * Returns the task executor, failing fast when none is configured.
     *
     * @return the non-null {@code taskExecutor}
     * @throws IllegalStateException if {@code taskExecutor} is {@code null}
     */
    private Executor getRequiredTaskExecutor() {
        Executor current = this.taskExecutor;
        if (current != null) {
            return current;
        }
        throw new IllegalStateException("No executor configured");
    }

    /**
     * Serializes the value returned by {@code topic.getTopic()} with the container's
     * {@code serializer}.
     *
     * @param topic the topic whose name is serialized
     * @return whatever the serializer returns for the topic name
     */
    private byte[] serialize(Topic topic) {
        return serializer.serialize(topic.getTopic());
    }
}
