/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.failover;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisAsyncCommandsImpl;
import io.lettuce.core.RedisConnectionStateListener;
import io.lettuce.core.RedisReactiveCommandsImpl;
import io.lettuce.core.RedisURI;
import io.lettuce.core.annotations.Experimental;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.push.PushListener;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.failover.CircuitBreaker;
import io.lettuce.core.failover.CircuitBreakerStateChangeEvent;
import io.lettuce.core.failover.DatabaseConfig;
import io.lettuce.core.failover.DatabaseConnectionFactory;
import io.lettuce.core.failover.MultiDbFutureSyncInvocationHandler;
import io.lettuce.core.failover.RedisDatabase;
import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection;
import io.lettuce.core.failover.health.HealthCheck;
import io.lettuce.core.failover.health.HealthStatus;
import io.lettuce.core.failover.health.HealthStatusChangeEvent;
import io.lettuce.core.failover.health.HealthStatusManager;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.json.JsonParser;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Connection facade over several {@link RedisDatabase} instances, exactly one of which is the
 * <em>current</em> database at any time. Commands and most accessors are delegated to the
 * connection of the current database.
 *
 * <p>The current database is replaced automatically when its circuit breaker transitions to
 * {@link CircuitBreaker.State#OPEN} or when its health status changes to
 * {@link HealthStatus#UNHEALTHY}; the replacement is the highest-weighted database that is healthy
 * and whose circuit breaker is closed. It can also be replaced explicitly through
 * {@link #switchToDatabase(RedisURI)}.
 *
 * <p>Switching, adding and removing databases run under the write lock of an internal
 * {@link ReentrantReadWriteLock}; listener registration runs under its read lock. Plain delegating
 * accessors read {@link #current} without locking, which is why that field is {@code volatile}.
 *
 * @param <C> connection type held by each database
 * @param <K> key type
 * @param <V> value type
 */
@Experimental
public class StatefulRedisMultiDbConnectionImpl<C extends StatefulRedisConnection<K, V>, K, V>
implements StatefulRedisMultiDbConnection<K, V> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(StatefulRedisMultiDbConnectionImpl.class);

    /** All known databases keyed by endpoint; a private copy of the map handed to the constructor. */
    protected final Map<RedisURI, RedisDatabase<C>> databases;

    protected final HealthStatusManager healthStatusManager;

    /**
     * Database that currently receives delegated calls. Written under the write lock but read
     * lock-free by delegating methods, hence {@code volatile}. May be {@code null} if no healthy
     * database was available at construction time.
     */
    protected volatile RedisDatabase<C> current;

    protected final RedisCommands<K, V> sync;
    protected final RedisAsyncCommandsImpl<K, V> async;
    protected final RedisReactiveCommandsImpl<K, V> reactive;
    protected final RedisCodec<K, V> codec;
    protected final Supplier<JsonParser> parser;

    /** Push listeners registered through this facade; re-attached to the new connection on every switch. */
    protected final Set<PushListener> pushListeners = ConcurrentHashMap.newKeySet();

    /** Connection state listeners registered through this facade; re-attached on every switch. */
    protected final Set<RedisConnectionStateListener> connectionStateListeners = ConcurrentHashMap.newKeySet();

    /** Factory used by {@link #addDatabase(DatabaseConfig)}; {@code null} disables dynamic additions. */
    protected final DatabaseConnectionFactory<C, K, V> connectionFactory;

    private final ReadWriteLock multiDbLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.multiDbLock.readLock();
    private final Lock writeLock = this.multiDbLock.writeLock();

    /**
     * Creates the facade, subscribes to circuit breaker and health status changes of every supplied
     * database, and selects the initial current database.
     *
     * @param connections databases keyed by endpoint; must not be {@code null} or empty
     * @param resources client resources (not used by this constructor)
     * @param codec codec handed to the command API implementations and to newly created databases
     * @param parser JSON parser supplier handed to the async and reactive command implementations
     * @param connectionFactory factory for databases added at runtime; may be {@code null}
     * @param healthStatusManager source of health status events; must not be {@code null}
     * @throws IllegalArgumentException if {@code connections} is {@code null} or empty, or (via
     *         {@link LettuceAssert}) if {@code healthStatusManager} is {@code null}
     */
    public StatefulRedisMultiDbConnectionImpl(Map<RedisURI, RedisDatabase<C>> connections, ClientResources resources, RedisCodec<K, V> codec, Supplier<JsonParser> parser, DatabaseConnectionFactory<C, K, V> connectionFactory, HealthStatusManager healthStatusManager) {
        if (connections == null || connections.isEmpty()) {
            throw new IllegalArgumentException("connections must not be empty");
        }
        LettuceAssert.notNull(healthStatusManager, "healthStatusManager must not be null");
        this.databases = new ConcurrentHashMap<RedisURI, RedisDatabase<C>>(connections);
        this.codec = codec;
        this.parser = parser;
        this.connectionFactory = connectionFactory;
        this.healthStatusManager = healthStatusManager;
        this.databases.values().forEach(db -> db.getCircuitBreaker().addListener(this::onCircuitBreakerStateChange));
        this.databases.values().forEach(db -> healthStatusManager.registerListener(db.getRedisURI(), this::onHealthStatusChange));
        // NOTE(review): this yields null when no database is healthy with a closed circuit breaker;
        // delegating methods would then fail with NullPointerException. Confirm callers guarantee
        // at least one healthy database before constructing this object.
        this.current = this.getNextHealthyDatabase(null);
        this.async = this.newRedisAsyncCommandsImpl();
        this.sync = this.newRedisSyncCommandsImpl();
        this.reactive = this.newRedisReactiveCommandsImpl();
    }

    /**
     * Fails over when the circuit breaker of the current database opens.
     *
     * @param event the state change reported by a circuit breaker
     */
    private void onCircuitBreakerStateChange(CircuitBreakerStateChangeEvent event) {
        // Snapshot once: listeners are registered before 'current' is first assigned, and
        // 'current' can be swapped by another thread between two reads.
        RedisDatabase<C> active = this.current;
        if (active == null) {
            return;
        }
        if (event.getCircuitBreaker() == active.getCircuitBreaker() && event.getNewState() == CircuitBreaker.State.OPEN) {
            this.failoverFrom(active);
        }
    }

    /**
     * Fails over when the current database is reported {@link HealthStatus#UNHEALTHY}. Events for
     * endpoints that are not (or no longer) registered are ignored.
     *
     * @param event the health status change
     */
    private void onHealthStatusChange(HealthStatusChangeEvent event) {
        logger.debug("Health status changed for {} from {} to {}", event.getEndpoint(), event.getOldStatus(), event.getNewStatus());
        RedisDatabase<C> database = this.databases.get(event.getEndpoint());
        if (database == null) {
            return;
        }
        if (this.isCurrent(database) && event.getNewStatus() == HealthStatus.UNHEALTHY) {
            this.failoverFrom(database);
        }
    }

    /** Returns whether {@code database} is the very instance currently in use (identity comparison). */
    private boolean isCurrent(RedisDatabase<C> database) {
        return database == this.current;
    }

    /**
     * Switches to the best healthy database other than {@code fromDb}. If none is available the
     * current database is kept and a warning is logged.
     *
     * @param fromDb the database to move away from
     */
    private void failoverFrom(RedisDatabase<C> fromDb) {
        RedisDatabase<C> healthyDatabase = this.getNextHealthyDatabase(fromDb);
        if (healthyDatabase == null) {
            logger.warn("No healthy database available to fail over from {}", fromDb.getRedisURI());
            return;
        }
        this.switchToDatabase(healthyDatabase.getRedisURI());
    }

    /**
     * Picks the highest-weighted database that passes {@link DatabasePredicates#isHealthyAndCbClosed}.
     *
     * @param dbToExclude database to skip, or {@code null} to consider all
     * @return the selected database, or {@code null} if none qualifies
     */
    private RedisDatabase<C> getNextHealthyDatabase(RedisDatabase<C> dbToExclude) {
        return this.databases.values().stream()
                .filter(DatabasePredicates.isHealthyAndCbClosed)
                .filter(DatabasePredicates.isNot(dbToExclude))
                .max(DatabaseComparators.byWeight)
                .orElse(null);
    }

    @Override
    public RedisAsyncCommands<K, V> async() {
        return this.async;
    }

    /** Creates the synchronous API as a dynamic proxy on top of {@link #async()}. */
    protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
        return this.syncHandler(this.async(), RedisCommands.class, RedisClusterCommands.class);
    }

    /**
     * Builds a dynamic proxy implementing {@code interfaces} whose calls are handled by a
     * {@link MultiDbFutureSyncInvocationHandler} wrapping {@code asyncApi}.
     *
     * @param asyncApi the asynchronous API object passed to the invocation handler
     * @param interfaces interfaces the proxy implements
     * @param <T> proxy type expected by the caller; not checked at runtime
     * @return the proxy instance
     */
    @SuppressWarnings("unchecked") // the proxy implements every requested interface; T is chosen by the caller
    protected <T> T syncHandler(Object asyncApi, Class<?> ... interfaces) {
        MultiDbFutureSyncInvocationHandler h = new MultiDbFutureSyncInvocationHandler(this, asyncApi, interfaces);
        return (T)Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, (InvocationHandler)h);
    }

    /** Creates the asynchronous API bound to this connection facade. */
    protected RedisAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() {
        return new RedisAsyncCommandsImpl<K, V>(this, this.codec, this.parser);
    }

    @Override
    public RedisReactiveCommands<K, V> reactive() {
        return this.reactive;
    }

    /** Creates the reactive API bound to this connection facade. */
    protected RedisReactiveCommandsImpl<K, V> newRedisReactiveCommandsImpl() {
        return new RedisReactiveCommandsImpl<K, V>(this, this.codec, this.parser);
    }

    @Override
    public RedisCommands<K, V> sync() {
        return this.sync;
    }

    /**
     * Remembers the listener and attaches it to the current connection. Runs under the shared lock
     * so it cannot interleave with a database switch.
     */
    @Override
    public void addListener(RedisConnectionStateListener listener) {
        this.doBySharedLock(() -> {
            this.connectionStateListeners.add(listener);
            this.current.getConnection().addListener(listener);
        });
    }

    /** Forgets the listener and detaches it from the current connection (under the shared lock). */
    @Override
    public void removeListener(RedisConnectionStateListener listener) {
        this.doBySharedLock(() -> {
            this.connectionStateListeners.remove(listener);
            this.current.getConnection().removeListener(listener);
        });
    }

    /** Applies the timeout to the connection of every database known at the time of the call. */
    @Override
    public void setTimeout(Duration timeout) {
        this.databases.values().forEach(db -> db.getConnection().setTimeout(timeout));
    }

    /** Returns the timeout of the current database's connection. */
    @Override
    public Duration getTimeout() {
        return this.current.getConnection().getTimeout();
    }

    /** Dispatches the command on the current database's connection. */
    @Override
    public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
        return this.current.getConnection().dispatch(command);
    }

    /** Dispatches the commands on the current database's connection. */
    @Override
    public Collection<RedisCommand<K, V, ?>> dispatch(Collection<? extends RedisCommand<K, V, ?>> commands) {
        return this.current.getConnection().dispatch(commands);
    }

    /** Closes the health status manager, then synchronously closes every database connection. */
    @Override
    public void close() {
        this.healthStatusManager.close();
        this.databases.values().forEach(db -> db.getConnection().close());
    }

    /**
     * Closes the health status manager, then asynchronously closes every database connection.
     *
     * @return a future that completes once all connection close futures have completed
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        // Mirror close(): without this the health status manager is never closed on the async path.
        this.healthStatusManager.close();
        return CompletableFuture.allOf(this.databases.values().stream().map(db -> db.getConnection()).map(StatefulConnection::closeAsync).toArray(CompletableFuture[]::new));
    }

    /** Reports whether the current database's connection is open. */
    @Override
    public boolean isOpen() {
        return this.current.getConnection().isOpen();
    }

    /** Returns the client options of the current database's connection. */
    @Override
    public ClientOptions getOptions() {
        return this.current.getConnection().getOptions();
    }

    /** Returns the client resources of the current database's connection. */
    @Override
    public ClientResources getResources() {
        return this.current.getConnection().getResources();
    }

    /** Applies the auto-flush setting to the connection of every database known at the time of the call. */
    @Override
    public void setAutoFlushCommands(boolean autoFlush) {
        this.databases.values().forEach(db -> db.getConnection().setAutoFlushCommands(autoFlush));
    }

    /** Flushes pending commands on the current database's connection only. */
    @Override
    public void flushCommands() {
        this.current.getConnection().flushCommands();
    }

    /** Returns the {@code isMulti()} state of the current database's connection. */
    @Override
    public boolean isMulti() {
        return this.current.getConnection().isMulti();
    }

    /** Remembers the push listener and attaches it to the current connection (under the shared lock). */
    @Override
    public void addListener(PushListener listener) {
        this.doBySharedLock(() -> {
            this.pushListeners.add(listener);
            this.current.getConnection().addListener(listener);
        });
    }

    /** Forgets the push listener and detaches it from the current connection (under the shared lock). */
    @Override
    public void removeListener(PushListener listener) {
        this.doBySharedLock(() -> {
            this.pushListeners.remove(listener);
            this.current.getConnection().removeListener(listener);
        });
    }

    @Override
    public RedisCodec<K, V> getCodec() {
        return this.codec;
    }

    /** Returns the endpoint of the current database. */
    @Override
    public RedisURI getCurrentEndpoint() {
        return this.current.getRedisURI();
    }

    /**
     * Returns a read-only live view of the registered endpoints. The view is unmodifiable so that
     * callers cannot drop a database through {@code Iterator.remove()} and bypass
     * {@link #removeDatabase(RedisURI)}.
     */
    @Override
    public Iterable<RedisURI> getEndpoints() {
        return Collections.unmodifiableSet(this.databases.keySet());
    }

    /**
     * Makes the database registered under {@code redisURI} the current one. Under the exclusive
     * lock it moves all remembered connection state and push listeners from the old connection to
     * the new one and hands the old endpoint's command queue over to the new endpoint. Switching to
     * the database that is already current is a no-op.
     *
     * @param redisURI endpoint of the target database
     * @throws UnsupportedOperationException if there is no current database or no database is
     *         registered under {@code redisURI}
     */
    @Override
    public void switchToDatabase(RedisURI redisURI) {
        this.doByExclusiveLock(() -> {
            RedisDatabase<C> fromDb = this.current;
            RedisDatabase<C> toDb = this.databases.get(redisURI);
            if (fromDb == null || toDb == null) {
                throw new UnsupportedOperationException("Unable to switch between endpoints - the driver was not able to locate the source or destination endpoint.");
            }
            if (fromDb.equals(toDb)) {
                return;
            }
            this.current = toDb;
            // Attach to the target before detaching from the source.
            this.connectionStateListeners.forEach(listener -> {
                toDb.getConnection().addListener(listener);
                fromDb.getConnection().removeListener(listener);
            });
            this.pushListeners.forEach(listener -> {
                toDb.getConnection().addListener(listener);
                fromDb.getConnection().removeListener(listener);
            });
            fromDb.getDatabaseEndpoint().handOverCommandQueue(toDb.getDatabaseEndpoint());
        });
    }

    /** Runs {@code operation} while holding the read lock; the lock is always released. */
    protected void doBySharedLock(Runnable operation) {
        this.readLock.lock();
        try {
            operation.run();
        }
        finally {
            this.readLock.unlock();
        }
    }

    /** Runs {@code operation} while holding the write lock; the lock is always released. */
    protected void doByExclusiveLock(Runnable operation) {
        this.writeLock.lock();
        try {
            operation.run();
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Returns the circuit breaker of the database registered under {@code endpoint}.
     *
     * @throws IllegalArgumentException if no database is registered under {@code endpoint}
     */
    @Override
    public CircuitBreaker getCircuitBreaker(RedisURI endpoint) {
        RedisDatabase<C> database = this.databases.get(endpoint);
        if (database == null) {
            throw new IllegalArgumentException("Unknown endpoint: " + endpoint);
        }
        return database.getCircuitBreaker();
    }

    /**
     * Reports whether the database registered under {@code endpoint} passes
     * {@link DatabasePredicates#isHealthyAndCbClosed}.
     *
     * @throws IllegalArgumentException if no database is registered under {@code endpoint}
     */
    @Override
    public boolean isHealthy(RedisURI endpoint) {
        RedisDatabase<C> database = this.databases.get(endpoint);
        if (database == null) {
            throw new IllegalArgumentException("Unknown endpoint: " + endpoint);
        }
        return DatabasePredicates.isHealthyAndCbClosed.test(database);
    }

    /** Convenience overload that builds a {@link DatabaseConfig} from the URI and weight. */
    @Override
    public void addDatabase(RedisURI redisURI, float weight) {
        this.addDatabase(DatabaseConfig.builder(redisURI).weight(weight).build());
    }

    /**
     * Creates a database from {@code databaseConfig} via the connection factory and registers it,
     * together with health status and circuit breaker listeners, under the exclusive lock.
     *
     * @param databaseConfig configuration of the database to add; must not be {@code null}
     * @throws IllegalArgumentException if {@code databaseConfig} is {@code null} or a database with
     *         the same URI is already registered
     * @throws UnsupportedOperationException if this connection was created without a
     *         {@link DatabaseConnectionFactory}
     */
    @Override
    public void addDatabase(DatabaseConfig databaseConfig) {
        if (databaseConfig == null) {
            throw new IllegalArgumentException("DatabaseConfig must not be null");
        }
        if (this.connectionFactory == null) {
            throw new UnsupportedOperationException("Adding databases dynamically is not supported. Connection was created without a DatabaseConnectionFactory.");
        }
        RedisURI redisURI = databaseConfig.getRedisURI();
        this.doByExclusiveLock(() -> {
            if (this.databases.containsKey(redisURI)) {
                throw new IllegalArgumentException("Database already exists: " + redisURI);
            }
            // The health listener is registered before the database is created.
            // NOTE(review): if createDatabase throws, this listener registration is not rolled back.
            this.healthStatusManager.registerListener(redisURI, this::onHealthStatusChange);
            RedisDatabase<C> database = this.connectionFactory.createDatabase(databaseConfig, this.codec, this.healthStatusManager);
            this.databases.put(redisURI, database);
            database.getCircuitBreaker().addListener(this::onCircuitBreakerStateChange);
        });
    }

    /**
     * Unregisters and closes the database registered under {@code redisURI}, under the exclusive
     * lock. The currently active database cannot be removed.
     *
     * @param redisURI endpoint of the database to remove; must not be {@code null}
     * @throws IllegalArgumentException if {@code redisURI} is {@code null} or unknown
     * @throws UnsupportedOperationException if {@code redisURI} is the current endpoint
     */
    @Override
    public void removeDatabase(RedisURI redisURI) {
        if (redisURI == null) {
            throw new IllegalArgumentException("RedisURI must not be null");
        }
        this.doByExclusiveLock(() -> {
            RedisDatabase<C> database = this.databases.get(redisURI);
            if (database == null) {
                throw new IllegalArgumentException("Database not found: " + redisURI);
            }
            if (this.current.getRedisURI().equals(redisURI)) {
                throw new UnsupportedOperationException("Cannot remove the currently active database: " + redisURI);
            }
            // NOTE(review): 'this::onHealthStatusChange' evaluates to a new listener object here, so
            // it may not match the instance registered earlier if HealthStatusManager compares
            // listeners by identity/equals - confirm. The remove(redisURI) call below may make
            // this moot.
            this.healthStatusManager.unregisterListener(redisURI, this::onHealthStatusChange);
            this.healthStatusManager.remove(redisURI);
            this.databases.remove(redisURI);
            database.close();
        });
    }

    /** Reusable predicates for selecting databases. */
    static class DatabasePredicates {
        /** True when the database has no health check or its health check reports {@link HealthStatus#HEALTHY}. */
        public static final Predicate<RedisDatabase<?>> isHealthCheckHealthy = db -> {
            HealthCheck healthCheck = db.getHealthCheck();
            if (healthCheck == null) {
                return true;
            }
            return healthCheck.getStatus() == HealthStatus.HEALTHY;
        };

        /** True when the database's circuit breaker is in {@link CircuitBreaker.State#CLOSED}. */
        public static final Predicate<RedisDatabase<?>> isCbClosed = db -> db.getCircuitBreaker().getCurrentState() == CircuitBreaker.State.CLOSED;

        /** Conjunction of {@link #isHealthCheckHealthy} and {@link #isCbClosed}. */
        public static final Predicate<RedisDatabase<?>> isHealthyAndCbClosed = isHealthCheckHealthy.and(isCbClosed);

        DatabasePredicates() {
        }

        /**
         * Returns a predicate matching every database that is not {@code equals} to
         * {@code dbInstance}; with a {@code null} argument it is expected to match every database.
         */
        public static Predicate<RedisDatabase<?>> isNot(RedisDatabase<?> dbInstance) {
            return db -> !db.equals(dbInstance);
        }
    }

    /** Reusable comparators for ranking databases. */
    static class DatabaseComparators {
        /** Orders databases by ascending weight, so {@code max} yields the highest-weighted one. */
        public static final Comparator<RedisDatabase<?>> byWeight = Comparator.comparingDouble(RedisDatabase::getWeight);

        DatabaseComparators() {
        }
    }
}

