/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.cluster;

import com.lambdaworks.redis.ConnectionFuture;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.cluster.AbstractClusterNodeConnectionFactory;
import com.lambdaworks.redis.cluster.ClusterDistributionChannelWriter;
import com.lambdaworks.redis.cluster.ClusterNodeConnectionFactory;
import com.lambdaworks.redis.cluster.PooledClusterConnectionProvider;
import com.lambdaworks.redis.cluster.RedisClusterClient;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.cluster.pubsub.RedisClusterPubSubListener;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnection;
import com.lambdaworks.redis.resource.ClientResources;

/**
 * Cluster connection provider that opens pub/sub connections to cluster nodes and forwards the
 * pub/sub events of every node connection to a single {@link RedisClusterPubSubListener},
 * together with the {@link RedisClusterNode} the event was received from.
 *
 * @param <K> key type
 * @param <V> value type
 */
class ClusterPubSubConnectionProvider<K, V>
extends PooledClusterConnectionProvider<K, V> {
    private final RedisClusterClient redisClusterClient;
    private final RedisCodec<K, V> redisCodec;
    private final RedisClusterPubSubListener<K, V> notifications;

    /**
     * Creates the provider.
     *
     * @param redisClusterClient client used to open the pub/sub node connections
     * @param clusterWriter writer handed to the parent provider
     * @param redisCodec codec used for every node connection
     * @param notificationTarget listener receiving the node-qualified pub/sub events
     */
    public ClusterPubSubConnectionProvider(RedisClusterClient redisClusterClient, ClusterDistributionChannelWriter<K, V> clusterWriter, RedisCodec<K, V> redisCodec, RedisClusterPubSubListener<K, V> notificationTarget) {
        super(redisClusterClient, clusterWriter, redisCodec);
        this.redisClusterClient = redisClusterClient;
        this.redisCodec = redisCodec;
        this.notifications = notificationTarget;
    }

    /**
     * Returns a factory that opens pub/sub connections and registers a node-aware listener on each
     * connection once it has been established.
     *
     * @param redisClusterClient client whose {@link ClientResources} are passed to the factory
     * @return the decorated pub/sub connection factory
     */
    @Override
    protected ClusterNodeConnectionFactory<K, V> getConnectionFactory(RedisClusterClient redisClusterClient) {
        return new DecoratingClusterNodeConnectionFactory(new PubSubNodeConnectionFactory(redisClusterClient.getResources()));
    }

    /**
     * Listener bound to one node connection. Every callback is forwarded to the provider's
     * notification target with the node resolved at the time of the event.
     */
    private class DelegatingRedisClusterPubSubListener
    extends RedisPubSubAdapter<K, V> {
        // Exactly one addressing mode is set: either nodeId, or host/port (nodeId == null).
        private final String nodeId;
        private final String host;
        private final int port;

        /**
         * Creates a listener for a connection addressed by node id.
         *
         * @param nodeId id of the node the connection belongs to
         */
        public DelegatingRedisClusterPubSubListener(String nodeId) {
            this.nodeId = nodeId;
            this.host = null;
            this.port = 0;
        }

        /**
         * Creates a listener for a connection addressed by host and port.
         *
         * @param host host of the node the connection belongs to
         * @param port port of the node the connection belongs to
         */
        public DelegatingRedisClusterPubSubListener(String host, int port) {
            this.nodeId = null;
            this.host = host;
            this.port = port;
        }

        @Override
        public void message(K channel, V message) {
            ClusterPubSubConnectionProvider.this.notifications.message(this.getNode(), channel, message);
        }

        @Override
        public void message(K pattern, K channel, V message) {
            ClusterPubSubConnectionProvider.this.notifications.message(this.getNode(), pattern, channel, message);
        }

        @Override
        public void subscribed(K channel, long count) {
            ClusterPubSubConnectionProvider.this.notifications.subscribed(this.getNode(), channel, count);
        }

        @Override
        public void psubscribed(K pattern, long count) {
            ClusterPubSubConnectionProvider.this.notifications.psubscribed(this.getNode(), pattern, count);
        }

        @Override
        public void unsubscribed(K channel, long count) {
            ClusterPubSubConnectionProvider.this.notifications.unsubscribed(this.getNode(), channel, count);
        }

        @Override
        public void punsubscribed(K pattern, long count) {
            ClusterPubSubConnectionProvider.this.notifications.punsubscribed(this.getNode(), pattern, count);
        }

        /**
         * Resolves the node for this listener. The lookup runs on every event rather than being
         * cached, so it always goes through the provider's current partition view.
         *
         * @return the node found by node id when one was given, otherwise by host and port
         */
        private RedisClusterNode getNode() {
            // NOTE(review): whether the lookups can return null for an unknown node is not
            // visible here - confirm before relying on a non-null node in the notification target.
            if (this.nodeId != null) {
                return ClusterPubSubConnectionProvider.this.getPartitions().getPartitionByNodeId(this.nodeId);
            }
            return ClusterPubSubConnectionProvider.this.getPartition(this.host, this.port);
        }
    }

    /**
     * Factory decorator that delegates connection creation and attaches a
     * {@link DelegatingRedisClusterPubSubListener} to each connection when its future completes.
     */
    private class DecoratingClusterNodeConnectionFactory
    implements ClusterNodeConnectionFactory<K, V> {
        private final ClusterNodeConnectionFactory<K, V> delegate;

        /**
         * @param delegate factory that actually opens the connections
         */
        public DecoratingClusterNodeConnectionFactory(ClusterNodeConnectionFactory<K, V> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void setPartitions(Partitions partitions) {
            this.delegate.setPartitions(partitions);
        }

        /**
         * Opens a connection through the delegate and registers a listener that matches the
         * addressing mode of {@code key} (node id, or host and port).
         *
         * @param key connection key describing the target node
         * @return future completing with the connection after the listener has been added
         */
        @Override
        public ConnectionFuture<StatefulRedisConnection<K, V>> apply(ClusterNodeConnectionFactory.ConnectionKey key) {
            ConnectionFuture<StatefulRedisConnection<K, V>> future = this.delegate.apply(key);
            // The listener is created inside the continuation, i.e. only once the connection exists.
            if (key.nodeId != null) {
                return future.thenApply(connection -> this.withListener(connection, new DelegatingRedisClusterPubSubListener(key.nodeId)));
            }
            return future.thenApply(connection -> this.withListener(connection, new DelegatingRedisClusterPubSubListener(key.host, key.port)));
        }

        /**
         * Adds {@code listener} to the connection and hands the same connection back.
         *
         * @param connection connection produced by the delegate; it is cast to
         *        {@link StatefulRedisPubSubConnection}, so anything else fails with a
         *        {@link ClassCastException}
         * @param listener listener to register
         * @return {@code connection}
         */
        private StatefulRedisConnection<K, V> withListener(StatefulRedisConnection<K, V> connection, DelegatingRedisClusterPubSubListener listener) {
            ((StatefulRedisPubSubConnection<K, V>) connection).addListener(listener);
            return connection;
        }
    }

    /**
     * Factory opening pub/sub connections through the provider's {@link RedisClusterClient}.
     */
    private class PubSubNodeConnectionFactory
    extends AbstractClusterNodeConnectionFactory<K, V> {
        /**
         * @param clientResources resources passed to the parent factory
         */
        public PubSubNodeConnectionFactory(ClientResources clientResources) {
            super(clientResources);
        }

        /**
         * Opens a pub/sub connection to the node identified by {@code key}. The node is addressed by
         * its node id when present, otherwise by {@code host:port}.
         *
         * @param key connection key describing the target node
         * @return future of the pub/sub connection
         */
        @Override
        public ConnectionFuture<StatefulRedisConnection<K, V>> apply(ClusterNodeConnectionFactory.ConnectionKey key) {
            String target = key.nodeId != null ? key.nodeId : key.host + ":" + key.port;
            ConnectionFuture<? extends StatefulRedisConnection<K, V>> future = ClusterPubSubConnectionProvider.this.redisClusterClient.connectPubSubToNodeAsync(ClusterPubSubConnectionProvider.this.redisCodec, target, this.getSocketAddressSupplier(key));
            // NOTE(review): connectPubSubToNodeAsync presumably yields a future typed to the pub/sub
            // connection subtype (confirm against RedisClusterClient). Generic invariance would then
            // forbid returning it directly, so it is widened explicitly; the cast is erased at runtime.
            @SuppressWarnings("unchecked")
            ConnectionFuture<StatefulRedisConnection<K, V>> widened = (ConnectionFuture<StatefulRedisConnection<K, V>>) future;
            return widened;
        }
    }
}

