/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.protocol;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.api.push.PushListener;
import io.lettuce.core.api.push.PushMessage;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.protocol.ChannelLogDescriptor;
import io.lettuce.core.protocol.CommandHandler;
import io.lettuce.core.protocol.ConnectionFacade;
import io.lettuce.core.protocol.ConnectionWatchdog;
import io.lettuce.core.protocol.Endpoint;
import io.lettuce.core.protocol.MaintenanceAwareComponent;
import io.lettuce.core.protocol.RebindState;
import io.lettuce.core.protocol.ReconnectionListener;
import io.lettuce.core.resource.Delay;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.util.AttributeKey;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

@ChannelHandler.Sharable
public class MaintenanceAwareConnectionWatchdog
extends ConnectionWatchdog
implements PushListener {
    /** Logger used by this class and by its nested helper classes. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MaintenanceAwareConnectionWatchdog.class);

    // Push message types that onPushMessage reacts to.
    private static final String REBIND_MESSAGE_TYPE = "MOVING";
    private static final String MIGRATING_MESSAGE_TYPE = "MIGRATING";
    private static final String MIGRATED_MESSAGE_TYPE = "MIGRATED";
    private static final String FAILING_OVER_MESSAGE_TYPE = "FAILING_OVER";
    private static final String FAILED_OVER_MESSAGE_TYPE = "FAILED_OVER";

    /** Channel attribute holding the {@link RebindState} of an ongoing rebind. */
    public static final AttributeKey<RebindState> REBIND_ATTRIBUTE = AttributeKey.newInstance("rebindAddress");

    // Positions of the shards element inside the content of each maintenance push message;
    // the values mirror the indexes the get*Shards methods read.
    private static final int MIGRATING_SHARDS_INDEX = 3;
    private static final int MIGRATED_SHARDS_INDEX = 2;
    private static final int FAILING_OVER_SHARDS_INDEX = 3;
    private static final int FAILED_OVER_SHARDS_INDEX = 2;

    /** Channel captured in channelActive; used when handling push messages and rebinds. */
    private Channel channel;

    /** Components notified about rebind, migration and failover events. */
    private final Set<MaintenanceAwareComponent> componentListeners = new HashSet<>();

    /** Address supplier created in wrapSocketAddressSupplier; receives the rebind target. */
    private RebindAwareAddressSupplier rebindAwareAddressSupplier;

    /**
     * Creates the watchdog by passing every argument, unchanged and in the same order, to the
     * {@link ConnectionWatchdog} constructor.
     */
    public MaintenanceAwareConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap,
            Timer timer, EventExecutorGroup reconnectWorkers, Mono<SocketAddress> socketAddressSupplier,
            ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus,
            Endpoint endpoint) {
        super(reconnectDelay, clientOptions, bootstrap, timer, reconnectWorkers, socketAddressSupplier,
                reconnectionListener, connectionFacade, eventBus, endpoint);
    }

    /**
     * Remembers the newly active channel and makes sure this watchdog is registered as a push
     * listener on the endpoint of the pipeline's {@link CommandHandler}.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);

        Channel activeChannel = ctx.channel();
        this.channel = activeChannel;

        CommandHandler handler = activeChannel.pipeline().get(CommandHandler.class);
        boolean alreadyRegistered = handler.getEndpoint().getPushListeners().contains(this);
        if (alreadyRegistered) {
            // Registering again would add this listener a second time.
            return;
        }
        handler.getEndpoint().addListener(this);
    }

    /**
     * Closes the channel once its rebind state is {@link RebindState#COMPLETED} and notifies the
     * registered components after the close has finished, then delegates to the superclass.
     *
     * @param ctx context of the channel whose read batch completed
     * @throws Exception if the superclass handler fails
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        Channel current = ctx.channel();
        if (current != null && current.isActive() && current.hasAttr(REBIND_ATTRIBUTE)
                && current.attr(REBIND_ATTRIBUTE).get() == RebindState.COMPLETED) {
            logger.debug("[{}]  Disconnecting at {}", ChannelLogDescriptor.logDescriptor(current), LocalTime.now());
            // Do not block with awaitUninterruptibly(): this callback runs on the channel's event loop,
            // where Netty rejects blocking waits on a future that has not completed yet.
            current.close().addListener(future -> this.notifyRebindCompleted());
        }
        super.channelReadComplete(ctx);
    }

    /**
     * Wraps the supplier produced by the superclass in a fresh {@link RebindAwareAddressSupplier}
     * and keeps a reference to it so that later rebinds can redirect the address.
     */
    @Override
    protected Mono<SocketAddress> wrapSocketAddressSupplier(Mono<SocketAddress> socketAddressSupplier) {
        Mono<SocketAddress> parentWrapped = super.wrapSocketAddressSupplier(socketAddressSupplier);
        RebindAwareAddressSupplier supplier = new RebindAwareAddressSupplier();
        this.rebindAwareAddressSupplier = supplier;
        return supplier.wrappedSupplier(parentWrapped);
    }

    /**
     * Dispatches a push message by type: MOVING triggers an immediate or deferred rebind, while
     * MIGRATING / MIGRATED / FAILING_OVER / FAILED_OVER are forwarded to the registered components
     * together with the shards extracted from the message. Other types are ignored.
     */
    @Override
    public void onPushMessage(PushMessage message) {
        String type = message.getType();
        if (type == null) {
            // A switch on a null String would throw; a missing type matches no handler.
            return;
        }

        switch (type) {
            case REBIND_MESSAGE_TYPE: {
                logger.debug("Rebind requested");
                MovingEvent event = MovingEvent.from(message);
                if (event == null) {
                    break;
                }
                if (event.getEndpoint() != null) {
                    this.rebind(event);
                    break;
                }
                // No target endpoint supplied: rebind later, after half of the announced time.
                logger.debug("[channel={}] Deferred Rebind requested. Rebinding to current endpoint after '{}'", this.channel.id(), event.getTime());
                long delayMillis = event.getTime().toMillis() / 2L;
                this.channel.eventLoop().schedule(() -> this.rebind(event), delayMillis, TimeUnit.MILLISECONDS);
                break;
            }
            case MIGRATING_MESSAGE_TYPE:
                logger.debug("[{}] Shard migration started", ChannelLogDescriptor.logDescriptor(this.channel));
                this.notifyMigrateStarted(this.getMigratingShards(message));
                break;
            case MIGRATED_MESSAGE_TYPE:
                logger.debug("[{}] Shard migration completed", ChannelLogDescriptor.logDescriptor(this.channel));
                this.notifyMigrateCompleted(this.getMigratedShards(message));
                break;
            case FAILING_OVER_MESSAGE_TYPE:
                logger.debug("[{}] Failover started", ChannelLogDescriptor.logDescriptor(this.channel));
                this.notifyFailoverStarted(this.getFailingOverShards(message));
                break;
            case FAILED_OVER_MESSAGE_TYPE:
                logger.debug("[{}] Failover completed", ChannelLogDescriptor.logDescriptor(this.channel));
                this.notifyFailoverCompleted(this.getFailedOverShards(message));
                break;
            default:
                break;
        }
    }

    /**
     * Starts a rebind: marks the channel as {@link RebindState#STARTED} and hands the target
     * endpoint and time window to the address supplier. If the command stack is empty the channel
     * is closed right away and marked {@link RebindState#COMPLETED} once the close has finished;
     * otherwise the registered components are told that a rebind started.
     *
     * @param movingEvent parsed MOVING notification; its endpoint may be {@code null}
     */
    private void rebind(MovingEvent movingEvent) {
        // Work on one channel reference so the close listener updates the channel that was closed.
        Channel target = this.channel;
        logger.debug("[{}] Rebind to '{}'", ChannelLogDescriptor.logDescriptor(target), movingEvent.getEndpoint());
        target.attr(REBIND_ATTRIBUTE).set(RebindState.STARTED);
        this.rebindAwareAddressSupplier.rebind(movingEvent.getTime(), movingEvent.getEndpoint());

        CommandHandler commandHandler = target.pipeline().get(CommandHandler.class);
        if (commandHandler.getStack().isEmpty()) {
            logger.debug("[{}] Closing channel as part of rebind", ChannelLogDescriptor.logDescriptor(target));
            // The deferred path runs on the channel's event loop, so use a listener rather than
            // blocking on the close future.
            target.close().addListener(future -> target.attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED));
        } else {
            this.notifyRebindStarted(movingEvent.getTime(), movingEvent.getEndpoint());
        }
    }

    /**
     * Extracts the shards element of a MIGRATING push message.
     *
     * @return the decoded shards, or {@code null} when the message is too short or the element is not a ByteBuffer
     */
    private String getMigratingShards(PushMessage message) {
        List<Object> elements = message.getContent();
        return MaintenanceAwareConnectionWatchdog.isInvalidMaintenanceEvent(elements, MIGRATING_SHARDS_INDEX + 1)
                ? null
                : MaintenanceAwareConnectionWatchdog.getShards(elements, MIGRATING_SHARDS_INDEX, MIGRATING_MESSAGE_TYPE);
    }

    /**
     * Extracts the shards element of a MIGRATED push message.
     *
     * @return the decoded shards, or {@code null} when the message is too short or the element is not a ByteBuffer
     */
    private String getMigratedShards(PushMessage message) {
        List<Object> elements = message.getContent();
        return MaintenanceAwareConnectionWatchdog.isInvalidMaintenanceEvent(elements, MIGRATED_SHARDS_INDEX + 1)
                ? null
                : MaintenanceAwareConnectionWatchdog.getShards(elements, MIGRATED_SHARDS_INDEX, MIGRATED_MESSAGE_TYPE);
    }

    /**
     * Extracts the shards element of a FAILING_OVER push message.
     *
     * @return the decoded shards, or {@code null} when the message is too short or the element is not a ByteBuffer
     */
    private String getFailingOverShards(PushMessage message) {
        List<Object> elements = message.getContent();
        return MaintenanceAwareConnectionWatchdog.isInvalidMaintenanceEvent(elements, FAILING_OVER_SHARDS_INDEX + 1)
                ? null
                : MaintenanceAwareConnectionWatchdog.getShards(elements, FAILING_OVER_SHARDS_INDEX, FAILING_OVER_MESSAGE_TYPE);
    }

    /**
     * Extracts the shards element of a FAILED_OVER push message.
     *
     * @return the decoded shards, or {@code null} when the message is too short or the element is not a ByteBuffer
     */
    private String getFailedOverShards(PushMessage message) {
        List<Object> elements = message.getContent();
        return MaintenanceAwareConnectionWatchdog.isInvalidMaintenanceEvent(elements, FAILED_OVER_SHARDS_INDEX + 1)
                ? null
                : MaintenanceAwareConnectionWatchdog.getShards(elements, FAILED_OVER_SHARDS_INDEX, FAILED_OVER_MESSAGE_TYPE);
    }

    /**
     * Checks whether a maintenance message has fewer elements than required, logging a warning if so.
     *
     * @param content elements of the push message
     * @param expectedSize minimum number of elements required
     * @return {@code true} when {@code content} is shorter than {@code expectedSize}
     */
    private static boolean isInvalidMaintenanceEvent(List<Object> content, int expectedSize) {
        int actualSize = content.size();
        boolean tooShort = actualSize < expectedSize;
        if (tooShort) {
            logger.warn("Invalid maintenance message format, expected at least {} elements, got {}", expectedSize, actualSize);
        }
        return tooShort;
    }

    /**
     * Decodes the element at {@code shardsIndex} as a UTF-8 string.
     *
     * @param content elements of the push message
     * @param shardsIndex position of the shards element
     * @param maintenanceEvent message type, used only in the warning text
     * @return the decoded shards, or {@code null} (after a warning) when the element is not a ByteBuffer
     */
    private static String getShards(List<Object> content, int shardsIndex, String maintenanceEvent) {
        Object candidate = content.get(shardsIndex);
        if (candidate instanceof ByteBuffer) {
            return StringCodec.UTF8.decodeKey((ByteBuffer) candidate);
        }

        Object actualType = candidate == null ? "null" : candidate.getClass();
        logger.warn("Invalid shards format, expected ByteBuffer, got {} for {} maintenance event", actualType, maintenanceEvent);
        return null;
    }

    /**
     * Registers a component to be notified about maintenance events. Despite the "set" name the
     * component is added to a set, so earlier registrations stay in place.
     *
     * @param component listener to add
     */
    public void setMaintenanceEventListener(MaintenanceAwareComponent component) {
        componentListeners.add(component);
    }

    /** Tells every registered component that the rebind completed. */
    private void notifyRebindCompleted() {
        componentListeners.forEach(listener -> listener.onRebindCompleted());
    }

    /** Tells every registered component that a rebind to {@code endpoint} started with the given time window. */
    private void notifyRebindStarted(Duration time, SocketAddress endpoint) {
        componentListeners.forEach(listener -> listener.onRebindStarted(time, endpoint));
    }

    /** Tells every registered component that migration of {@code shards} started. */
    private void notifyMigrateStarted(String shards) {
        componentListeners.forEach(listener -> listener.onMigrateStarted(shards));
    }

    /** Tells every registered component that migration of {@code shards} completed. */
    private void notifyMigrateCompleted(String shards) {
        componentListeners.forEach(listener -> listener.onMigrateCompleted(shards));
    }

    /** Tells every registered component that failover of {@code shards} started. */
    private void notifyFailoverStarted(String shards) {
        componentListeners.forEach(listener -> listener.onFailoverStarted(shards));
    }

    /** Tells every registered component that failover of {@code shards} completed. */
    private void notifyFailoverCompleted(String shards) {
        componentListeners.forEach(listener -> listener.onFailoverCompleted(shards));
    }

    /**
     * Address supplier decorator that temporarily substitutes a rebind address for the original
     * supplier. The substitution is active while a rebind address is set and the clock is before
     * the cutoff computed in {@link #rebind(Duration, SocketAddress)}.
     */
    static class RebindAwareAddressSupplier {
        /** Current rebind request, or {@code null} when none is pending. */
        private final AtomicReference<State> state = new AtomicReference<>();
        private final Clock clock;

        /** Creates a supplier that uses the system UTC clock. */
        public RebindAwareAddressSupplier() {
            this(Clock.systemUTC());
        }

        /**
         * @param clock time source used to compute and check the cutoff
         */
        public RebindAwareAddressSupplier(Clock clock) {
            this.clock = clock;
        }

        /**
         * Records a rebind request that stays valid for {@code duration} from now.
         *
         * @param duration how long the rebind address should be used
         * @param rebindAddress address to use; when {@code null} the original supplier keeps being used
         */
        public void rebind(Duration duration, SocketAddress rebindAddress) {
            Instant newCutoff = this.clock.instant().plus(duration);
            this.state.set(new State(newCutoff, rebindAddress));
        }

        /**
         * Wraps {@code original} so that each subscription picks the rebind address while it is
         * valid and otherwise falls back to {@code original}, clearing the stale state.
         *
         * @param original supplier used when no valid rebind address exists
         * @return deferred supplier that re-evaluates the rebind state on every subscription
         */
        public Mono<SocketAddress> wrappedSupplier(Mono<SocketAddress> original) {
            return Mono.defer(() -> {
                // Take one snapshot so the decision and the log lines all describe the same state.
                State current = this.state.get();
                logger.debug("RebindAwareAddressSupplier rebind state: {}", current);
                if (current != null && current.rebindAddress != null && this.clock.instant().isBefore(current.cutoff)) {
                    logger.debug("RebindAwareAddressSupplier using rebind address: {}", current);
                    return Mono.just(current.rebindAddress)
                            .doOnSubscribe(s -> logger.debug("RebindAwareAddressSupplier subscribed to rebind address"))
                            .doOnNext(address -> logger.debug("RebindAwareAddressSupplier rebind address: {}", address));
                }
                logger.debug("RebindAwareAddressSupplier falling back to original.");
                // Clear only the state that was inspected; a newer rebind request must survive.
                this.state.compareAndSet(current, null);
                return original
                        .doOnSubscribe(s -> logger.debug("RebindAwareAddressSupplier original to rebind address"))
                        .doOnNext(address -> logger.debug("RebindAwareAddressSupplier original address: {}", address));
            });
        }

        /** Immutable pair of rebind address and the instant until which it is valid. */
        private static final class State {
            final Instant cutoff;
            final SocketAddress rebindAddress;

            State(Instant cutoff, SocketAddress rebindAddress) {
                this.cutoff = cutoff;
                this.rebindAddress = rebindAddress;
            }

            @Override
            public String toString() {
                return "State [cutoff=" + this.cutoff + ", rebindAddress=" + this.rebindAddress + "]";
            }
        }
    }

    static class MovingEvent {
        private static final int EVENT_ID_INDEX = 1;
        private static final int TIME_INDEX = 2;
        private static final int ADDRESS_INDEX = 3;
        private final Long eventId;
        private final InetSocketAddress endpoint;
        private final Duration time;

        private MovingEvent(Long eventId, Duration time, InetSocketAddress endpoint) {
            this.eventId = eventId;
            this.endpoint = endpoint;
            this.time = time;
        }

        static MovingEvent from(PushMessage message) {
            if (!MaintenanceAwareConnectionWatchdog.REBIND_MESSAGE_TYPE.equals(message.getType())) {
                return null;
            }
            List<Object> content = message.getContent();
            if (content.size() != 4) {
                logger.warn("Invalid re-bind message format, expected 4 elements, got {}", (Object)content.size());
                return null;
            }
            try {
                String addressAndPort;
                Long eventId = (Long)content.get(1);
                Long timeInSec = (Long)content.get(2);
                ByteBuffer addressBuffer = (ByteBuffer)content.get(3);
                InetSocketAddress endpoint = null;
                if (addressBuffer != null && (addressAndPort = StringCodec.UTF8.decodeKey(addressBuffer)) != null && !"null".equals(addressAndPort)) {
                    String[] parts = addressAndPort.split(":");
                    String address = parts[0];
                    int port = Integer.parseInt(parts[1]);
                    endpoint = new InetSocketAddress(address, port);
                }
                return new MovingEvent(eventId, Duration.ofSeconds(timeInSec), endpoint);
            }
            catch (Exception e) {
                logger.error("Invalid re-bind message format", (Throwable)e);
                return null;
            }
        }

        public Long getEventId() {
            return this.eventId;
        }

        public InetSocketAddress getEndpoint() {
            return this.endpoint;
        }

        public Duration getTime() {
            return this.time;
        }
    }
}

