/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.node;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.events.node.NodeConnectedEvent;
import com.couchbase.client.core.cnc.events.node.NodeCreatedEvent;
import com.couchbase.client.core.cnc.events.node.NodeDisconnectIgnoredEvent;
import com.couchbase.client.core.cnc.events.node.NodeDisconnectedEvent;
import com.couchbase.client.core.cnc.events.node.NodeStateChangedEvent;
import com.couchbase.client.core.cnc.events.service.ServiceAddIgnoredEvent;
import com.couchbase.client.core.cnc.events.service.ServiceAddedEvent;
import com.couchbase.client.core.cnc.events.service.ServiceRemoveIgnoredEvent;
import com.couchbase.client.core.cnc.events.service.ServiceRemovedEvent;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.diagnostics.InternalEndpointDiagnostics;
import com.couchbase.client.core.env.Authenticator;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.node.NodeContext;
import com.couchbase.client.core.node.NodeState;
import com.couchbase.client.core.retry.RetryOrchestrator;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.service.AnalyticsService;
import com.couchbase.client.core.service.AnalyticsServiceConfig;
import com.couchbase.client.core.service.BackupService;
import com.couchbase.client.core.service.EventingService;
import com.couchbase.client.core.service.KeyValueService;
import com.couchbase.client.core.service.KeyValueServiceConfig;
import com.couchbase.client.core.service.ManagerService;
import com.couchbase.client.core.service.QueryService;
import com.couchbase.client.core.service.QueryServiceConfig;
import com.couchbase.client.core.service.SearchService;
import com.couchbase.client.core.service.SearchServiceConfig;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.service.ServiceScope;
import com.couchbase.client.core.service.ServiceState;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.service.ViewService;
import com.couchbase.client.core.service.ViewServiceConfig;
import com.couchbase.client.core.topology.NodeIdentifier;
import com.couchbase.client.core.util.AtomicEnumSet;
import com.couchbase.client.core.util.CompositeStateful;
import com.couchbase.client.core.util.HostAndPort;
import com.couchbase.client.core.util.NanoTimestamp;
import com.couchbase.client.core.util.Stateful;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Node
implements Stateful<NodeState> {
    private static final String GLOBAL_SCOPE = "_$GLOBAL$_";
    private static final String BUCKET_GLOBAL_SCOPE = "_$BUCKET_GLOBAL$_";
    private final NodeIdentifier identifier;
    private final NodeContext ctx;
    private final Authenticator authenticator;
    private final Map<String, Map<ServiceType, Service>> services;
    private final AtomicBoolean disconnect;
    private final CompositeStateful<Service, ServiceState, NodeState> serviceStates;
    private final AtomicEnumSet<ServiceType> enabledServices = AtomicEnumSet.noneOf(ServiceType.class);

    public static Node create(CoreContext ctx, NodeIdentifier identifier) {
        return new Node(ctx, identifier);
    }

    protected Node(CoreContext ctx, NodeIdentifier identifier) {
        this.identifier = identifier;
        this.ctx = new NodeContext(ctx, identifier);
        this.authenticator = ctx.authenticator();
        this.services = new ConcurrentHashMap<String, Map<ServiceType, Service>>();
        this.disconnect = new AtomicBoolean(false);
        this.serviceStates = CompositeStateful.create(NodeState.DISCONNECTED, serviceStates -> {
            if (serviceStates.isEmpty()) {
                return NodeState.DISCONNECTED;
            }
            int connected = 0;
            int connecting = 0;
            int disconnecting = 0;
            int idle = 0;
            int degraded = 0;
            block8: for (ServiceState service : serviceStates) {
                switch (service) {
                    case CONNECTED: {
                        ++connected;
                        continue block8;
                    }
                    case CONNECTING: {
                        ++connecting;
                        continue block8;
                    }
                    case DISCONNECTING: {
                        ++disconnecting;
                        continue block8;
                    }
                    case DEGRADED: {
                        ++degraded;
                        continue block8;
                    }
                    case IDLE: {
                        ++idle;
                        continue block8;
                    }
                    case DISCONNECTED: {
                        continue block8;
                    }
                }
                throw InvalidArgumentException.fromMessage("Unknown unhandled state " + (Object)((Object)service) + ", this is a bug!");
            }
            if (serviceStates.size() == idle) {
                return NodeState.IDLE;
            }
            if (serviceStates.size() == connected + idle) {
                return NodeState.CONNECTED;
            }
            if (connected > 0 || degraded > 0) {
                return NodeState.DEGRADED;
            }
            if (connecting > 0) {
                return NodeState.CONNECTING;
            }
            if (disconnecting > 0) {
                return NodeState.DISCONNECTING;
            }
            return NodeState.DISCONNECTED;
        }, (from, to) -> ctx.environment().eventBus().publish(new NodeStateChangedEvent(this.ctx, (NodeState)((Object)from), (NodeState)((Object)to))));
        ctx.environment().eventBus().publish(new NodeCreatedEvent(Duration.ZERO, this.ctx));
        ctx.environment().eventBus().publish(new NodeConnectedEvent(Duration.ZERO, this.ctx));
    }

    public synchronized Mono<Void> disconnect() {
        return Mono.defer(() -> {
            if (this.disconnect.compareAndSet(false, true)) {
                AtomicLong start = new AtomicLong();
                return Flux.fromIterable(this.services.entrySet()).flatMap(entry -> {
                    start.set(System.nanoTime());
                    return Flux.fromIterable(((Map)entry.getValue()).keySet()).flatMap(serviceType -> Mono.fromRunnable(() -> this.removeService((ServiceType)((Object)((Object)((Object)serviceType))), Optional.of((String)entry.getKey()), true)));
                }).then().doOnTerminate(() -> this.ctx.environment().eventBus().publish(new NodeDisconnectedEvent(Duration.ofNanos(System.nanoTime() - start.get()), this.ctx)));
            }
            this.ctx.environment().eventBus().publish(new NodeDisconnectIgnoredEvent(Event.Severity.DEBUG, NodeDisconnectIgnoredEvent.Reason.DISCONNECTED, this.ctx));
            return Mono.empty();
        });
    }

    public Mono<Void> addService(ServiceType type, int port, Optional<String> bucket) {
        return Mono.fromRunnable(() -> {
            if (this.disconnect.get()) {
                this.ctx.environment().eventBus().publish(new ServiceAddIgnoredEvent(Event.Severity.DEBUG, ServiceAddIgnoredEvent.Reason.DISCONNECTED, this.ctx));
                return;
            }
            HostAndPort newServiceAddress = new HostAndPort(this.identifier.hostForNetworkConnections(), port);
            String name = type.scope() == ServiceScope.CLUSTER ? GLOBAL_SCOPE : bucket.orElse(BUCKET_GLOBAL_SCOPE);
            Map localMap = this.services.computeIfAbsent(name, key -> new ConcurrentHashMap());
            Service existingService = (Service)localMap.get((Object)type);
            if (existingService != null && !existingService.address().equals(newServiceAddress)) {
                this.removeService(type, bucket, true);
            }
            if (localMap.containsKey((Object)type)) {
                this.ctx.environment().eventBus().publish(new ServiceAddIgnoredEvent(Event.Severity.VERBOSE, ServiceAddIgnoredEvent.Reason.ALREADY_ADDED, this.ctx));
                return;
            }
            NanoTimestamp start = NanoTimestamp.now();
            Service service = this.createService(type, newServiceAddress, bucket);
            this.serviceStates.register(service, service);
            localMap.put(type, service);
            this.enabledServices.add(type);
            service.connect();
            this.ctx.environment().eventBus().publish(new ServiceAddedEvent(start.elapsed(), service.context()));
        });
    }

    public Mono<Void> removeService(ServiceType type, Optional<String> bucket) {
        return Mono.fromRunnable(() -> this.removeService(type, bucket, false));
    }

    private void removeService(ServiceType type, Optional<String> bucket, boolean ignoreDisconnect) {
        if (this.disconnect.get() && !ignoreDisconnect) {
            this.ctx.environment().eventBus().publish(new ServiceRemoveIgnoredEvent(Event.Severity.DEBUG, ServiceRemoveIgnoredEvent.Reason.DISCONNECTED, this.ctx));
            return;
        }
        String name = type.scope() == ServiceScope.CLUSTER ? GLOBAL_SCOPE : bucket.orElse(BUCKET_GLOBAL_SCOPE);
        Map<ServiceType, Service> localMap = this.services.get(name);
        if (localMap == null || !localMap.containsKey((Object)type)) {
            this.ctx.environment().eventBus().publish(new ServiceRemoveIgnoredEvent(Event.Severity.DEBUG, ServiceRemoveIgnoredEvent.Reason.NOT_PRESENT, this.ctx));
            return;
        }
        Service service = localMap.remove((Object)type);
        this.serviceStates.deregister(service);
        long start = System.nanoTime();
        if (this.serviceCanBeDisabled(service.type())) {
            this.enabledServices.remove((Object)service.type());
        }
        service.disconnect();
        long end = System.nanoTime();
        this.ctx.environment().eventBus().publish(new ServiceRemovedEvent(Duration.ofNanos(end - start), service.context()));
    }

    private boolean serviceCanBeDisabled(ServiceType serviceType) {
        return this.services.values().stream().noneMatch(m -> m.containsKey((Object)serviceType));
    }

    @Override
    public Flux<NodeState> states() {
        return this.serviceStates.states();
    }

    @Override
    public NodeState state() {
        return this.serviceStates.state();
    }

    public Optional<Flux<ServiceState>> serviceState(ServiceType type, Optional<String> bucket) {
        String name = type.scope() == ServiceScope.CLUSTER ? GLOBAL_SCOPE : bucket.orElse(BUCKET_GLOBAL_SCOPE);
        Map<ServiceType, Service> s = this.services.get(name);
        if (s == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(s.get((Object)type)).map(Stateful::states);
    }

    public <R extends Request<? extends Response>> void send(R request) {
        Map<ServiceType, Service> scope;
        String bucket;
        if (request.serviceType().scope() == ServiceScope.BUCKET) {
            bucket = request.bucket();
            if (bucket == null) {
                bucket = BUCKET_GLOBAL_SCOPE;
            }
        } else {
            bucket = GLOBAL_SCOPE;
        }
        if ((scope = this.services.get(bucket)) == null) {
            this.sendIntoRetry(request);
            return;
        }
        Service service = scope.get((Object)request.serviceType());
        if (service == null) {
            this.sendIntoRetry(request);
            return;
        }
        request.context().lastDispatchedToNode(this.identifier);
        service.send(request);
    }

    protected <R extends Request<? extends Response>> void sendIntoRetry(R request) {
        RetryOrchestrator.maybeRetry(this.ctx, request, RetryReason.SERVICE_NOT_AVAILABLE);
    }

    public NodeIdentifier identifier() {
        return this.identifier;
    }

    public boolean serviceEnabled(ServiceType type) {
        return this.enabledServices.contains(type);
    }

    public boolean hasServicesEnabled() {
        return !this.enabledServices.isEmpty();
    }

    protected Service createService(ServiceType serviceType, HostAndPort address, Optional<String> bucket) {
        CoreEnvironment env = this.ctx.environment();
        String host = address.host();
        int port = address.port();
        switch (serviceType) {
            case KV: {
                return new KeyValueService(KeyValueServiceConfig.endpoints(env.ioConfig().numKvConnections()).build(), this.ctx, host, port, bucket, this.authenticator);
            }
            case MANAGER: {
                return new ManagerService(this.ctx, host, port);
            }
            case QUERY: {
                return new QueryService(((QueryServiceConfig.Builder)QueryServiceConfig.maxEndpoints(env.ioConfig().maxHttpConnections()).idleTime(env.ioConfig().idleHttpConnectionTimeout())).build(), this.ctx, host, port);
            }
            case VIEWS: {
                return new ViewService(((ViewServiceConfig.Builder)ViewServiceConfig.maxEndpoints(env.ioConfig().maxHttpConnections()).idleTime(env.ioConfig().idleHttpConnectionTimeout())).build(), this.ctx, host, port);
            }
            case SEARCH: {
                return new SearchService(((SearchServiceConfig.Builder)SearchServiceConfig.maxEndpoints(env.ioConfig().maxHttpConnections()).idleTime(env.ioConfig().idleHttpConnectionTimeout())).build(), this.ctx, host, port);
            }
            case ANALYTICS: {
                return new AnalyticsService(((AnalyticsServiceConfig.Builder)AnalyticsServiceConfig.maxEndpoints(env.ioConfig().maxHttpConnections()).idleTime(env.ioConfig().idleHttpConnectionTimeout())).build(), this.ctx, host, port);
            }
            case EVENTING: {
                return new EventingService(this.ctx, host, port);
            }
            case BACKUP: {
                return new BackupService(this.ctx, host, port);
            }
        }
        throw InvalidArgumentException.fromMessage("Unsupported ServiceType: " + (Object)((Object)serviceType));
    }

    public Stream<EndpointDiagnostics> diagnostics() {
        return this.services.values().stream().flatMap(services -> services.values().stream()).flatMap(service -> service.diagnostics());
    }

    @Stability.Internal
    public Stream<InternalEndpointDiagnostics> internalDiagnostics() {
        return this.services.values().stream().flatMap(services -> services.values().stream()).flatMap(service -> service.internalDiagnostics());
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        Node node = (Node)o;
        return Objects.equals(this.identifier, node.identifier);
    }

    public int hashCode() {
        return Objects.hash(this.identifier);
    }

    public String toString() {
        return "Node{identifier=" + RedactableArgument.redactSystem(this.identifier) + ", ctx=" + this.ctx + ", services=" + this.services + ", disconnect=" + this.disconnect + ", serviceStates=" + this.serviceStates + ", enabledServices=" + this.enabledServices + '}';
    }
}

