/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.failover;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.annotations.Experimental;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.failover.DatabaseConnectionFactory;
import io.lettuce.core.failover.RedisDatabase;
import io.lettuce.core.failover.StatefulRedisMultiDbConnectionImpl;
import io.lettuce.core.failover.api.StatefulRedisMultiDbPubSubConnection;
import io.lettuce.core.failover.health.HealthStatusManager;
import io.lettuce.core.json.JsonParser;
import io.lettuce.core.pubsub.PubSubEndpoint;
import io.lettuce.core.pubsub.RedisPubSubAsyncCommandsImpl;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.RedisPubSubReactiveCommandsImpl;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Pub/Sub flavour of the multi-database connection. It keeps its own registry of
 * {@link RedisPubSubListener}s and, when the active database changes, re-attaches those listeners
 * to the new connection and re-issues the channel, shard-channel and pattern subscriptions that
 * were recorded on the previous database's endpoint.
 *
 * @param <K> key type
 * @param <V> value type
 */
@Experimental
public class StatefulRedisMultiDbPubSubConnectionImpl<K, V>
        extends StatefulRedisMultiDbConnectionImpl<StatefulRedisPubSubConnection<K, V>, K, V>
        implements StatefulRedisMultiDbPubSubConnection<K, V> {

    /** Listeners registered through this connection; replayed onto the new connection on every switch. */
    private final Set<RedisPubSubListener<K, V>> pubSubListeners = ConcurrentHashMap.newKeySet();

    /**
     * Creates the connection; every argument is handed to the superclass constructor unchanged.
     *
     * @param connections databases keyed by their URI
     * @param resources client resources
     * @param codec codec used for keys and values
     * @param parser supplier of the JSON parser
     * @param connectionFactory factory for database connections
     * @param healthStatusManager health status manager
     */
    public StatefulRedisMultiDbPubSubConnectionImpl(
            Map<RedisURI, RedisDatabase<StatefulRedisPubSubConnection<K, V>>> connections, ClientResources resources,
            RedisCodec<K, V> codec, Supplier<JsonParser> parser,
            DatabaseConnectionFactory<StatefulRedisPubSubConnection<K, V>, K, V> connectionFactory,
            HealthStatusManager healthStatusManager) {
        super(connections, resources, codec, parser, connectionFactory, healthStatusManager);
    }

    /**
     * Remembers the listener and attaches it to the connection of the current database. Both steps
     * run inside {@code doBySharedLock}.
     *
     * @param listener the listener to add
     */
    @Override
    public void addListener(RedisPubSubListener<K, V> listener) {
        doBySharedLock(() -> {
            pubSubListeners.add(listener);
            current.getConnection().addListener(listener);
        });
    }

    /**
     * Forgets the listener and detaches it from the connection of the current database. Both steps
     * run inside {@code doBySharedLock}.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeListener(RedisPubSubListener<K, V> listener) {
        doBySharedLock(() -> {
            pubSubListeners.remove(listener);
            current.getConnection().removeListener(listener);
        });
    }

    /**
     * @return the inherited {@code async} API viewed as Pub/Sub async commands
     */
    @Override
    @SuppressWarnings("unchecked")
    public RedisPubSubAsyncCommands<K, V> async() {
        // The Object hop is kept from the original: the declared type of the inherited field is not visible here.
        return (RedisPubSubAsyncCommands<K, V>) ((Object) this.async);
    }

    /**
     * @return a new Pub/Sub async command implementation bound to this connection and its codec
     */
    @Override
    protected RedisPubSubAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() {
        return new RedisPubSubAsyncCommandsImpl<>(this, this.codec);
    }

    /**
     * @return the inherited {@code sync} API viewed as Pub/Sub sync commands
     */
    @Override
    @SuppressWarnings("unchecked")
    public RedisPubSubCommands<K, V> sync() {
        return (RedisPubSubCommands<K, V>) this.sync;
    }

    /**
     * @return the sync API obtained from {@code syncHandler} for {@link #async()} and the
     *         {@link RedisPubSubCommands} interface
     */
    @Override
    @SuppressWarnings("unchecked")
    protected RedisPubSubCommands<K, V> newRedisSyncCommandsImpl() {
        return (RedisPubSubCommands<K, V>) syncHandler(async(), RedisPubSubCommands.class);
    }

    /**
     * @return the inherited {@code reactive} API viewed as Pub/Sub reactive commands
     */
    @Override
    @SuppressWarnings("unchecked")
    public RedisPubSubReactiveCommands<K, V> reactive() {
        // The Object hop is kept from the original: the declared type of the inherited field is not visible here.
        return (RedisPubSubReactiveCommands<K, V>) ((Object) this.reactive);
    }

    /**
     * @return a new Pub/Sub reactive command implementation bound to this connection and its codec
     */
    @Override
    protected RedisPubSubReactiveCommandsImpl<K, V> newRedisReactiveCommandsImpl() {
        return new RedisPubSubReactiveCommandsImpl<>(this, this.codec);
    }

    /**
     * Switches the active database and migrates Pub/Sub state to it: every registered listener is
     * added to the new connection and removed from the old one, then the old endpoint's
     * subscriptions are moved via {@link #moveSubscriptions(RedisDatabase, RedisDatabase)}.
     * <p>
     * The previous database is read <em>inside</em> {@code doByExclusiveLock}. Reading it before
     * entering the locked section would let a concurrent switch change {@code current} in between,
     * so listeners and subscriptions would be migrated from a stale database.
     * <p>
     * If {@code current} is the same object before and after the superclass switch, nothing is
     * migrated. Otherwise each listener would be added to and then removed from the same
     * connection, which would drop it.
     *
     * @param redisURI the URI of the database to switch to
     */
    @Override
    public void switchToDatabase(RedisURI redisURI) {
        doByExclusiveLock(() -> {
            RedisDatabase<StatefulRedisPubSubConnection<K, V>> fromDb = this.current;
            super.switchToDatabase(redisURI);
            RedisDatabase<StatefulRedisPubSubConnection<K, V>> toDb = this.current;
            if (fromDb == toDb) {
                // Active database did not change; there is no Pub/Sub state to migrate.
                return;
            }
            StatefulRedisPubSubConnection<K, V> newConnection = toDb.getConnection();
            StatefulRedisPubSubConnection<K, V> oldConnection = fromDb.getConnection();
            pubSubListeners.forEach(listener -> {
                // Attach to the new connection first, then detach from the old one.
                newConnection.addListener(listener);
                oldConnection.removeListener(listener);
            });
            moveSubscriptions(fromDb, toDb);
        });
    }

    /**
     * Re-issues the channel, shard-channel and pattern subscriptions recorded on {@code fromDb}'s
     * endpoint and then unsubscribes them on {@code fromDb}'s own connection.
     * <p>
     * The new subscriptions are sent through {@link #async()} of this connection. {@code toDb} is
     * not consulted by this method.
     * NOTE(review): presumably {@link #async()} already targets {@code toDb} when this is called
     * from {@link #switchToDatabase(RedisURI)}; confirm against the superclass.
     *
     * @param fromDb the database whose endpoint subscriptions are moved; its endpoint is cast to
     *        {@link PubSubEndpoint}
     * @param toDb the database being switched to (currently unused)
     */
    @SuppressWarnings("unchecked")
    public void moveSubscriptions(RedisDatabase<StatefulRedisPubSubConnection<K, V>> fromDb,
            RedisDatabase<StatefulRedisPubSubConnection<K, V>> toDb) {
        // The Object hop is kept from the original: the declared endpoint type is not visible here.
        PubSubEndpoint<K, V> fromEndpoint = (PubSubEndpoint<K, V>) ((Object) fromDb.getDatabaseEndpoint());
        StatefulRedisPubSubConnection<K, V> fromConn = fromDb.getConnection();
        RedisPubSubAsyncCommands<K, V> target = async();
        RedisPubSubAsyncCommands<K, V> source = fromConn.async();

        if (fromEndpoint.hasChannelSubscriptions()) {
            K[] channels = toArray(fromEndpoint.getChannels());
            moveSubscriptions(channels, target::subscribe, source::unsubscribe);
        }
        if (fromEndpoint.hasShardChannelSubscriptions()) {
            K[] shardChannels = toArray(fromEndpoint.getShardChannels());
            moveSubscriptions(shardChannels, target::ssubscribe, source::sunsubscribe);
        }
        if (fromEndpoint.hasPatternSubscriptions()) {
            K[] patterns = toArray(fromEndpoint.getPatterns());
            moveSubscriptions(patterns, target::psubscribe, source::punsubscribe);
        }
    }

    /**
     * Subscribes {@code channels} with {@code subscribeFunc}, then unsubscribes them with
     * {@code unsubscribeFunc}. A failure of either future is only logged. An empty array is a
     * no-op, so no subscribe/unsubscribe call is made with zero arguments.
     *
     * @param channels the channel, shard-channel or pattern names to move
     * @param subscribeFunc subscribes on the new side
     * @param unsubscribeFunc unsubscribes on the old side
     */
    private void moveSubscriptions(K[] channels, Function<K[], RedisFuture<Void>> subscribeFunc,
            Function<K[], RedisFuture<Void>> unsubscribeFunc) {
        if (channels.length == 0) {
            // The subscriptions vanished between the has*Subscriptions() check and the snapshot.
            return;
        }
        RedisFuture<Void> subscribeFuture = subscribeFunc.apply(channels);
        handlePubSubCommandError(subscribeFuture, "Re-subscribe failed: ");
        RedisFuture<Void> unsubscribeFuture = unsubscribeFunc.apply(channels);
        handlePubSubCommandError(unsubscribeFuture, "Unsubscribe from old endpoint failed (best effort): ");
    }

    /**
     * Logs a warning when {@code future} completes exceptionally. The failure is not propagated.
     *
     * @param future the Pub/Sub command future to observe
     * @param message prefix of the log message; the future's error text is appended to it
     */
    private void handlePubSubCommandError(RedisFuture<Void> future, String message) {
        future.exceptionally(throwable -> {
            // Pass the throwable along so the log keeps the stack trace, not just the error text.
            InternalLoggerFactory.getInstance(getClass()).warn(message + future.getError(), throwable);
            return null;
        });
    }

    /**
     * Copies a collection into an array whose component type is the runtime class of the first
     * element. All elements must therefore be instances of that class, otherwise the copy fails
     * with an {@link ArrayStoreException}.
     * <p>
     * The collection is read once into a snapshot, so the element used to pick the component type
     * and the copied contents cannot disagree. An empty collection yields an empty array instead
     * of throwing {@link java.util.NoSuchElementException}.
     *
     * @param c the collection to copy
     * @param <T> element type
     * @return an array holding the elements of {@code c}; empty when {@code c} is empty
     */
    @SuppressWarnings("unchecked")
    private <T> T[] toArray(Collection<T> c) {
        Object[] snapshot = c.toArray();
        if (snapshot.length == 0) {
            // No element to derive a component type from; callers skip empty arrays.
            return (T[]) snapshot;
        }
        Class<?> componentType = snapshot[0].getClass();
        T[] array = (T[]) Array.newInstance(componentType, snapshot.length);
        System.arraycopy(snapshot, 0, array, 0, snapshot.length);
        return array;
    }
}

