/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.cluster;

import io.lettuce.core.AbstractRedisAsyncCommands;
import io.lettuce.core.FlushMode;
import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.KeyValue;
import io.lettuce.core.MSetExArgs;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScanArgs;
import io.lettuce.core.ScanCursor;
import io.lettuce.core.StreamScanCursor;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.async.RedisKeyAsyncCommands;
import io.lettuce.core.api.async.RedisScriptingAsyncCommands;
import io.lettuce.core.api.async.RedisServerAsyncCommands;
import io.lettuce.core.cluster.AbstractNodeSelection;
import io.lettuce.core.cluster.AsyncClusterConnectionProvider;
import io.lettuce.core.cluster.ClusterDistributionChannelWriter;
import io.lettuce.core.cluster.ClusterScanSupport;
import io.lettuce.core.cluster.DynamicNodeSelection;
import io.lettuce.core.cluster.MultiNodeExecution;
import io.lettuce.core.cluster.NodeSelectionInvocationHandler;
import io.lettuce.core.cluster.PipelinedRedisFuture;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.StatefulRedisClusterConnectionImpl;
import io.lettuce.core.cluster.StaticNodeSelection;
import io.lettuce.core.cluster.api.NodeSelectionSupport;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.AsyncNodeSelection;
import io.lettuce.core.cluster.api.async.NodeSelectionAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.json.JsonParser;
import io.lettuce.core.json.JsonPath;
import io.lettuce.core.json.JsonValue;
import io.lettuce.core.json.arguments.JsonMsetArgs;
import io.lettuce.core.output.IntegerOutput;
import io.lettuce.core.output.KeyStreamingChannel;
import io.lettuce.core.output.KeyValueStreamingChannel;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.ConnectionIntent;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.search.AggregationReply;
import io.lettuce.core.search.SearchReply;
import io.lettuce.core.search.SpellCheckResult;
import io.lettuce.core.search.arguments.AggregateArgs;
import io.lettuce.core.search.arguments.CreateArgs;
import io.lettuce.core.search.arguments.ExplainArgs;
import io.lettuce.core.search.arguments.FieldArgs;
import io.lettuce.core.search.arguments.SearchArgs;
import io.lettuce.core.search.arguments.SpellCheckArgs;
import io.lettuce.core.search.arguments.SynUpdateArgs;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class RedisAdvancedClusterAsyncCommandsImpl<K, V>
extends AbstractRedisAsyncCommands<K, V>
implements RedisAdvancedClusterAsyncCommands<K, V> {
    /** Logger for this class, obtained from Netty's {@link InternalLoggerFactory}. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisAdvancedClusterAsyncCommandsImpl.class);
    /** Codec supplied at construction; handed to {@code SlotHash.partition} and to command outputs built in this class. */
    private final RedisCodec<K, V> codec;

    /**
     * Creates the command object for a concrete cluster connection implementation with a JSON parser supplier.
     *
     * @param connection cluster connection passed to the superclass constructor
     * @param codec codec passed to the superclass constructor and retained in {@link #codec}
     * @param parser JSON parser supplier passed to the superclass constructor
     * @deprecated flagged deprecated in this file; presumably superseded by the overload accepting
     *             {@link StatefulRedisClusterConnection} - confirm before migrating callers
     */
    @Deprecated
    public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec, Supplier<JsonParser> parser) {
        super(connection, codec, parser);
        this.codec = codec;
    }

    /**
     * Creates the command object for a concrete cluster connection implementation.
     *
     * @param connection cluster connection passed to the superclass constructor
     * @param codec codec passed to the superclass constructor and retained in {@link #codec}
     * @deprecated flagged deprecated in this file; presumably superseded by the overload accepting
     *             {@link StatefulRedisClusterConnection} - confirm before migrating callers
     */
    @Deprecated
    public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
        this.codec = codec;
    }

    /**
     * Creates the command object for a cluster connection with a JSON parser supplier.
     *
     * @param connection cluster connection passed to the superclass constructor
     * @param codec codec passed to the superclass constructor and retained in {@link #codec}
     * @param parser JSON parser supplier passed to the superclass constructor
     */
    public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection<K, V> connection, RedisCodec<K, V> codec, Supplier<JsonParser> parser) {
        super(connection, codec, parser);
        this.codec = codec;
    }

    /**
     * Creates the command object for a cluster connection.
     *
     * @param connection cluster connection passed to the superclass constructor
     * @param codec codec passed to the superclass constructor and retained in {@link #codec}
     */
    public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
        this.codec = codec;
    }

    /**
     * Sets the client name on the default connection and on the per-node connections of every partition.
     * <p>
     * For each node in the current partitions a connection is looked up twice: once by node id and once by
     * host/port. A connection that reports {@code isOpen()} receives the command; for any other connection an
     * already-completed {@code "OK"} stage is used instead. All stages are handed to
     * {@code MultiNodeExecution.firstOfAsync}.
     *
     * @param name the client name to set
     * @return the future produced by {@code MultiNodeExecution.firstOfAsync} over all collected executions
     */
    @Override
    public RedisFuture<String> clientSetname(K name) {
        Map<String, CompletionStage<String>> executions = new HashMap<>();
        CompletableFuture<String> ok = CompletableFuture.completedFuture("OK");
        // Shared by both lookups below: only connections that are open are sent the command.
        Function<StatefulRedisConnection<K, V>, CompletionStage<String>> setNameIfOpen = connection -> {
            if (connection.isOpen()) {
                return connection.async().clientSetname(name);
            }
            return ok;
        };
        executions.put("Default", super.clientSetname(name).toCompletableFuture());
        for (RedisClusterNode node : getStatefulConnection().getPartitions()) {
            RedisURI uri = node.getUri();
            String nodeId = node.getNodeId();
            executions.put("NodeId: " + nodeId, getStatefulConnection(nodeId).thenCompose(setNameIfOpen));
            // The node id (not host:port) is used in this key as well; it only needs to be unique per node.
            executions.put("HostAndPort: " + nodeId, getStatefulConnection(uri.getHost(), uri.getPort()).thenCompose(setNameIfOpen));
        }
        return MultiNodeExecution.firstOfAsync(executions);
    }

    /**
     * Counts keys in a slot, preferring the connection that {@code findConnectionBySlot} returns for it.
     *
     * @param slot the slot number
     * @return the result from the slot's connection, or from the superclass call when no connection was found
     */
    @Override
    public RedisFuture<Long> clusterCountKeysInSlot(int slot) {
        RedisClusterAsyncCommands<K, V> slotOwner = findConnectionBySlot(slot);
        if (slotOwner == null) {
            // No connection resolved for the slot: fall back to the superclass implementation.
            return super.clusterCountKeysInSlot(slot);
        }
        return slotOwner.clusterCountKeysInSlot(slot);
    }

    /**
     * Fetches keys of a slot, preferring the connection that {@code findConnectionBySlot} returns for it.
     *
     * @param slot the slot number
     * @param count passed through unchanged to the underlying call
     * @return the result from the slot's connection, or from the superclass call when no connection was found
     */
    @Override
    public RedisFuture<List<K>> clusterGetKeysInSlot(int slot, int count) {
        RedisClusterAsyncCommands<K, V> slotOwner = findConnectionBySlot(slot);
        if (slotOwner == null) {
            // No connection resolved for the slot: fall back to the superclass implementation.
            return super.clusterGetKeysInSlot(slot, count);
        }
        return slotOwner.clusterGetKeysInSlot(slot, count);
    }

    /**
     * Invokes {@code dbsize} through {@code executeOnUpstream} and combines the per-node futures with
     * {@code MultiNodeExecution.aggregateAsync}.
     *
     * @return the combined future
     */
    @Override
    public RedisFuture<Long> dbsize() {
        return MultiNodeExecution.aggregateAsync(
                executeOnUpstream(commands -> commands.dbsize()));
    }

    /**
     * Varargs convenience form; wraps the array in a list view and delegates to {@link #del(Iterable)}.
     *
     * @param keys the keys
     * @return the future returned by the {@code Iterable} overload
     */
    @Override
    public RedisFuture<Long> del(K ... keys) {
        Iterable<K> keyList = Arrays.asList(keys);
        return del(keyList);
    }

    /**
     * Runs {@code del} for the given keys, issuing one call per group produced by {@code SlotHash.partition}.
     * <p>
     * When partitioning yields fewer than two groups the keys are passed to the superclass unchanged;
     * otherwise each group is sent separately and the results are combined with
     * {@code MultiNodeExecution.aggregateAsync}.
     *
     * @param keys the keys
     * @return the single-call future, or the aggregated future over all groups
     */
    @Override
    public RedisFuture<Long> del(Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.del(keys);
        }
        Map<Integer, RedisFuture<Long>> executions = new HashMap<>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            // List<K> already selects the Iterable<K> overload; no raw cast is needed.
            executions.put(entry.getKey(), super.del(entry.getValue()));
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    /**
     * Varargs convenience form; wraps the array in a list view and delegates to {@link #exists(Iterable)}.
     *
     * @param keys the keys
     * @return the future returned by the {@code Iterable} overload
     */
    @Override
    public RedisFuture<Long> exists(K ... keys) {
        Iterable<K> keyList = Arrays.asList(keys);
        return exists(keyList);
    }

    /**
     * Runs {@code exists} for the given keys, issuing one call per group produced by {@code SlotHash.partition}.
     * <p>
     * When partitioning yields fewer than two groups the keys are passed to the superclass unchanged;
     * otherwise each group is sent separately and the results are combined with
     * {@code MultiNodeExecution.aggregateAsync}.
     *
     * @param keys the keys
     * @return the single-call future, or the aggregated future over all groups
     */
    @Override
    public RedisFuture<Long> exists(Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.exists(keys);
        }
        Map<Integer, RedisFuture<Long>> executions = new HashMap<>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            // List<K> already selects the Iterable<K> overload; no raw cast is needed.
            executions.put(entry.getKey(), super.exists(entry.getValue()));
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    /**
     * Invokes {@code flushall} through {@code executeOnUpstream} and reduces the per-node futures with
     * {@code MultiNodeExecution.firstOfAsync}.
     *
     * @return the reduced future
     */
    @Override
    public RedisFuture<String> flushall() {
        return MultiNodeExecution.firstOfAsync(
                executeOnUpstream(commands -> commands.flushall()));
    }

    /**
     * Invokes {@code flushall(flushMode)} through {@code executeOnUpstream} and reduces the per-node futures
     * with {@code MultiNodeExecution.firstOfAsync}.
     *
     * @param flushMode passed through unchanged to each invocation
     * @return the reduced future
     */
    @Override
    public RedisFuture<String> flushall(FlushMode flushMode) {
        return MultiNodeExecution.firstOfAsync(
                executeOnUpstream(commands -> commands.flushall(flushMode)));
    }

    /**
     * Invokes {@code flushallAsync} through {@code executeOnUpstream} and reduces the per-node futures with
     * {@code MultiNodeExecution.firstOfAsync}.
     *
     * @return the reduced future
     */
    @Override
    public RedisFuture<String> flushallAsync() {
        return MultiNodeExecution.firstOfAsync(
                executeOnUpstream(commands -> commands.flushallAsync()));
    }

    /**
     * Invokes {@code flushdb} through {@code executeOnUpstream} and reduces the per-node futures with
     * {@code MultiNodeExecution.firstOfAsync}.
     *
     * @return the reduced future
     */
    @Override
    public RedisFuture<String> flushdb() {
        return MultiNodeExecution.firstOfAsync(
                executeOnUpstream(commands -> commands.flushdb()));
    }

    /**
     * Invokes {@code flushdb(flushMode)} through {@code executeOnUpstream} and reduces the per-node futures
     * with {@code MultiNodeExecution.firstOfAsync}.
     *
     * @param flushMode passed through unchanged to each invocation
     * @return the reduced future
     */
    @Override
    public RedisFuture<String> flushdb(FlushMode flushMode) {
        return MultiNodeExecution.firstOfAsync(
                executeOnUpstream(commands -> commands.flushdb(flushMode)));
    }

    /**
     * Invokes {@code keys(pattern)} through {@code executeOnUpstream} and concatenates the per-node lists.
     * <p>
     * The returned future's converter reads each per-node future via {@code MultiNodeExecution.execute} and
     * appends its list to one result, in the iteration order of the executions map.
     *
     * @param pattern the key pattern
     * @return a future yielding the concatenation of all per-node results
     */
    @Override
    public RedisFuture<List<K>> keys(String pattern) {
        // The key type of the executions map is not needed here, hence the wildcard.
        Map<?, CompletableFuture<List<K>>> executions = executeOnUpstream(commands -> commands.keys(pattern));
        return new PipelinedRedisFuture<List<K>>(executions, ignored -> {
            List<K> result = new ArrayList<>();
            for (CompletableFuture<List<K>> future : executions.values()) {
                result.addAll(MultiNodeExecution.execute(future::get));
            }
            return result;
        });
    }

    /**
     * Invokes {@code keysLegacy(pattern)} through {@code executeOnUpstream} and concatenates the per-node lists.
     * <p>
     * The returned future's converter reads each per-node future via {@code MultiNodeExecution.execute} and
     * appends its list to one result, in the iteration order of the executions map.
     *
     * @param pattern the key pattern
     * @return a future yielding the concatenation of all per-node results
     */
    @Override
    @Deprecated
    public RedisFuture<List<K>> keysLegacy(K pattern) {
        // The key type of the executions map is not needed here, hence the wildcard.
        Map<?, CompletableFuture<List<K>>> executions = executeOnUpstream(commands -> commands.keysLegacy(pattern));
        return new PipelinedRedisFuture<List<K>>(executions, ignored -> {
            List<K> result = new ArrayList<>();
            for (CompletableFuture<List<K>> future : executions.values()) {
                result.addAll(MultiNodeExecution.execute(future::get));
            }
            return result;
        });
    }

    /**
     * Invokes the streaming {@code keys(channel, pattern)} through {@code executeOnUpstream} and combines the
     * per-node counts with {@code MultiNodeExecution.aggregateAsync}.
     *
     * @param channel the channel passed to every per-node invocation
     * @param pattern the key pattern
     * @return the combined future
     */
    @Override
    public RedisFuture<Long> keys(KeyStreamingChannel<K> channel, String pattern) {
        // Passed straight through (as dbsize does) so no raw Map local is required.
        return MultiNodeExecution.aggregateAsync(
                executeOnUpstream(commands -> commands.keys(channel, pattern)));
    }

    /**
     * Invokes the streaming {@code keysLegacy(channel, pattern)} through {@code executeOnUpstream} and combines
     * the per-node counts with {@code MultiNodeExecution.aggregateAsync}.
     *
     * @param channel the channel passed to every per-node invocation
     * @param pattern the key pattern
     * @return the combined future
     */
    @Override
    @Deprecated
    public RedisFuture<Long> keysLegacy(KeyStreamingChannel<K> channel, K pattern) {
        // Passed straight through (as dbsize does) so no raw Map local is required.
        return MultiNodeExecution.aggregateAsync(
                executeOnUpstream(commands -> commands.keysLegacy(channel, pattern)));
    }

    /**
     * Runs {@code jsonMGet} for the given keys, issuing one call per group produced by {@code SlotHash.partition}
     * and returning the values in the order of {@code keys}.
     * <p>
     * When partitioning yields fewer than two groups the call goes to the superclass unchanged. Otherwise each
     * group is fetched separately; the converter then walks {@code keys} in caller order and, for each key, picks
     * the element at that key's index within its group's result list.
     *
     * @param jsonPath passed through unchanged to every invocation
     * @param keys the keys
     * @return the single-call future, or a future yielding one value per entry of {@code keys}
     */
    @Override
    public RedisFuture<List<JsonValue>> jsonMGet(JsonPath jsonPath, K ... keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, Arrays.asList(keys));
        if (partitioned.size() < 2) {
            return super.jsonMGet(jsonPath, keys);
        }
        // Per group: key -> index of that key inside the group's key list (and hence inside its result list).
        Map<Integer, Map<K, Integer>> keysToIndexes = mapKeyToIndex(partitioned);
        Map<K, Integer> slots = SlotHash.getSlots(partitioned);
        Map<Integer, RedisFuture<List<JsonValue>>> executions = new HashMap<>(partitioned.size());
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            List<K> groupKeys = entry.getValue();
            // Generic array creation is not possible; the Object[] only ever holds K instances.
            @SuppressWarnings("unchecked")
            K[] partitionKeys = groupKeys.toArray((K[]) new Object[groupKeys.size()]);
            executions.put(entry.getKey(), super.jsonMGet(jsonPath, partitionKeys));
        }
        return new PipelinedRedisFuture<List<JsonValue>>(executions, ignored -> {
            List<JsonValue> result = new ArrayList<>(slots.size());
            for (K opKey : keys) {
                int slot = slots.get(opKey);
                int position = keysToIndexes.get(slot).get(opKey);
                RedisFuture<List<JsonValue>> groupFuture = executions.get(slot);
                result.add(MultiNodeExecution.execute(() -> groupFuture.get().get(position)));
            }
            return result;
        });
    }

    /**
     * Builds, for every group in {@code partitioned}, a lookup from key to that key's index in the group's list.
     * <p>
     * If a key occurs more than once in a group, the last occurrence's index is kept.
     *
     * @param partitioned keys grouped by an integer group id
     * @return a map from group id to a key-to-index map for that group
     */
    private Map<Integer, Map<K, Integer>> mapKeyToIndex(Map<Integer, List<K>> partitioned) {
        Map<Integer, Map<K, Integer>> result = new HashMap<>(partitioned.size());
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            List<K> keysForPartition = entry.getValue();
            Map<K, Integer> keysToIndexes = new HashMap<>(keysForPartition.size());
            for (int i = 0; i < keysForPartition.size(); ++i) {
                keysToIndexes.put(keysForPartition.get(i), i);
            }
            result.put(entry.getKey(), keysToIndexes);
        }
        return result;
    }

    /**
     * Varargs convenience form; wraps the array in a list view and delegates to {@link #mget(Iterable)}.
     *
     * @param keys the keys
     * @return the future returned by the {@code Iterable} overload
     */
    @Override
    public RedisFuture<List<KeyValue<K, V>>> mget(K ... keys) {
        Iterable<K> keyList = Arrays.asList(keys);
        return mget(keyList);
    }

    /**
     * Runs {@code mget} for the given keys, issuing one call per group produced by {@code SlotHash.partition}
     * and returning the entries in the iteration order of {@code keys}.
     * <p>
     * When partitioning yields fewer than two groups the call goes to the superclass unchanged. Otherwise each
     * group is fetched separately; the converter then iterates {@code keys} again and, for each key, picks the
     * element at that key's index within its group's result list.
     *
     * @param keys the keys; iterated once for partitioning and once more when the result is assembled
     * @return the single-call future, or a future yielding one entry per element of {@code keys}
     */
    @Override
    public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.mget(keys);
        }
        // Per group: key -> index of that key inside the group's key list (and hence inside its result list).
        Map<Integer, Map<K, Integer>> partitionedKeysToIndexes = mapKeyToIndex(partitioned);
        Map<K, Integer> slots = SlotHash.getSlots(partitioned);
        Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>(partitioned.size());
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            executions.put(entry.getKey(), super.mget(entry.getValue()));
        }
        return new PipelinedRedisFuture<List<KeyValue<K, V>>>(executions, ignored -> {
            List<KeyValue<K, V>> result = new ArrayList<>(slots.size());
            for (K opKey : keys) {
                int slot = slots.get(opKey);
                int position = partitionedKeysToIndexes.get(slot).get(opKey);
                RedisFuture<List<KeyValue<K, V>>> groupFuture = executions.get(slot);
                result.add(MultiNodeExecution.execute(() -> groupFuture.get().get(position)));
            }
            return result;
        });
    }

    /**
     * Varargs convenience form of the streaming {@code mget}; wraps the array in a list view and delegates to
     * {@link #mget(KeyValueStreamingChannel, Iterable)}.
     *
     * @param channel the streaming channel
     * @param keys the keys
     * @return the future returned by the {@code Iterable} overload
     */
    @Override
    public RedisFuture<Long> mget(KeyValueStreamingChannel<K, V> channel, K ... keys) {
        Iterable<K> keyList = Arrays.asList(keys);
        return mget(channel, keyList);
    }

    /**
     * Runs the streaming {@code mget} for the given keys, issuing one call per group produced by
     * {@code SlotHash.partition}.
     * <p>
     * When partitioning yields fewer than two groups the call goes to the superclass unchanged; otherwise each
     * group is sent separately with the same channel and the counts are combined with
     * {@code MultiNodeExecution.aggregateAsync}.
     *
     * @param channel the streaming channel shared by all invocations
     * @param keys the keys
     * @return the single-call future, or the aggregated future over all groups
     */
    @Override
    public RedisFuture<Long> mget(KeyValueStreamingChannel<K, V> channel, Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.mget(channel, keys);
        }
        Map<Integer, RedisFuture<Long>> executions = new HashMap<>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<Long> streamed = super.mget(channel, entry.getValue());
            executions.put(entry.getKey(), streamed);
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    /**
     * Runs {@code jsonMSet}, issuing one call per group that {@code SlotHash.partition} produces for the
     * argument keys.
     * <p>
     * When partitioning yields fewer than two groups the arguments go to the superclass unchanged. Otherwise
     * every argument is assigned to the group of its key exactly once, keeping the relative order of
     * {@code arguments} inside each group, and the per-group futures are reduced with
     * {@code MultiNodeExecution.firstOfAsync}.
     *
     * @param arguments the key/path/value triples to set
     * @return the single-call future, or the reduced future over all groups
     */
    @Override
    public RedisFuture<String> jsonMSet(List<JsonMsetArgs<K, V>> arguments) {
        List<K> keys = arguments.stream().map(argument -> argument.getKey()).collect(Collectors.toList());
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.jsonMSet(arguments);
        }
        Map<K, Integer> slots = SlotHash.getSlots(partitioned);
        // Distribute by walking the arguments themselves rather than the partitioned key lists: if a key appears
        // several times (e.g. with different paths) and the partition keeps each occurrence, expanding
        // "all arguments of key k" per occurrence would submit those arguments more than once.
        Map<Integer, List<JsonMsetArgs<K, V>>> argsPerSlot = new HashMap<>();
        for (JsonMsetArgs<K, V> argument : arguments) {
            argsPerSlot.computeIfAbsent(slots.get(argument.getKey()), slot -> new ArrayList<>()).add(argument);
        }
        Map<Integer, RedisFuture<String>> executions = new HashMap<>();
        for (Integer slot : partitioned.keySet()) {
            executions.put(slot, super.jsonMSet(argsPerSlot.get(slot)));
        }
        return MultiNodeExecution.firstOfAsync(executions);
    }

    /**
     * Runs {@code mset}, issuing one call per group that {@code SlotHash.partition} produces for the map's keys.
     * <p>
     * When partitioning yields fewer than two groups the map goes to the superclass unchanged. Otherwise a
     * sub-map is built per group and sent separately; the futures are reduced with
     * {@code MultiNodeExecution.firstOfAsync}.
     *
     * @param map the key/value pairs to set
     * @return the single-call future, or the reduced future over all groups
     */
    @Override
    public RedisFuture<String> mset(Map<K, V> map) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, map.keySet());
        if (partitioned.size() < 2) {
            return super.mset(map);
        }
        Map<Integer, RedisFuture<String>> executions = new HashMap<>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            Map<K, V> op = new HashMap<>();
            entry.getValue().forEach(k -> op.put(k, map.get(k)));
            executions.put(entry.getKey(), super.mset(op));
        }
        return MultiNodeExecution.firstOfAsync(executions);
    }

    /**
     * Runs {@code msetnx} via {@code executePartitionedBoolean}, using the superclass {@code msetnx} as the
     * operation applied to the map (or to each sub-map).
     *
     * @param map the key/value pairs
     * @return the future returned by {@code executePartitionedBoolean}
     */
    @Override
    public RedisFuture<Boolean> msetnx(Map<K, V> map) {
        return executePartitionedBoolean(map, super::msetnx);
    }

    /**
     * Runs {@code msetex} via {@code executePartitionedBoolean}, using the superclass {@code msetex} with the
     * given {@code args} as the operation applied to the map (or to each sub-map).
     *
     * @param map the key/value pairs
     * @param args passed through unchanged to every invocation
     * @return the future returned by {@code executePartitionedBoolean}
     */
    @Override
    public RedisFuture<Boolean> msetex(Map<K, V> map, MSetExArgs args) {
        return executePartitionedBoolean(map, subMap -> super.msetex(subMap, args));
    }

    /**
     * Applies a boolean-returning map operation once per group that {@code SlotHash.partition} produces for the
     * map's keys, and ANDs the outcomes.
     * <p>
     * When partitioning yields fewer than two groups, {@code operation} is applied to {@code map} directly.
     * Otherwise a sub-map is built per group; the combined result is {@code false} as soon as one group's
     * result is {@code null} or {@code false}, and {@code true} only when every group returned {@code true}.
     *
     * @param map the key/value pairs
     * @param operation the call to run against the whole map or against each sub-map
     * @return the direct future, or a future holding the conjunction of all group results
     */
    private RedisFuture<Boolean> executePartitionedBoolean(Map<K, V> map, Function<Map<K, V>, RedisFuture<Boolean>> operation) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, map.keySet());
        if (partitioned.size() < 2) {
            return operation.apply(map);
        }
        Map<Integer, RedisFuture<Boolean>> executions = new HashMap<>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            Map<K, V> op = new HashMap<>();
            entry.getValue().forEach(k -> op.put(k, map.get(k)));
            executions.put(entry.getKey(), operation.apply(op));
        }
        return new PipelinedRedisFuture<Boolean>(executions, ignored -> {
            for (RedisFuture<Boolean> execution : executions.values()) {
                Boolean outcome = MultiNodeExecution.execute(execution::get);
                if (outcome == null || !outcome) {
                    return false;
                }
            }
            return !executions.isEmpty();
        });
    }

    /**
     * Runs {@code randomkey} against one partition chosen uniformly at random by index.
     * <p>
     * With no partitions known the call goes to the superclass. Otherwise a connection to the chosen
     * partition's host and port is obtained asynchronously and {@code randomkey} is invoked on it.
     *
     * @return the superclass future, or a future wrapping the call on the chosen partition
     */
    @Override
    public RedisFuture<K> randomkey() {
        Partitions partitions = getStatefulConnection().getPartitions();
        if (partitions.isEmpty()) {
            return super.randomkey();
        }
        int index = ThreadLocalRandom.current().nextInt(partitions.size());
        RedisURI uri = partitions.getPartition(index).getUri();
        CompletionStage<K> future = getConnectionAsync(uri.getHost(), uri.getPort())
                .thenCompose(commands -> commands.randomkey());
        return new PipelinedRedisFuture<K>(future);
    }

    /**
     * Invokes {@code scriptFlush} through {@code executeOnNodes} with a filter that accepts every node and
     * reduces the futures with {@code MultiNodeExecution.firstOfAsync}.
     *
     * @return the reduced future
     */
    @Override
    public RedisFuture<String> scriptFlush() {
        // The key type of the executions map is not needed here, hence the wildcard.
        Map<?, CompletableFuture<String>> executions = executeOnNodes(RedisScriptingAsyncCommands::scriptFlush, redisClusterNode -> true);
        return MultiNodeExecution.firstOfAsync(executions);
    }

    /**
     * Invokes {@code scriptKill} through {@code executeOnNodes} with a filter that accepts every node and
     * reduces the futures with {@code MultiNodeExecution.alwaysOkOfAsync}.
     *
     * @return the reduced future
     */
    @Override
    public RedisFuture<String> scriptKill() {
        // The key type of the executions map is not needed here, hence the wildcard.
        Map<?, CompletableFuture<String>> executions = executeOnNodes(RedisScriptingAsyncCommands::scriptKill, redisClusterNode -> true);
        return MultiNodeExecution.alwaysOkOfAsync(executions);
    }

    /**
     * Invokes {@code scriptLoad(script)} through {@code executeOnNodes} with a filter that accepts every node
     * and reduces the futures with {@code MultiNodeExecution.lastOfAsync}.
     *
     * @param script the script bytes, passed unchanged to every invocation
     * @return the reduced future
     */
    @Override
    public RedisFuture<String> scriptLoad(byte[] script) {
        // The key type of the executions map is not needed here, hence the wildcard.
        Map<?, CompletableFuture<String>> executions = executeOnNodes(cmd -> cmd.scriptLoad(script), redisClusterNode -> true);
        return MultiNodeExecution.lastOfAsync(executions);
    }

    /**
     * Invokes {@code shutdown(save)} through {@code executeOnNodes} with a filter that accepts every node.
     * <p>
     * {@code shutdown} returns nothing, so a locally created {@code SHUTDOWN} command that is completed
     * immediately is handed back to {@code executeOnNodes} as the per-node future.
     *
     * @param save passed through unchanged to every invocation
     */
    @Override
    public void shutdown(boolean save) {
        executeOnNodes(commands -> {
            commands.shutdown(save);
            // Placeholder future: already complete, carries no reply.
            Command<K, V, Long> command = new Command<K, V, Long>(CommandType.SHUTDOWN, new IntegerOutput<K, V>(this.codec), null);
            AsyncCommand<K, V, Long> async = new AsyncCommand<K, V, Long>(command);
            async.complete();
            return async;
        }, redisClusterNode -> true);
    }

    /**
     * Varargs convenience form; wraps the array in a list view and delegates to {@link #touch(Iterable)}.
     *
     * @param keys the keys
     * @return the future returned by the {@code Iterable} overload
     */
    @Override
    public RedisFuture<Long> touch(K ... keys) {
        Iterable<K> keyList = Arrays.asList(keys);
        return touch(keyList);
    }

    /**
     * Runs {@code touch} for the given keys, issuing one call per group produced by {@code SlotHash.partition}.
     * <p>
     * When partitioning yields fewer than two groups the keys are passed to the superclass unchanged;
     * otherwise each group is sent separately and the results are combined with
     * {@code MultiNodeExecution.aggregateAsync}.
     *
     * @param keys the keys
     * @return the single-call future, or the aggregated future over all groups
     */
    @Override
    public RedisFuture<Long> touch(Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.touch(keys);
        }
        Map<Integer, RedisFuture<Long>> executions = new HashMap<>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            // List<K> already selects the Iterable<K> overload; no raw cast is needed.
            executions.put(entry.getKey(), super.touch(entry.getValue()));
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    /**
     * Varargs convenience form; wraps the array in a list view and delegates to {@link #unlink(Iterable)}.
     *
     * @param keys the keys
     * @return the future returned by the {@code Iterable} overload
     */
    @Override
    public RedisFuture<Long> unlink(K ... keys) {
        Iterable<K> keyList = Arrays.asList(keys);
        return unlink(keyList);
    }

    /**
     * Runs {@code unlink} for the given keys, issuing one call per group produced by {@code SlotHash.partition}.
     * <p>
     * When partitioning yields fewer than two groups the keys are passed to the superclass unchanged;
     * otherwise each group is sent separately and the results are combined with
     * {@code MultiNodeExecution.aggregateAsync}.
     *
     * @param keys the keys
     * @return the single-call future, or the aggregated future over all groups
     */
    @Override
    public RedisFuture<Long> unlink(Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.unlink(keys);
        }
        Map<Integer, RedisFuture<Long>> executions = new HashMap<>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            // List<K> already selects the Iterable<K> overload; no raw cast is needed.
            executions.put(entry.getKey(), super.unlink(entry.getValue()));
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    /**
     * Returns the async command API of the node connection that the cluster connection resolves for a node id.
     *
     * @param nodeId the node id
     * @return the {@code async()} view of the resolved node connection
     */
    @Override
    public RedisClusterAsyncCommands<K, V> getConnection(String nodeId) {
        StatefulRedisClusterConnection<K, V> cluster = getStatefulConnection();
        return cluster.getConnection(nodeId).async();
    }

    /**
     * Returns the async command API of the node connection that the cluster connection resolves for a host
     * and port.
     *
     * @param host the node host
     * @param port the node port
     * @return the {@code async()} view of the resolved node connection
     */
    @Override
    public RedisClusterAsyncCommands<K, V> getConnection(String host, int port) {
        StatefulRedisClusterConnection<K, V> cluster = getStatefulConnection();
        return cluster.getConnection(host, port).async();
    }

    /**
     * Asks the connection provider for a connection to the given node id with {@link ConnectionIntent#WRITE}.
     *
     * @param nodeId the node id
     * @return the provider's future for that connection
     */
    private CompletableFuture<StatefulRedisConnection<K, V>> getStatefulConnection(String nodeId) {
        CompletableFuture<StatefulRedisConnection<K, V>> pending = getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, nodeId);
        return pending;
    }

    /**
     * Asks the connection provider for a connection to the given host and port with
     * {@link ConnectionIntent#WRITE}.
     *
     * @param host the node host
     * @param port the node port
     * @return the provider's future for that connection
     */
    private CompletableFuture<StatefulRedisConnection<K, V>> getStatefulConnection(String host, int port) {
        CompletableFuture<StatefulRedisConnection<K, V>> pending = getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, host, port);
        return pending;
    }

    /**
     * Asynchronously obtains a {@link ConnectionIntent#WRITE} connection to the given host and port and maps
     * it to its {@code async()} command API.
     *
     * @param host the node host
     * @param port the node port
     * @return a future yielding the async commands of the resolved connection
     */
    private CompletableFuture<RedisClusterAsyncCommands<K, V>> getConnectionAsync(String host, int port) {
        // Same provider lookup as getStatefulConnection(host, port), followed by the async() mapping.
        return getStatefulConnection(host, port).thenApply(StatefulRedisConnection::async);
    }

    /**
     * Returns the superclass's connection viewed as a cluster connection.
     *
     * @return the connection held by the superclass, cast to {@link StatefulRedisClusterConnection}
     */
    @Override
    public StatefulRedisClusterConnection<K, V> getStatefulConnection() {
        return (StatefulRedisClusterConnection<K, V>) super.getConnection();
    }

    /**
     * Asks the connection provider for a random connection with the given intent.
     *
     * @param intent the connection intent forwarded to the provider
     * @return the provider's future for that connection
     */
    private CompletableFuture<StatefulRedisConnection<K, V>> getRandomStatefulConnection(ConnectionIntent intent) {
        CompletableFuture<StatefulRedisConnection<K, V>> pending = getConnectionProvider().getRandomConnectionAsync(intent);
        return pending;
    }

    /**
     * Selects nodes matching the predicate; equivalent to {@code nodes(predicate, false)}.
     *
     * @param predicate the node filter
     * @return the node selection
     */
    @Override
    public AsyncNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate) {
        final boolean dynamic = false;
        return nodes(predicate, dynamic);
    }

    /**
     * Selects nodes matching the predicate with {@link ConnectionIntent#READ} and {@code dynamic = false}.
     *
     * @param predicate the node filter
     * @return the node selection
     */
    @Override
    public AsyncNodeSelection<K, V> readonly(Predicate<RedisClusterNode> predicate) {
        final boolean dynamic = false;
        return nodes(predicate, ConnectionIntent.READ, dynamic);
    }

    /**
     * Selects nodes matching the predicate with {@link ConnectionIntent#WRITE}.
     *
     * @param predicate the node filter
     * @param dynamic forwarded unchanged; chooses between the dynamic and the static selection
     * @return the node selection
     */
    @Override
    public AsyncNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate, boolean dynamic) {
        final ConnectionIntent intent = ConnectionIntent.WRITE;
        return nodes(predicate, intent, dynamic);
    }

    /**
     * Builds a node selection and exposes it through a dynamic proxy.
     * <p>
     * A {@code DynamicNodeSelection} or {@code StaticNodeSelection} is created over the cluster distribution
     * channel writer of the underlying connection, then wrapped in a {@code NodeSelectionInvocationHandler}
     * using the {@code ASYNC} execution model. The proxy implements {@code NodeSelectionAsyncCommands} and
     * {@code AsyncNodeSelection}.
     *
     * @param predicate the node filter handed to the selection
     * @param connectionIntent the intent handed to the selection
     * @param dynamic {@code true} to create a {@code DynamicNodeSelection}, {@code false} for a
     *        {@code StaticNodeSelection}
     * @return the proxy, cast to {@link AsyncNodeSelection}
     */
    protected AsyncNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate, ConnectionIntent connectionIntent, boolean dynamic) {
        // NOTE(review): this cast fails with ClassCastException if the connection is not the Impl class - confirm callers.
        StatefulRedisClusterConnectionImpl impl = (StatefulRedisClusterConnectionImpl)this.getConnection();
        AbstractNodeSelection selection = dynamic ? new DynamicNodeSelection(impl.getClusterDistributionChannelWriter(), predicate, connectionIntent, StatefulRedisConnection::async) : new StaticNodeSelection(impl.getClusterDistributionChannelWriter(), predicate, connectionIntent, StatefulRedisConnection::async);
        NodeSelectionInvocationHandler h = new NodeSelectionInvocationHandler((AbstractNodeSelection)selection, RedisClusterAsyncCommands.class, NodeSelectionInvocationHandler.ExecutionModel.ASYNC);
        // The proxy is defined in the class loader that loaded NodeSelectionSupport.
        return (AsyncNodeSelection)Proxy.newProxyInstance(NodeSelectionSupport.class.getClassLoader(), new Class[]{NodeSelectionAsyncCommands.class, AsyncNodeSelection.class}, (InvocationHandler)h);
    }

    /**
     * Starts a cluster scan from {@link ScanCursor#INITIAL}, invoking {@code scan()} on the connection that
     * {@code clusterScan} supplies and mapping the result with the key-scan cursor mapper.
     *
     * @return the future returned by {@code clusterScan}
     */
    @Override
    public RedisFuture<KeyScanCursor<K>> scan() {
        return clusterScan(
                ScanCursor.INITIAL,
                (node, ignoredCursor) -> node.scan(),
                ClusterScanSupport.asyncClusterKeyScanCursorMapper());
    }

    /**
     * Starts a cluster scan from {@link ScanCursor#INITIAL}, invoking {@code scan(scanArgs)} on the connection
     * that {@code clusterScan} supplies and mapping the result with the key-scan cursor mapper.
     *
     * @param scanArgs passed through unchanged to the per-connection call
     * @return the future returned by {@code clusterScan}
     */
    @Override
    public RedisFuture<KeyScanCursor<K>> scan(ScanArgs scanArgs) {
        return clusterScan(
                ScanCursor.INITIAL,
                (node, ignoredCursor) -> node.scan(scanArgs),
                ClusterScanSupport.asyncClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster scan from {@code scanCursor}, invoking {@code scan(cursor, scanArgs)} with the cursor
     * that {@code clusterScan} hands to the callback and mapping the result with the key-scan cursor mapper.
     *
     * @param scanCursor the cursor to resume from
     * @param scanArgs passed through unchanged to the per-connection call
     * @return the future returned by {@code clusterScan}
     */
    @Override
    public RedisFuture<KeyScanCursor<K>> scan(ScanCursor scanCursor, ScanArgs scanArgs) {
        return clusterScan(
                scanCursor,
                (node, nodeCursor) -> node.scan((ScanCursor)nodeCursor, scanArgs),
                ClusterScanSupport.asyncClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster scan from {@code scanCursor}, invoking {@code scan(cursor)} with the cursor that
     * {@code clusterScan} hands to the callback and mapping the result with the key-scan cursor mapper.
     *
     * @param scanCursor the cursor to resume from
     * @return the future returned by {@code clusterScan}
     */
    @Override
    public RedisFuture<KeyScanCursor<K>> scan(ScanCursor scanCursor) {
        return clusterScan(
                scanCursor,
                (node, nodeCursor) -> node.scan(nodeCursor),
                ClusterScanSupport.asyncClusterKeyScanCursorMapper());
    }

    /**
     * Starts a streaming cluster scan from {@link ScanCursor#INITIAL}, invoking {@code scan(channel)} on the
     * connection that {@code clusterScan} supplies and mapping the result with the stream-scan cursor mapper.
     *
     * @param channel the channel passed to the per-connection call
     * @return the future returned by {@code clusterScan}
     */
    @Override
    public RedisFuture<StreamScanCursor> scan(KeyStreamingChannel<K> channel) {
        return clusterScan(
                ScanCursor.INITIAL,
                (node, ignoredCursor) -> node.scan(channel),
                ClusterScanSupport.asyncClusterStreamScanCursorMapper());
    }

    /**
     * Starts a streaming cluster scan from {@link ScanCursor#INITIAL}, invoking {@code scan(channel, scanArgs)}
     * on the connection that {@code clusterScan} supplies and mapping the result with the stream-scan cursor
     * mapper.
     *
     * @param channel the channel passed to the per-connection call
     * @param scanArgs passed through unchanged to the per-connection call
     * @return the future returned by {@code clusterScan}
     */
    @Override
    public RedisFuture<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanArgs scanArgs) {
        return clusterScan(
                ScanCursor.INITIAL,
                (node, ignoredCursor) -> node.scan(channel, scanArgs),
                ClusterScanSupport.asyncClusterStreamScanCursorMapper());
    }

    /**
     * Continues a streaming cluster scan from {@code scanCursor}, invoking
     * {@code scan(channel, cursor, scanArgs)} with the cursor that {@code clusterScan} hands to the callback
     * and mapping the result with the stream-scan cursor mapper.
     *
     * @param channel the channel passed to the per-connection call
     * @param scanCursor the cursor to resume from
     * @param scanArgs passed through unchanged to the per-connection call
     * @return the future returned by {@code clusterScan}
     */
    @Override
    public RedisFuture<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor, ScanArgs scanArgs) {
        return clusterScan(
                scanCursor,
                (node, nodeCursor) -> node.scan(channel, (ScanCursor)nodeCursor, scanArgs),
                ClusterScanSupport.asyncClusterStreamScanCursorMapper());
    }

    /**
     * Continues a streaming cluster scan from {@code scanCursor}, invoking {@code scan(channel, cursor)} with
     * the cursor that {@code clusterScan} hands to the callback and mapping the result with the stream-scan
     * cursor mapper.
     *
     * @param channel the channel passed to the per-connection call
     * @param scanCursor the cursor to resume from
     * @return the future returned by {@code clusterScan}
     */
    @Override
    public RedisFuture<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor) {
        return clusterScan(
                scanCursor,
                (node, nodeCursor) -> node.scan(channel, (ScanCursor)nodeCursor),
                ClusterScanSupport.asyncClusterStreamScanCursorMapper());
    }

    /**
     * Routes {@code ftAggregate(index, query, args)} through {@code routeKeyless}, tagged with
     * {@link CommandType#FT_AGGREGATE}.
     * <p>
     * Two invocations are offered: the superclass call, and a per-node call. The per-node variant
     * post-processes a non-null reply: when the reply carries a cursor whose id is greater than zero, the id of
     * the node that served the call is stored on that cursor.
     *
     * @param index the index name
     * @param query the query
     * @param args aggregation arguments, passed through unchanged
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<AggregationReply<K, V>> ftAggregate(String index, V query, AggregateArgs<K, V> args) {
        return this.routeKeyless(
                () -> super.ftAggregate(index, query, args),
                (String nodeId, RedisClusterAsyncCommands<K, V> conn) -> conn.ftAggregate(index, query, args).thenApply(reply -> {
                    if (reply == null) {
                        return reply;
                    }
                    // Remember which node owns the still-open cursor.
                    reply.getCursor()
                            .filter(cursor -> cursor.getCursorId() > 0L)
                            .ifPresent(cursor -> cursor.setNodeId(nodeId));
                    return reply;
                }),
                (ProtocolKeyword)CommandType.FT_AGGREGATE);
    }

    /**
     * Convenience form of {@link #ftAggregate(String, Object, AggregateArgs)} that passes {@code null} as the
     * aggregation arguments.
     *
     * @param index the index name
     * @param query the query
     * @return the future returned by the three-argument overload
     */
    @Override
    public RedisFuture<AggregationReply<K, V>> ftAggregate(String index, V query) {
        final AggregateArgs<K, V> noArgs = null;
        return ftAggregate(index, query, noArgs);
    }

    /**
     * Routes {@code ftSearch(index, query, args)} through {@code routeKeyless}, offering both the superclass
     * call and a per-connection call, tagged with {@link CommandType#FT_SEARCH}.
     *
     * @param index the index name
     * @param query the query
     * @param args search arguments, passed through unchanged
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<SearchReply<K, V>> ftSearch(String index, V query, SearchArgs<K, V> args) {
        return this.routeKeyless(
                () -> super.ftSearch(index, query, args),
                (RedisAsyncCommands<K, V> node) -> node.ftSearch(index, query, args),
                (ProtocolKeyword)CommandType.FT_SEARCH);
    }

    /**
     * Convenience form of {@link #ftSearch(String, Object, SearchArgs)} that passes freshly built default
     * search arguments.
     *
     * @param index the index name
     * @param query the query
     * @return the future returned by the three-argument overload
     */
    @Override
    public RedisFuture<SearchReply<K, V>> ftSearch(String index, V query) {
        // Explicit type witness: a chained builder().build() would otherwise infer SearchArgs<Object, Object>.
        return this.ftSearch(index, query, SearchArgs.<K, V>builder().build());
    }

    /**
     * Routes {@code ftExplain(index, query)} through {@code routeKeyless}, offering both the superclass call
     * and a per-connection call, tagged with {@link CommandType#FT_EXPLAIN}.
     *
     * @param index the index name
     * @param query the query
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftExplain(String index, V query) {
        return this.routeKeyless(
                () -> super.ftExplain(index, query),
                (RedisAsyncCommands<K, V> node) -> node.ftExplain(index, query),
                (ProtocolKeyword)CommandType.FT_EXPLAIN);
    }

    /**
     * Routes {@code ftExplain(index, query, args)} through {@code routeKeyless}, offering both the superclass
     * call and a per-connection call, tagged with {@link CommandType#FT_EXPLAIN}.
     *
     * @param index the index name
     * @param query the query
     * @param args explain arguments, passed through unchanged
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftExplain(String index, V query, ExplainArgs<K, V> args) {
        return this.routeKeyless(
                () -> super.ftExplain(index, query, args),
                (RedisAsyncCommands<K, V> node) -> node.ftExplain(index, query, args),
                (ProtocolKeyword)CommandType.FT_EXPLAIN);
    }

    /**
     * Routes {@code ftTagvals(index, fieldName)} through {@code routeKeyless}, offering both the superclass
     * call and a per-connection call, tagged with {@link CommandType#FT_TAGVALS}.
     *
     * @param index the index name
     * @param fieldName the field name
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<List<V>> ftTagvals(String index, String fieldName) {
        return this.routeKeyless(
                () -> super.ftTagvals(index, fieldName),
                (RedisAsyncCommands<K, V> node) -> node.ftTagvals(index, fieldName),
                (ProtocolKeyword)CommandType.FT_TAGVALS);
    }

    /**
     * Routes {@code ftSpellcheck(index, query)} through {@code routeKeyless}, offering both the superclass call
     * and a per-connection call, tagged with {@link CommandType#FT_SPELLCHECK}.
     *
     * @param index the index name
     * @param query the query
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<SpellCheckResult<V>> ftSpellcheck(String index, V query) {
        return this.routeKeyless(
                () -> super.ftSpellcheck(index, query),
                (RedisAsyncCommands<K, V> node) -> node.ftSpellcheck(index, query),
                (ProtocolKeyword)CommandType.FT_SPELLCHECK);
    }

    /**
     * Routes {@code ftSpellcheck(index, query, args)} through {@code routeKeyless}, offering both the
     * superclass call and a per-connection call, tagged with {@link CommandType#FT_SPELLCHECK}.
     *
     * @param index the index name
     * @param query the query
     * @param args spell-check arguments, passed through unchanged
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<SpellCheckResult<V>> ftSpellcheck(String index, V query, SpellCheckArgs<K, V> args) {
        return this.routeKeyless(
                () -> super.ftSpellcheck(index, query, args),
                (RedisAsyncCommands<K, V> node) -> node.ftSpellcheck(index, query, args),
                (ProtocolKeyword)CommandType.FT_SPELLCHECK);
    }

    /**
     * Routes {@code ftDictadd(dict, terms)} through {@code routeKeyless}, offering both the superclass call and
     * a per-connection call, tagged with {@link CommandType#FT_DICTADD}.
     *
     * @param dict the dictionary name
     * @param terms the terms, passed through unchanged
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<Long> ftDictadd(String dict, V ... terms) {
        return this.routeKeyless(
                () -> super.ftDictadd(dict, terms),
                (RedisAsyncCommands<K, V> node) -> node.ftDictadd(dict, terms),
                (ProtocolKeyword)CommandType.FT_DICTADD);
    }

    /**
     * Routes {@code ftDictdel(dict, terms)} through {@code routeKeyless}, offering both the superclass call and
     * a per-connection call, tagged with {@link CommandType#FT_DICTDEL}.
     *
     * @param dict the dictionary name
     * @param terms the terms, passed through unchanged
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<Long> ftDictdel(String dict, V ... terms) {
        return this.routeKeyless(
                () -> super.ftDictdel(dict, terms),
                (RedisAsyncCommands<K, V> node) -> node.ftDictdel(dict, terms),
                (ProtocolKeyword)CommandType.FT_DICTDEL);
    }

    /**
     * Routes {@code ftDictdump(dict)} through {@code routeKeyless}, offering both the superclass call and a
     * per-connection call, tagged with {@link CommandType#FT_DICTDUMP}.
     *
     * @param dict the dictionary name
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<List<V>> ftDictdump(String dict) {
        return this.routeKeyless(
                () -> super.ftDictdump(dict),
                (RedisAsyncCommands<K, V> node) -> node.ftDictdump(dict),
                (ProtocolKeyword)CommandType.FT_DICTDUMP);
    }

    /**
     * Routes {@code ftAliasadd(alias, index)} through {@code routeKeyless}, offering both the superclass call
     * and a per-connection call, tagged with {@link CommandType#FT_ALIASADD}.
     *
     * @param alias the alias name
     * @param index the index name
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftAliasadd(String alias, String index) {
        return this.routeKeyless(
                () -> super.ftAliasadd(alias, index),
                (RedisAsyncCommands<K, V> node) -> node.ftAliasadd(alias, index),
                (ProtocolKeyword)CommandType.FT_ALIASADD);
    }

    /**
     * Routes {@code ftAliasupdate(alias, index)} through {@code routeKeyless}, offering both the superclass
     * call and a per-connection call, tagged with {@link CommandType#FT_ALIASUPDATE}.
     *
     * @param alias the alias name
     * @param index the index name
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftAliasupdate(String alias, String index) {
        return this.routeKeyless(
                () -> super.ftAliasupdate(alias, index),
                (RedisAsyncCommands<K, V> node) -> node.ftAliasupdate(alias, index),
                (ProtocolKeyword)CommandType.FT_ALIASUPDATE);
    }

    /**
     * Routes {@code ftAliasdel(alias)} through {@code routeKeyless}, offering both the superclass call and a
     * per-connection call, tagged with {@link CommandType#FT_ALIASDEL}.
     *
     * @param alias the alias name
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftAliasdel(String alias) {
        return this.routeKeyless(
                () -> super.ftAliasdel(alias),
                (RedisAsyncCommands<K, V> node) -> node.ftAliasdel(alias),
                (ProtocolKeyword)CommandType.FT_ALIASDEL);
    }

    /**
     * Routes {@code ftList()} through {@code routeKeyless}, offering both the superclass call and a
     * per-connection call, tagged with {@link CommandType#FT_LIST}.
     *
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<List<V>> ftList() {
        return this.routeKeyless(
                () -> super.ftList(),
                (RedisAsyncCommands<K, V> node) -> node.ftList(),
                (ProtocolKeyword)CommandType.FT_LIST);
    }

    /**
     * Routes {@code ftCreate(index, fieldArgs)} through {@code routeKeyless}, offering both the superclass call
     * and a per-connection call, tagged with {@link CommandType#FT_CREATE}.
     *
     * @param index the index name
     * @param fieldArgs field definitions, passed through unchanged
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftCreate(String index, List<FieldArgs<K>> fieldArgs) {
        return this.routeKeyless(
                () -> super.ftCreate(index, fieldArgs),
                (RedisAsyncCommands<K, V> node) -> node.ftCreate(index, fieldArgs),
                (ProtocolKeyword)CommandType.FT_CREATE);
    }

    /**
     * Routes {@code ftCreate(index, arguments, fieldArgs)} through {@code routeKeyless}, offering both the
     * superclass call and a per-connection call, tagged with {@link CommandType#FT_CREATE}.
     *
     * @param index the index name
     * @param arguments creation arguments, passed through unchanged
     * @param fieldArgs field definitions, passed through unchanged
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftCreate(String index, CreateArgs<K, V> arguments, List<FieldArgs<K>> fieldArgs) {
        return this.routeKeyless(
                () -> super.ftCreate(index, arguments, fieldArgs),
                (RedisAsyncCommands<K, V> node) -> node.ftCreate(index, arguments, fieldArgs),
                (ProtocolKeyword)CommandType.FT_CREATE);
    }

    /**
     * Routes {@code ftAlter(index, skipInitialScan, fieldArgs)} through {@code routeKeyless}, offering both the
     * superclass call and a per-connection call, tagged with {@link CommandType#FT_ALTER}.
     *
     * @param index the index name
     * @param skipInitialScan passed through unchanged
     * @param fieldArgs field definitions, passed through unchanged
     * @return the future returned by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftAlter(String index, boolean skipInitialScan, List<FieldArgs<K>> fieldArgs) {
        return this.routeKeyless(
                () -> super.ftAlter(index, skipInitialScan, fieldArgs),
                (RedisAsyncCommands<K, V> node) -> node.ftAlter(index, skipInitialScan, fieldArgs),
                (ProtocolKeyword)CommandType.FT_ALTER);
    }

    /**
     * Cluster override of the two-argument {@code ftAlter}. Delegates to {@code routeKeyless} with the
     * inherited implementation as fallback and the same call on a routed {@code RedisAsyncCommands}
     * instance, tagged {@code CommandType.FT_ALTER}.
     *
     * @param index forwarded unchanged to the underlying {@code ftAlter} call
     * @param fieldArgs forwarded unchanged to the underlying {@code ftAlter} call
     * @return the future produced by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftAlter(String index, List<FieldArgs<K>> fieldArgs) {
        Supplier<RedisFuture<String>> fallback = () -> super.ftAlter(index, fieldArgs);
        Function<RedisAsyncCommands<K, V>, CompletionStage<String>> routed = commands -> commands.ftAlter(index, fieldArgs);
        return this.routeKeyless(fallback, routed, CommandType.FT_ALTER);
    }

    /**
     * Cluster override of the two-argument {@code ftDropindex}. Delegates to {@code routeKeyless} with the
     * inherited implementation as fallback and the same call on a routed {@code RedisAsyncCommands}
     * instance, tagged {@code CommandType.FT_DROPINDEX}.
     *
     * @param index forwarded unchanged to the underlying {@code ftDropindex} call
     * @param deleteDocumentKeys forwarded unchanged to the underlying {@code ftDropindex} call
     * @return the future produced by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftDropindex(String index, boolean deleteDocumentKeys) {
        Supplier<RedisFuture<String>> fallback = () -> super.ftDropindex(index, deleteDocumentKeys);
        Function<RedisAsyncCommands<K, V>, CompletionStage<String>> routed = commands -> commands.ftDropindex(index, deleteDocumentKeys);
        return this.routeKeyless(fallback, routed, CommandType.FT_DROPINDEX);
    }

    /**
     * Cluster override of the single-argument {@code ftDropindex}. Delegates to {@code routeKeyless} with
     * the inherited implementation as fallback and the same call on a routed {@code RedisAsyncCommands}
     * instance, tagged {@code CommandType.FT_DROPINDEX}.
     *
     * @param index forwarded unchanged to the underlying {@code ftDropindex} call
     * @return the future produced by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftDropindex(String index) {
        Supplier<RedisFuture<String>> fallback = () -> super.ftDropindex(index);
        Function<RedisAsyncCommands<K, V>, CompletionStage<String>> routed = commands -> commands.ftDropindex(index);
        return this.routeKeyless(fallback, routed, CommandType.FT_DROPINDEX);
    }

    /**
     * Cluster override of {@code ftSyndump}. Delegates to {@code routeKeyless} with the inherited
     * implementation as fallback and the same call on a routed {@code RedisAsyncCommands} instance,
     * tagged {@code CommandType.FT_SYNDUMP}.
     *
     * @param index forwarded unchanged to the underlying {@code ftSyndump} call
     * @return the future produced by {@code routeKeyless}
     */
    @Override
    public RedisFuture<Map<V, List<V>>> ftSyndump(String index) {
        Supplier<RedisFuture<Map<V, List<V>>>> fallback = () -> super.ftSyndump(index);
        Function<RedisAsyncCommands<K, V>, CompletionStage<Map<V, List<V>>>> routed = commands -> commands.ftSyndump(index);
        return this.routeKeyless(fallback, routed, CommandType.FT_SYNDUMP);
    }

    /**
     * Cluster override of {@code ftSynupdate} without extra arguments. Delegates to {@code routeKeyless}
     * with the inherited implementation as fallback and the same call on a routed
     * {@code RedisAsyncCommands} instance, tagged {@code CommandType.FT_SYNUPDATE}.
     *
     * @param index forwarded unchanged to the underlying {@code ftSynupdate} call
     * @param synonymGroupId forwarded unchanged to the underlying {@code ftSynupdate} call
     * @param terms forwarded unchanged (same array instance) to the underlying {@code ftSynupdate} call
     * @return the future produced by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftSynupdate(String index, V synonymGroupId, V ... terms) {
        Supplier<RedisFuture<String>> fallback = () -> super.ftSynupdate(index, synonymGroupId, terms);
        Function<RedisAsyncCommands<K, V>, CompletionStage<String>> routed = commands -> commands.ftSynupdate(index, synonymGroupId, terms);
        return this.routeKeyless(fallback, routed, CommandType.FT_SYNUPDATE);
    }

    /**
     * Cluster override of {@code ftSynupdate} taking {@code SynUpdateArgs}. Delegates to
     * {@code routeKeyless} with the inherited implementation as fallback and the same call on a routed
     * {@code RedisAsyncCommands} instance, tagged {@code CommandType.FT_SYNUPDATE}.
     *
     * @param index forwarded unchanged to the underlying {@code ftSynupdate} call
     * @param synonymGroupId forwarded unchanged to the underlying {@code ftSynupdate} call
     * @param args forwarded unchanged to the underlying {@code ftSynupdate} call
     * @param terms forwarded unchanged (same array instance) to the underlying {@code ftSynupdate} call
     * @return the future produced by {@code routeKeyless}
     */
    @Override
    public RedisFuture<String> ftSynupdate(String index, V synonymGroupId, SynUpdateArgs<K, V> args, V ... terms) {
        Supplier<RedisFuture<String>> fallback = () -> super.ftSynupdate(index, synonymGroupId, args, terms);
        Function<RedisAsyncCommands<K, V>, CompletionStage<String>> routed = commands -> commands.ftSynupdate(index, synonymGroupId, args, terms);
        return this.routeKeyless(fallback, routed, CommandType.FT_SYNUPDATE);
    }

    /**
     * Reads from an aggregation cursor on the cluster node recorded in the cursor.
     * <p>
     * Argument problems are reported through the returned future instead of being thrown: a {@code null}
     * cursor, or a cursor with a positive id but no node id, yields a future completed exceptionally with
     * an {@link IllegalArgumentException}. A cursor whose id is zero or negative short-circuits to a future
     * holding a freshly constructed {@code AggregationReply}; no node is contacted in that case.
     * <p>
     * Otherwise the connection for the recorded node id is looked up with {@code ConnectionIntent.READ},
     * the command is issued on it, and the cursor of a non-null reply (if present) is stamped with the same
     * node id before the reply is published.
     * <p>
     * NOTE(review): the connection lookup itself is a synchronous call; should it throw, the exception
     * propagates out of this method rather than through the returned future.
     *
     * @param index forwarded to the node-level {@code ftCursorread}
     * @param cursor cursor to read from; expected to carry the id of the node it belongs to
     * @param count forwarded to the node-level {@code ftCursorread}
     * @return a future for the reply, or a failed future as described above
     */
    @Override
    public RedisFuture<AggregationReply<K, V>> ftCursorread(String index, AggregationReply.Cursor cursor, int count) {
        if (cursor == null) {
            CompletableFuture<AggregationReply<K, V>> rejected = new CompletableFuture<AggregationReply<K, V>>();
            rejected.completeExceptionally(new IllegalArgumentException("cursor must not be null"));
            return new PipelinedRedisFuture<AggregationReply<K, V>>(rejected);
        }
        if (cursor.getCursorId() <= 0L) {
            // Nothing left to read: answer locally with an empty reply.
            AggregationReply<K, V> empty = new AggregationReply<K, V>();
            return new PipelinedRedisFuture<AggregationReply<K, V>>(CompletableFuture.completedFuture(empty));
        }
        Optional<String> owner = cursor.getNodeId();
        if (!owner.isPresent()) {
            CompletableFuture<AggregationReply<K, V>> rejected = new CompletableFuture<AggregationReply<K, V>>();
            rejected.completeExceptionally(new IllegalArgumentException("Cursor missing nodeId; cannot route cursor READ in cluster mode"));
            return new PipelinedRedisFuture<AggregationReply<K, V>>(rejected);
        }
        String nodeId = owner.get();
        StatefulRedisConnection<K, V> nodeConnection = this.getStatefulConnection().getConnection(nodeId, ConnectionIntent.READ);
        RedisFuture<AggregationReply<K, V>> pending = nodeConnection.async().ftCursorread(index, cursor, count);
        CompletableFuture<AggregationReply<K, V>> result = new CompletableFuture<AggregationReply<K, V>>();
        pending.whenComplete((reply, failure) -> {
            if (failure == null) {
                if (reply != null) {
                    // Keep the continuation cursor bound to the node that served this read.
                    reply.getCursor().ifPresent(next -> next.setNodeId(nodeId));
                }
                result.complete(reply);
            } else {
                result.completeExceptionally(failure);
            }
        });
        return new PipelinedRedisFuture<AggregationReply<K, V>>(result);
    }

    /**
     * Convenience overload that delegates to {@code ftCursorread(index, cursor, count)} with a count of
     * {@code -1}.
     *
     * @param index forwarded to the three-argument overload
     * @param cursor forwarded to the three-argument overload
     * @return whatever the three-argument overload returns
     */
    @Override
    public RedisFuture<AggregationReply<K, V>> ftCursorread(String index, AggregationReply.Cursor cursor) {
        // NOTE(review): -1 presumably means "no explicit count" downstream; confirm against the command builder.
        int count = -1;
        return this.ftCursorread(index, cursor, count);
    }

    /**
     * Deletes an aggregation cursor on the cluster node recorded in the cursor.
     * <p>
     * A {@code null} cursor, or a cursor with a positive id but no node id, yields a future completed
     * exceptionally with an {@link IllegalArgumentException}. A cursor whose id is zero or negative yields
     * a future already completed with {@code "OK"} without contacting any node. Otherwise the connection
     * for the recorded node id is looked up with {@code ConnectionIntent.WRITE} and the node-level
     * {@code ftCursordel} future is returned as is.
     * <p>
     * NOTE(review): the connection lookup itself is a synchronous call; should it throw, the exception
     * propagates out of this method rather than through the returned future.
     *
     * @param index forwarded to the node-level {@code ftCursordel}
     * @param cursor cursor to delete; expected to carry the id of the node it belongs to
     * @return a future for the command result, or a failed future as described above
     */
    @Override
    public RedisFuture<String> ftCursordel(String index, AggregationReply.Cursor cursor) {
        if (cursor == null) {
            CompletableFuture<String> rejected = new CompletableFuture<String>();
            rejected.completeExceptionally(new IllegalArgumentException("cursor must not be null"));
            return new PipelinedRedisFuture<String>(rejected);
        }
        if (cursor.getCursorId() <= 0L) {
            // No server-side cursor to delete; report success locally.
            return new PipelinedRedisFuture<String>(CompletableFuture.completedFuture("OK"));
        }
        Optional<String> owner = cursor.getNodeId();
        if (owner.isPresent()) {
            StatefulRedisConnection<K, V> nodeConnection = this.getStatefulConnection().getConnection(owner.get(), ConnectionIntent.WRITE);
            return nodeConnection.async().ftCursordel(index, cursor);
        }
        CompletableFuture<String> rejected = new CompletableFuture<String>();
        rejected.completeExceptionally(new IllegalArgumentException("Cursor missing nodeId; cannot route cursor DEL in cluster mode"));
        return new PipelinedRedisFuture<String>(rejected);
    }

    /**
     * Executes a command on a connection obtained from {@code getRandomStatefulConnection}, falling back to
     * the supplied default execution if that attempt fails.
     * <p>
     * The connection intent is derived from {@code commandType} via {@code getConnectionIntent}. If
     * obtaining the connection or the routed call completes exceptionally, the failure is logged at error
     * level and {@code superCall} is invoked; its future then determines the outcome.
     *
     * @param superCall supplier of the fallback execution; only invoked after a routing failure
     * @param routedCall function issuing the command on the routed connection's async API
     * @param commandType command keyword used to pick the connection intent and in the log message
     * @param <R> result type of the command
     * @return a future completed with the routed result, or with the fallback's outcome after a failure
     */
    <R> RedisFuture<R> routeKeyless(Supplier<RedisFuture<R>> superCall, Function<RedisAsyncCommands<K, V>, CompletionStage<R>> routedCall, ProtocolKeyword commandType) {
        ConnectionIntent intent = this.getConnectionIntent(commandType);
        CompletableFuture<R> outcome = this.getRandomStatefulConnection(intent)
                .thenApply(StatefulRedisConnection::async)
                .thenCompose(routedCall)
                .handle((value, failure) -> {
                    CompletableFuture<R> next;
                    if (failure == null) {
                        next = CompletableFuture.completedFuture(value);
                    } else {
                        logger.error("Cluster routing failed for {} - falling back to superCall", (Object) commandType, failure);
                        next = superCall.get().toCompletableFuture();
                    }
                    return next;
                })
                // handle() produced a future of a future; flatten it.
                .thenCompose(Function.identity());
        return new PipelinedRedisFuture<R>(outcome);
    }

    /**
     * Variant of {@code routeKeyless} whose routed call also receives the id of the node it runs on.
     * <p>
     * A connection is obtained from {@code getRandomStatefulConnection} for the intent derived from
     * {@code commandType}; {@code clusterMyId()} is issued on it and the resulting id is passed, together
     * with the connection's async API, to {@code routedCall}. If any of these steps completes exceptionally,
     * the failure is logged at error level and {@code superCall} is invoked; its future then determines the
     * outcome.
     *
     * @param superCall supplier of the fallback execution; only invoked after a routing failure
     * @param routedCall function receiving the node id and the routed connection's async API
     * @param commandType command keyword used to pick the connection intent and in the log message
     * @param <R> result type of the command
     * @return a future completed with the routed result, or with the fallback's outcome after a failure
     */
    <R> RedisFuture<R> routeKeyless(Supplier<RedisFuture<R>> superCall, BiFunction<String, RedisClusterAsyncCommands<K, V>, CompletionStage<R>> routedCall, ProtocolKeyword commandType) {
        ConnectionIntent intent = this.getConnectionIntent(commandType);
        CompletableFuture<R> outcome = this.getRandomStatefulConnection(intent)
                .thenCompose(connection -> {
                    RedisAsyncCommands<K, V> commands = connection.async();
                    CompletableFuture<String> myId = commands.clusterMyId().toCompletableFuture();
                    CompletableFuture<R> routed = myId.thenCompose(nodeId -> routedCall.apply(nodeId, commands));
                    return routed;
                })
                .handle((value, failure) -> {
                    CompletableFuture<R> next;
                    if (failure == null) {
                        next = CompletableFuture.completedFuture(value);
                    } else {
                        logger.error("Cluster routing failed for {} - falling back to superCall", (Object) commandType, failure);
                        next = superCall.get().toCompletableFuture();
                    }
                    return next;
                })
                // handle() produced a future of a future; flatten it.
                .thenCompose(Function.identity());
        return new PipelinedRedisFuture<R>(outcome);
    }

    /**
     * Decides whether a command should use a READ or a WRITE connection.
     * <p>
     * A probe {@code Command} carrying only the keyword (no output) is checked against the read-only
     * predicate from the connection's options. Any exception raised while doing so is logged and the method
     * falls back to {@code ConnectionIntent.WRITE}.
     *
     * @param commandType keyword of the command to classify
     * @return {@code ConnectionIntent.READ} if the predicate reports the probe as read-only, otherwise
     *         {@code ConnectionIntent.WRITE}
     */
    private ConnectionIntent getConnectionIntent(ProtocolKeyword commandType) {
        // WRITE is both the "not read-only" answer and the fallback after a failure.
        ConnectionIntent intent = ConnectionIntent.WRITE;
        try {
            Command<K, V, Object> probe = new Command<>(commandType, null);
            if (this.getStatefulConnection().getOptions().getReadOnlyCommands().isReadOnly(probe)) {
                intent = ConnectionIntent.READ;
            }
        }
        catch (Exception e) {
            logger.error("Error while determining connection intent for " + commandType, e);
        }
        return intent;
    }

    /**
     * Instance-level entry point for cluster scans: forwards to the static {@code clusterScan} overload,
     * supplying this object's stateful connection as the first argument.
     *
     * @param cursor forwarded unchanged
     * @param scanFunction forwarded unchanged
     * @param resultMapper forwarded unchanged
     * @param <T> concrete cursor type produced by the scan
     * @return the future returned by the static overload
     */
    private <T extends ScanCursor> RedisFuture<T> clusterScan(ScanCursor cursor, BiFunction<RedisKeyAsyncCommands<K, V>, ScanCursor, RedisFuture<T>> scanFunction, ClusterScanSupport.ScanCursorMapper<RedisFuture<T>> resultMapper) {
        // Four arguments select the static overload; the three-argument form is this method.
        return clusterScan(this.getStatefulConnection(), cursor, scanFunction, resultMapper);
    }

    /**
     * Runs {@code function} on every partition node carrying the {@code UPSTREAM} flag by delegating to
     * {@code executeOnNodes} with a matching filter.
     *
     * @param function command invocation to apply to each selected node's async API
     * @param <T> result type of the invocation
     * @return the map produced by {@code executeOnNodes}, keyed by node id
     */
    protected <T> Map<String, CompletableFuture<T>> executeOnUpstream(Function<RedisClusterAsyncCommands<K, V>, RedisFuture<T>> function) {
        Function<RedisClusterNode, Boolean> isUpstream = node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM);
        return this.executeOnNodes(function, isUpstream);
    }

    /**
     * Applies {@code function} to every partition node accepted by {@code filter}.
     * <p>
     * For each accepted node a connection is requested asynchronously by the host and port of the node's
     * URI, and {@code function} is composed onto that connection future. The resulting futures are collected
     * in a {@link HashMap} keyed by node id.
     *
     * @param function command invocation to apply to each selected node's async API
     * @param filter predicate-like function selecting the nodes to run on
     * @param <T> result type of the invocation
     * @return map from node id to the future of that node's invocation; empty if no node matched
     */
    protected <T> Map<String, CompletableFuture<T>> executeOnNodes(Function<RedisClusterAsyncCommands<K, V>, RedisFuture<T>> function, Function<RedisClusterNode, Boolean> filter) {
        Map<String, CompletableFuture<T>> byNodeId = new HashMap<>();
        for (RedisClusterNode node : this.getStatefulConnection().getPartitions()) {
            if (filter.apply(node)) {
                RedisURI target = node.getUri();
                CompletableFuture<RedisClusterAsyncCommands<K, V>> commands = this.getConnectionAsync(target.getHost(), target.getPort());
                CompletableFuture<T> execution = commands.thenCompose(function::apply);
                byNodeId.put(node.getNodeId(), execution);
            }
        }
        return byNodeId;
    }

    /**
     * Looks up the partition node responsible for {@code slot} and returns a command interface connected
     * to that node's host and port.
     *
     * @param slot hash slot to resolve
     * @return commands bound to the node serving the slot, or {@code null} when the partitions report no
     *         node for it
     */
    private RedisClusterAsyncCommands<K, V> findConnectionBySlot(int slot) {
        RedisClusterNode owner = this.getStatefulConnection().getPartitions().getPartitionBySlot(slot);
        if (owner == null) {
            return null;
        }
        RedisURI target = owner.getUri();
        return this.getConnection(target.getHost(), target.getPort());
    }

    /**
     * Returns the cluster connection provider held by this connection's channel writer.
     * <p>
     * Both casts are unchecked assumptions about the runtime types: the channel writer must be a
     * {@code ClusterDistributionChannelWriter} and its provider an {@code AsyncClusterConnectionProvider},
     * otherwise a {@link ClassCastException} is thrown.
     *
     * @return the provider, viewed as {@code AsyncClusterConnectionProvider}
     */
    private AsyncClusterConnectionProvider getConnectionProvider() {
        ClusterDistributionChannelWriter clusterWriter = (ClusterDistributionChannelWriter) this.getStatefulConnection().getChannelWriter();
        Object provider = clusterWriter.getClusterConnectionProvider();
        return (AsyncClusterConnectionProvider) provider;
    }

    /**
     * Performs one step of a cluster-wide scan.
     * <p>
     * {@code ClusterScanSupport} supplies the list of node ids, the id of the node to scan in this step and
     * the continuation cursor derived from {@code cursor}. A connection to that node is requested
     * asynchronously, {@code scanFunction} is applied to its async API with the continuation cursor, and the
     * resulting future is wrapped in a {@code PipelinedRedisFuture} before being handed to {@code mapper}.
     *
     * @param connection cluster connection used to resolve node ids and node connections
     * @param cursor cursor describing the current scan position
     * @param scanFunction scan invocation to run against the selected node
     * @param mapper maps the node list, current node id and scan future to the returned future
     * @param <T> concrete cursor type produced by the scan
     * @param <K> key type
     * @param <V> value type
     * @return the future produced by {@code mapper}
     */
    static <T extends ScanCursor, K, V> RedisFuture<T> clusterScan(StatefulRedisClusterConnection<K, V> connection, ScanCursor cursor, BiFunction<RedisKeyAsyncCommands<K, V>, ScanCursor, RedisFuture<T>> scanFunction, ClusterScanSupport.ScanCursorMapper<RedisFuture<T>> mapper) {
        List<String> nodeIds = ClusterScanSupport.getNodeIds(connection, cursor);
        String scanNodeId = ClusterScanSupport.getCurrentNodeId(cursor, nodeIds);
        ScanCursor resumeFrom = ClusterScanSupport.getContinuationCursor(cursor);
        CompletableFuture<T> scanStep = connection.getConnectionAsync(scanNodeId).thenCompose(nodeConnection -> {
            RedisFuture<T> scan = scanFunction.apply(nodeConnection.async(), resumeFrom);
            return scan.toCompletableFuture();
        });
        RedisFuture<T> wrapped = new PipelinedRedisFuture<T>(scanStep);
        return mapper.map(nodeIds, scanNodeId, wrapped);
    }
}

