/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.BaseRedisReactiveCommands;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.ClusterTopologyProvider;
import org.springframework.data.redis.connection.ReactiveClusterCommands;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.lettuce.LettuceClusterTopologyProvider;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterGeoCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterHashCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterHyperLogLogCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterKeyCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterListCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterNumberCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterScriptingCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterServerCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterSetCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterStreamCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterStringCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveClusterZSetCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

class LettuceReactiveRedisClusterConnection
extends LettuceReactiveRedisConnection
implements ReactiveRedisClusterConnection {
    private final ClusterTopologyProvider topologyProvider;

    /**
     * Creates a cluster connection backed by the given connection provider.
     *
     * @param connectionProvider handed to the superclass constructor.
     * @param client used to build the {@link LettuceClusterTopologyProvider} that serves topology lookups.
     */
    LettuceReactiveRedisClusterConnection(LettuceConnectionProvider connectionProvider, RedisClusterClient client) {
        super(connectionProvider);
        this.topologyProvider = new LettuceClusterTopologyProvider(client);
    }

    /**
     * Creates a cluster connection from an already existing shared connection plus a connection provider.
     *
     * @param sharedConnection handed to the superclass constructor together with {@code connectionProvider}.
     * @param connectionProvider handed to the superclass constructor.
     * @param client used to build the {@link LettuceClusterTopologyProvider} that serves topology lookups.
     */
    LettuceReactiveRedisClusterConnection(StatefulConnection<ByteBuffer, ByteBuffer> sharedConnection, LettuceConnectionProvider connectionProvider, RedisClusterClient client) {
        super(sharedConnection, connectionProvider);
        this.topologyProvider = new LettuceClusterTopologyProvider(client);
    }

    /**
     * Returns the cluster-specific key command API.
     *
     * @return a new {@link LettuceReactiveClusterKeyCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterKeyCommands keyCommands() {
        return new LettuceReactiveClusterKeyCommands(this);
    }

    /**
     * Returns the cluster-specific list command API.
     *
     * @return a new {@link LettuceReactiveClusterListCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterListCommands listCommands() {
        return new LettuceReactiveClusterListCommands(this);
    }

    /**
     * Returns the cluster-specific set command API.
     *
     * @return a new {@link LettuceReactiveClusterSetCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterSetCommands setCommands() {
        return new LettuceReactiveClusterSetCommands(this);
    }

    /**
     * Returns the cluster-specific sorted set command API.
     *
     * @return a new {@link LettuceReactiveClusterZSetCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterZSetCommands zSetCommands() {
        return new LettuceReactiveClusterZSetCommands(this);
    }

    /**
     * Returns the cluster-specific HyperLogLog command API.
     *
     * @return a new {@link LettuceReactiveClusterHyperLogLogCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterHyperLogLogCommands hyperLogLogCommands() {
        return new LettuceReactiveClusterHyperLogLogCommands(this);
    }

    /**
     * Returns the cluster-specific string command API.
     *
     * @return a new {@link LettuceReactiveClusterStringCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterStringCommands stringCommands() {
        return new LettuceReactiveClusterStringCommands(this);
    }

    /**
     * Returns the cluster-specific geo command API.
     *
     * @return a new {@link LettuceReactiveClusterGeoCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterGeoCommands geoCommands() {
        return new LettuceReactiveClusterGeoCommands(this);
    }

    /**
     * Returns the cluster-specific hash command API.
     *
     * @return a new {@link LettuceReactiveClusterHashCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterHashCommands hashCommands() {
        return new LettuceReactiveClusterHashCommands(this);
    }

    /**
     * Returns the cluster-specific number command API.
     *
     * @return a new {@link LettuceReactiveClusterNumberCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterNumberCommands numberCommands() {
        return new LettuceReactiveClusterNumberCommands(this);
    }

    /**
     * Returns the cluster-specific scripting command API.
     *
     * @return a new {@link LettuceReactiveClusterScriptingCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterScriptingCommands scriptingCommands() {
        return new LettuceReactiveClusterScriptingCommands(this);
    }

    /**
     * Returns the cluster-specific server command API.
     *
     * @return a new {@link LettuceReactiveClusterServerCommands} created with this connection and this
     *         connection's topology provider on every call.
     */
    @Override
    public LettuceReactiveClusterServerCommands serverCommands() {
        return new LettuceReactiveClusterServerCommands(this, this.topologyProvider);
    }

    /**
     * Returns the cluster-specific stream command API.
     *
     * @return a new {@link LettuceReactiveClusterStreamCommands} created with this connection on every call.
     */
    @Override
    public LettuceReactiveClusterStreamCommands streamCommands() {
        return new LettuceReactiveClusterStreamCommands(this);
    }

    /**
     * Issues {@code PING} against every active cluster node.
     *
     * @return a mono emitting the last reply produced by the merged per-node results.
     */
    @Override
    public Mono<String> ping() {
        Flux<String> replies = this.clusterGetNodes()
                .flatMap(activeNode -> this.execute((RedisNode)activeNode, BaseRedisReactiveCommands::ping));
        return replies.last();
    }

    /**
     * Issues {@code PING} against a single cluster node.
     *
     * @param node the node to run the command on.
     * @return a mono emitting the first reply, if any.
     */
    @Override
    public Mono<String> ping(RedisClusterNode node) {
        Flux<String> replies = this.execute(node, BaseRedisReactiveCommands::ping);
        return replies.next();
    }

    /**
     * Streams the active nodes of the current topology.
     * The topology is read inside the stream supplier, not when this method is called.
     *
     * @return a flux over the active cluster nodes.
     */
    @Override
    public Flux<RedisClusterNode> clusterGetNodes() {
        return Flux.fromStream(() -> {
            Set<RedisClusterNode> activeNodes = this.doGetActiveNodes();
            return activeNodes.stream();
        });
    }

    /**
     * Lists the replicas of the given master.
     * The master is first resolved through the topology; {@code CLUSTER SLAVES} is then executed on the
     * resolved node with its own id, and every reply is converted into cluster nodes.
     *
     * @param master the master whose replicas are requested; must not be {@code null}.
     * @return a flux of the replicas reported for the master.
     */
    @Override
    public Flux<RedisClusterNode> clusterGetReplicas(RedisClusterNode master) {
        Assert.notNull(master, "Master must not be null");
        Mono<RedisClusterNode> resolvedMaster = Mono.fromSupplier(() -> this.lookup(master));
        return resolvedMaster.flatMapMany(nodeToUse -> this.execute((RedisNode)nodeToUse,
                cmd -> cmd.clusterSlaves(nodeToUse.getId()).flatMapIterable(Converters::toSetOfRedisClusterNodes)));
    }

    /**
     * Builds a map from each active master node to the replicas reported for it.
     * For every active master in the topology, {@code CLUSTER SLAVES} is executed on that master with its
     * own id; the replies are collected into a list and converted into cluster nodes. Each master is zipped
     * with its converted replicas and the pairs are collected into a map.
     *
     * NOTE(review): the {@code (Object)} cast on {@code Mono.just} looks like a decompiler artifact — confirm
     * against the original source before touching the generics.
     *
     * @return a mono emitting the master-to-replicas map.
     */
    @Override
    public Mono<Map<RedisClusterNode, Collection<RedisClusterNode>>> clusterGetMasterReplicaMap() {
        return Flux.fromStream(() -> this.topologyProvider.getTopology().getActiveMasterNodes().stream()).flatMap(node -> Mono.just((Object)node).zipWith(this.execute((RedisNode)node, cmd -> cmd.clusterSlaves(node.getId())).collectList().map(Converters::toSetOfRedisClusterNodes))).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
    }

    /**
     * Computes the cluster slot of a key using {@link SlotHash#getSlot}.
     * The computation runs on subscription.
     *
     * @param key the key to hash.
     * @return a mono emitting the slot number.
     */
    @Override
    public Mono<Integer> clusterGetSlotForKey(ByteBuffer key) {
        return Mono.fromSupplier(() -> {
            int slot = SlotHash.getSlot(key);
            return slot;
        });
    }

    /**
     * Finds a node serving the given slot.
     * The topology is queried when this method is called, not on subscription.
     *
     * @param slot the slot to look up.
     * @return a mono emitting the first node of the serving set, or an empty mono when no node serves the slot.
     */
    @Override
    public Mono<RedisClusterNode> clusterGetNodeForSlot(int slot) {
        Set<RedisClusterNode> servingNodes = this.topologyProvider.getTopology().getSlotServingNodes(slot);
        if (servingNodes.isEmpty()) {
            return Mono.empty();
        }
        return Flux.fromIterable(servingNodes).next();
    }

    /**
     * Finds a node serving the slot that the given key hashes to.
     *
     * @param key the key to locate; must not be {@code null}.
     * @return a mono emitting the serving node, or empty when the slot lookup yields no node.
     */
    @Override
    public Mono<RedisClusterNode> clusterGetNodeForKey(ByteBuffer key) {
        Assert.notNull(key, "Key must not be null");
        Mono<Integer> slotOfKey = this.clusterGetSlotForKey(key);
        return slotOfKey.flatMap(slot -> this.clusterGetNodeForSlot(slot));
    }

    /**
     * Fetches {@code CLUSTER INFO} from an arbitrary active node.
     * The reply is converted to properties and wrapped in a {@link ClusterInfo}.
     *
     * @return a mono emitting the cluster info; {@code single()} is applied to the converted result.
     */
    @Override
    public Mono<ClusterInfo> clusterGetClusterInfo() {
        Flux<String> rawInfo = this.executeCommandOnArbitraryNode(RedisClusterReactiveCommands::clusterInfo);
        Flux<ClusterInfo> parsedInfo = rawInfo.map(Converters::toProperties).map(ClusterInfo::new);
        return parsedInfo.single();
    }

    /**
     * Runs {@code CLUSTER ADDSLOTS} on the given node.
     *
     * @param node the node to run the command on.
     * @param slots the slots to add.
     * @return a mono that completes when the command has finished; the reply is discarded.
     */
    @Override
    public Mono<Void> clusterAddSlots(RedisClusterNode node, int ... slots) {
        Flux<String> reply = this.execute(node, cmd -> cmd.clusterAddSlots(slots));
        return reply.then();
    }

    /**
     * Runs {@code CLUSTER ADDSLOTS} on the given node for every slot in a range.
     * The slot array is read from the range when the command is executed.
     *
     * @param node the node to run the command on.
     * @param range the slots to add; must not be {@code null}.
     * @return a mono that completes when the command has finished; the reply is discarded.
     */
    @Override
    public Mono<Void> clusterAddSlots(RedisClusterNode node, RedisClusterNode.SlotRange range) {
        Assert.notNull(range, "Range must not be null");
        Flux<String> reply = this.execute(node, cmd -> cmd.clusterAddSlots(range.getSlotsArray()));
        return reply.then();
    }

    /**
     * Runs {@code CLUSTER COUNTKEYSINSLOT} for the given slot through the inherited single-argument
     * {@code execute}.
     *
     * @param slot the slot to count keys in.
     * @return a mono emitting the first reply, if any.
     */
    @Override
    public Mono<Long> clusterCountKeysInSlot(int slot) {
        Flux<Long> counts = this.execute(cmd -> cmd.clusterCountKeysInSlot(slot));
        return counts.next();
    }

    /**
     * Runs {@code CLUSTER DELSLOTS} on the given node.
     *
     * @param node the node to run the command on.
     * @param slots the slots to remove.
     * @return a mono that completes when the command has finished; the reply is discarded.
     */
    @Override
    public Mono<Void> clusterDeleteSlots(RedisClusterNode node, int ... slots) {
        Flux<String> reply = this.execute(node, cmd -> cmd.clusterDelSlots(slots));
        return reply.then();
    }

    /**
     * Runs {@code CLUSTER DELSLOTS} on the given node for every slot in a range.
     * The slot array is read from the range when the command is executed.
     *
     * @param node the node to run the command on.
     * @param range the slots to remove; must not be {@code null}.
     * @return a mono that completes when the command has finished; the reply is discarded.
     */
    @Override
    public Mono<Void> clusterDeleteSlotsInRange(RedisClusterNode node, RedisClusterNode.SlotRange range) {
        Assert.notNull(range, "Range must not be null");
        Flux<String> reply = this.execute(node, cmd -> cmd.clusterDelSlots(range.getSlotsArray()));
        return reply.then();
    }

    /**
     * Removes a node from the cluster by running {@code CLUSTER FORGET} on every other active node.
     * The node is resolved through the topology when this method is called. On subscription the active
     * nodes are read, the resolved node is taken out of that list, and the command is sent to each
     * remaining node with the resolved node's id.
     *
     * @param node the node to remove from the cluster.
     * @return a mono that completes once every remaining node has processed the command.
     */
    @Override
    public Mono<Void> clusterForget(RedisClusterNode node) {
        RedisClusterNode nodeToRemove = this.lookup(node);
        return Flux.fromStream(() -> {
            ArrayList<RedisClusterNode> remainingNodes = new ArrayList<RedisClusterNode>(this.doGetActiveNodes());
            remainingNodes.remove(nodeToRemove);
            return remainingNodes.stream();
        // Run the command on each remaining node, not on the node being forgotten: the forgotten
        // node was removed from the list precisely so that it is never the execution target.
        }).flatMap(actualNode -> this.execute((RedisNode)actualNode, cmd -> cmd.clusterForget(nodeToRemove.getId()))).then();
    }

    /**
     * Introduces a node to the cluster by running {@code CLUSTER MEET} on every active cluster node.
     * Each active node is told to meet the host and port of {@code node}.
     *
     * @param node the node to add; must not be {@code null} and must have a host and a port greater than 0.
     * @return a mono that completes once every active node has processed the command.
     */
    @Override
    public Mono<Void> clusterMeet(RedisClusterNode node) {
        Assert.notNull(node, "Cluster node must not be null for CLUSTER MEET command");
        Assert.hasText(node.getHost(), "Node to meet cluster must have a host");
        Assert.isTrue(node.getPort() != null && node.getPort() > 0, "Node to meet cluster must have a port greater 0");
        // Run the command on each existing cluster member (actualNode); the joining node is only
        // the argument of CLUSTER MEET, never the execution target.
        return this.clusterGetNodes().flatMap(actualNode -> this.execute((RedisNode)actualNode, cmd -> cmd.clusterMeet(node.getHost(), node.getPort().intValue()))).then();
    }

    /**
     * Runs the {@code CLUSTER SETSLOT} variant selected by {@code mode} on the given node.
     * The node is resolved through the topology when the command is executed, for every mode; its id is
     * passed to all variants except {@code STABLE}.
     *
     * @param node the node to run the command on; must not be {@code null}.
     * @param slot the slot to change.
     * @param mode the variant to run; must not be {@code null}.
     * @return a mono that completes when the command has finished; the reply is discarded.
     * @throws InvalidDataAccessApiUsageException from the callback when {@code mode} matches no known variant.
     */
    @Override
    public Mono<Void> clusterSetSlot(RedisClusterNode node, int slot, ReactiveClusterCommands.AddSlots mode) {
        Assert.notNull(node, "Node must not be null");
        Assert.notNull(mode, "AddSlots mode must not be null");
        return this.execute(node, cmd -> {
            String targetId = this.lookup(node).getId();
            switch (mode) {
                case MIGRATING:
                    return cmd.clusterSetSlotMigrating(slot, targetId);
                case IMPORTING:
                    return cmd.clusterSetSlotImporting(slot, targetId);
                case NODE:
                    return cmd.clusterSetSlotNode(slot, targetId);
                case STABLE:
                    return cmd.clusterSetSlotStable(slot);
                default:
                    throw new InvalidDataAccessApiUsageException("Invalid import mode for cluster slot: " + slot);
            }
        }).then();
    }

    /**
     * Runs {@code CLUSTER GETKEYSINSLOT} through the inherited single-argument {@code execute}.
     *
     * @param slot the slot to read keys from.
     * @param count passed to the command as its count argument.
     * @return a flux of the keys returned by the command.
     */
    @Override
    public Flux<ByteBuffer> clusterGetKeysInSlot(int slot, int count) {
        return this.execute(cmd -> cmd.clusterGetKeysInSlot(slot, count));
    }

    /**
     * Runs {@code CLUSTER REPLICATE} on the replica node, passing the id of the master.
     * The master is resolved through the topology when the command is executed.
     *
     * @param master the node to replicate.
     * @param replica the node the command is executed on.
     * @return a mono that completes when the command has finished; the reply is discarded.
     */
    @Override
    public Mono<Void> clusterReplicate(RedisClusterNode master, RedisClusterNode replica) {
        return this.execute(replica, cmd -> {
            String masterId = this.lookup(master).getId();
            return cmd.clusterReplicate(masterId);
        }).then();
    }

    /**
     * Runs the callback on one randomly chosen active node.
     * The node is picked on subscription from a copy of the active node set.
     *
     * @param callback the command to run; must not be {@code null}.
     * @return a flux of whatever the callback emits for the chosen node.
     */
    public <T> Flux<T> executeCommandOnArbitraryNode(LettuceReactiveRedisConnection.LettuceReactiveCallback<T> callback) {
        Assert.notNull(callback, "ReactiveCallback must not be null");
        Mono<RedisClusterNode> pickedNode = Mono.fromSupplier(() -> {
            ArrayList<RedisClusterNode> candidates = new ArrayList<>(this.doGetActiveNodes());
            int index = new Random().nextInt(candidates.size());
            return candidates.get(index);
        });
        return pickedNode.flatMapMany(chosen -> this.execute((RedisNode)chosen, callback));
    }

    /**
     * Runs the callback against the commands object obtained for a specific node.
     * Errors are mapped through {@code translateException()}.
     *
     * @param node the node to run the callback on; must not be {@code null}.
     * @param callback the command to run; must not be {@code null}.
     * @return a flux of whatever the callback emits.
     */
    public <T> Flux<T> execute(RedisNode node, LettuceReactiveRedisConnection.LettuceReactiveCallback<T> callback) {
        Assert.notNull(node, "RedisClusterNode must not be null");
        Assert.notNull(callback, "ReactiveCallback must not be null");
        Mono<RedisReactiveCommands<ByteBuffer, ByteBuffer>> nodeCommands = this.getCommands(node);
        Flux<T> results = nodeCommands.flatMapMany(callback::doWithCommands);
        return results.onErrorMap(this.translateException());
    }

    /**
     * Returns the superclass connection, declared here with the cluster connection type.
     *
     * NOTE(review): no narrowing cast is visible; this is presumably lost in decompilation — confirm
     * against the original source.
     *
     * @return the mono returned by {@code super.getConnection()}.
     */
    protected Mono<StatefulRedisClusterConnection<ByteBuffer, ByteBuffer>> getConnection() {
        return super.getConnection();
    }

    /**
     * Returns the reactive command API of the cluster connection.
     *
     * @return a mono that maps the connection from {@link #getConnection()} through
     *         {@code StatefulRedisClusterConnection::reactive}.
     */
    protected Mono<RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>> getCommands() {
        return this.getConnection().map(StatefulRedisClusterConnection::reactive);
    }

    /**
     * Returns the reactive command API of a connection to one specific node.
     * A node whose id has text is addressed by id; any other node is addressed by host and port.
     *
     * NOTE(review): the raw-type casts look like decompiler artifacts — confirm against the original source.
     *
     * @param node the node to connect to.
     * @return a mono emitting the reactive commands of the node connection.
     */
    protected Mono<RedisReactiveCommands<ByteBuffer, ByteBuffer>> getCommands(RedisNode node) {
        // Prefer the node id when the node carries one.
        if (StringUtils.hasText((String)node.getId())) {
            return this.getConnection().cast(StatefulRedisClusterConnection.class).flatMap(it -> {
                StatefulRedisClusterConnection connection = it;
                return Mono.fromCompletionStage((CompletionStage)connection.getConnectionAsync(node.getId())).map(StatefulRedisConnection::reactive);
            });
        }
        // Fall back to host/port addressing. getPort() is unboxed here without a null check, so a node
        // with neither id nor port would presumably fail at this point — verify against callers.
        return this.getConnection().flatMap(it -> Mono.fromCompletionStage((CompletionStage)it.getConnectionAsync(node.getHost(), node.getPort().intValue())).map(StatefulRedisConnection::reactive));
    }

    /**
     * Resolves a node against the current topology.
     *
     * @param nodeToLookup the node to resolve.
     * @return whatever the topology's {@code lookup} returns for the node.
     */
    private RedisClusterNode lookup(RedisClusterNode nodeToLookup) {
        return this.topologyProvider.getTopology().lookup(nodeToLookup);
    }

    /**
     * Reads the active nodes from the current topology.
     *
     * @return the set returned by the topology's {@code getActiveNodes()}.
     */
    private Set<RedisClusterNode> doGetActiveNodes() {
        return this.topologyProvider.getTopology().getActiveNodes();
    }
}

