package com.couchbase.client.core;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.core.BucketClosedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenedEvent;
import com.couchbase.client.core.cnc.events.core.CoreCreatedEvent;
import com.couchbase.client.core.cnc.events.core.InitGlobalConfigFailedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationCompletedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationErrorDetectedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationIgnoredEvent;
import com.couchbase.client.core.cnc.events.core.ServiceReconfigurationFailedEvent;
import com.couchbase.client.core.cnc.events.core.ShutdownCompletedEvent;
import com.couchbase.client.core.config.AlternateAddress;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.DefaultConfigurationProvider;
import com.couchbase.client.core.config.GlobalConfig;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.env.Authenticator;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.SeedNode;
import com.couchbase.client.core.error.ConfigException;
import com.couchbase.client.core.error.GlobalConfigNotFoundException;
import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.core.error.RequestCanceledException;
import com.couchbase.client.core.error.UnsupportedConfigMechanismException;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.node.KeyValueLocator;
import com.couchbase.client.core.node.Locator;
import com.couchbase.client.core.node.Node;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.node.RoundRobinLocator;
import com.couchbase.client.core.node.ViewLocator;
import com.couchbase.client.core.service.ServiceScope;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.CbCollections;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Stability.Volatile
/* loaded from: input_file:com/couchbase/client/core/Core.class */
public class Core {
    private static final int GLOBAL_ID = new SecureRandom().nextInt();
    private static final AtomicInteger CORE_IDS = new AtomicInteger();
    private static final KeyValueLocator KEY_VALUE_LOCATOR = new KeyValueLocator();
    private static final RoundRobinLocator MANAGER_LOCATOR = new RoundRobinLocator(ServiceType.MANAGER);
    private static final RoundRobinLocator QUERY_LOCATOR = new RoundRobinLocator(ServiceType.QUERY);
    private static final RoundRobinLocator ANALYTICS_LOCATOR = new RoundRobinLocator(ServiceType.ANALYTICS);
    private static final RoundRobinLocator SEARCH_LOCATOR = new RoundRobinLocator(ServiceType.SEARCH);
    private static final RoundRobinLocator VIEWS_LOCATOR = new ViewLocator();
    private final CoreContext coreContext;
    private final ConfigurationProvider configurationProvider;
    private volatile ClusterConfig currentConfig;
    private final CopyOnWriteArrayList<Node> nodes;
    private final AtomicBoolean reconfigureInProgress = new AtomicBoolean(false);
    private final AtomicBoolean moreConfigsPending = new AtomicBoolean(false);
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final EventBus eventBus;
    private final Timer timer;
    private final Set<SeedNode> seedNodes;

    public static Core create(CoreEnvironment coreEnvironment, Authenticator authenticator, Set<SeedNode> set) {
        return new Core(coreEnvironment, authenticator, set);
    }

    protected Core(CoreEnvironment coreEnvironment, Authenticator authenticator, Set<SeedNode> set) {
        if (coreEnvironment.securityConfig().tlsEnabled() && !authenticator.supportsTls()) {
            throw new InvalidArgumentException("TLS enabled but the Authenticator does not support TLS!", null, null);
        }
        if (!coreEnvironment.securityConfig().tlsEnabled() && !authenticator.supportsNonTls()) {
            throw new InvalidArgumentException("TLS not enabled but the Authenticator does only support TLS!", null, null);
        }
        this.seedNodes = set;
        this.coreContext = new CoreContext(this, createInstanceId(), coreEnvironment, authenticator);
        this.configurationProvider = createConfigurationProvider();
        this.nodes = new CopyOnWriteArrayList<>();
        this.eventBus = coreEnvironment.eventBus();
        this.timer = coreEnvironment.timer();
        this.currentConfig = this.configurationProvider.config();
        this.configurationProvider.configs().subscribe(clusterConfig -> {
            this.currentConfig = clusterConfig;
            reconfigure();
        });
        this.eventBus.publish(new CoreCreatedEvent(this.coreContext, coreEnvironment));
    }

    private long createInstanceId() {
        return (GLOBAL_ID << 32) | (CORE_IDS.incrementAndGet() & 4294967295L);
    }

    ConfigurationProvider createConfigurationProvider() {
        return new DefaultConfigurationProvider(this, this.seedNodes);
    }

    @Stability.Internal
    public ConfigurationProvider configurationProvider() {
        return this.configurationProvider;
    }

    public <R extends Response> void send(Request<R> request) {
        send(request, true);
    }

    @Stability.Internal
    public <R extends Response> void send(Request<R> request, boolean z) {
        if (this.shutdown.get()) {
            request.cancel(CancellationReason.SHUTDOWN);
            return;
        }
        if (z) {
            this.timer.register(request);
        }
        locator(request.serviceType()).dispatch(request, this.nodes, this.currentConfig, context());
    }

    public CoreContext context() {
        return this.coreContext;
    }

    @Stability.Internal
    public Stream<EndpointDiagnostics> diagnostics() {
        return this.nodes.stream().flatMap((v0) -> {
            return v0.diagnostics();
        });
    }

    @Stability.Internal
    public void initGlobalConfig() {
        long nanoTime = System.nanoTime();
        this.configurationProvider.loadAndRefreshGlobalConfig().subscribe(r1 -> {
        }, th -> {
            InitGlobalConfigFailedEvent.Reason reason = InitGlobalConfigFailedEvent.Reason.UNKNOWN;
            if (th instanceof UnsupportedConfigMechanismException) {
                reason = InitGlobalConfigFailedEvent.Reason.UNSUPPORTED;
            } else if (th instanceof GlobalConfigNotFoundException) {
                reason = InitGlobalConfigFailedEvent.Reason.NO_CONFIG_FOUND;
            } else if ((th instanceof ConfigException) && (th.getCause() instanceof RequestCanceledException) && ((RequestCanceledException) th.getCause()).context().requestContext().request().cancellationReason() == CancellationReason.SHUTDOWN) {
                reason = InitGlobalConfigFailedEvent.Reason.SHUTDOWN;
            }
            this.eventBus.publish(new InitGlobalConfigFailedEvent(reason.severity(), Duration.ofNanos(System.nanoTime() - nanoTime), context(), reason, th));
        });
    }

    @Stability.Internal
    public void openBucket(String str) {
        long nanoTime = System.nanoTime();
        this.configurationProvider.openBucket(str).subscribe(r1 -> {
        }, th -> {
        }, () -> {
            this.eventBus.publish(new BucketOpenedEvent(Duration.ofNanos(System.nanoTime() - nanoTime), this.coreContext, str));
        });
    }

    @Stability.Internal
    public ClusterConfig clusterConfig() {
        return this.configurationProvider.config();
    }

    private Mono<Void> closeBucket(String str) {
        return Mono.defer(() -> {
            long nanoTime = System.nanoTime();
            return this.configurationProvider.closeBucket(str).doOnSuccess(r12 -> {
                this.eventBus.publish(new BucketClosedEvent(Duration.ofNanos(System.nanoTime() - nanoTime), this.coreContext, str));
            });
        });
    }

    @Stability.Internal
    public Mono<Void> ensureServiceAt(NodeIdentifier nodeIdentifier, ServiceType serviceType, int i, Optional<String> optional, Optional<String> optional2) {
        return this.shutdown.get() ? Mono.empty() : Flux.fromIterable(this.nodes).filter(node -> {
            return node.identifier().equals(nodeIdentifier);
        }).switchIfEmpty(Mono.defer(() -> {
            Node createNode = createNode(nodeIdentifier, optional2);
            this.nodes.add(createNode);
            return Mono.just(createNode);
        })).flatMap(node2 -> {
            return node2.addService(serviceType, i, optional);
        }).then();
    }

    protected Node createNode(NodeIdentifier nodeIdentifier, Optional<String> optional) {
        return Node.create(this.coreContext, nodeIdentifier, optional);
    }

    private Mono<Void> maybeRemoveNode(Node node, ClusterConfig clusterConfig) {
        return Mono.defer(() -> {
            return ((clusterConfig.bucketConfigs().values().stream().flatMap(bucketConfig -> {
                return bucketConfig.nodes().stream();
            }).anyMatch(nodeInfo -> {
                return nodeInfo.identifier().equals(node.identifier());
            }) || (clusterConfig.globalConfig() != null ? clusterConfig.globalConfig().portInfos().stream().anyMatch(portInfo -> {
                return portInfo.identifier().equals(node.identifier());
            }) : false)) && node.hasServicesEnabled()) ? Mono.empty() : node.disconnect().doOnTerminate(() -> {
                this.nodes.remove(node);
            });
        });
    }

    private Mono<Void> removeServiceFrom(NodeIdentifier nodeIdentifier, ServiceType serviceType, Optional<String> optional) {
        return Flux.fromIterable(new ArrayList(this.nodes)).filter(node -> {
            return node.identifier().equals(nodeIdentifier);
        }).filter(node2 -> {
            return node2.serviceEnabled(serviceType);
        }).flatMap(node3 -> {
            return node3.removeService(serviceType, optional);
        }).then();
    }

    @Stability.Internal
    public Mono<Void> shutdown() {
        return shutdown(this.coreContext.environment().timeoutConfig().disconnectTimeout());
    }

    @Stability.Internal
    public Mono<Void> shutdown(Duration duration) {
        return Mono.defer(() -> {
            long nanoTime = System.nanoTime();
            return this.shutdown.compareAndSet(false, true) ? Flux.fromIterable(this.currentConfig.bucketConfigs().keySet()).flatMap(this::closeBucket).then(this.configurationProvider.shutdown()).then(Flux.interval(Duration.ofMillis(10L), this.coreContext.environment().scheduler()).takeUntil(l -> {
                return this.nodes.isEmpty();
            }).then()).doOnTerminate(() -> {
                this.eventBus.publish(new ShutdownCompletedEvent(Duration.ofNanos(System.nanoTime() - nanoTime), this.coreContext));
            }).then() : Mono.empty();
        }).timeout(duration, this.coreContext.environment().scheduler());
    }

    private void reconfigure() {
        if (!this.reconfigureInProgress.compareAndSet(false, true)) {
            this.moreConfigsPending.set(true);
            this.eventBus.publish(new ReconfigurationIgnoredEvent(this.coreContext));
            return;
        }
        ClusterConfig clusterConfig = this.currentConfig;
        if (clusterConfig.bucketConfigs().isEmpty() && clusterConfig.globalConfig() == null) {
            reconfigureDisconnectAll();
        } else {
            long nanoTime = System.nanoTime();
            reconfigureBuckets(Flux.just(clusterConfig).flatMap(clusterConfig2 -> {
                return Flux.fromIterable(clusterConfig2.bucketConfigs().values());
            })).then(reconfigureGlobal(clusterConfig.globalConfig())).then(Mono.defer(() -> {
                return Flux.fromIterable(new ArrayList(this.nodes)).flatMap(node -> {
                    return maybeRemoveNode(node, clusterConfig);
                }).then();
            })).subscribe(r1 -> {
            }, th -> {
                clearReconfigureInProgress();
                this.eventBus.publish(new ReconfigurationErrorDetectedEvent(context(), th));
            }, () -> {
                clearReconfigureInProgress();
                this.eventBus.publish(new ReconfigurationCompletedEvent(Duration.ofNanos(System.nanoTime() - nanoTime), this.coreContext));
            });
        }
    }

    private void reconfigureDisconnectAll() {
        long nanoTime = System.nanoTime();
        Flux flatMap = Flux.fromIterable(new ArrayList(this.nodes)).flatMap((v0) -> {
            return v0.disconnect();
        });
        CopyOnWriteArrayList<Node> copyOnWriteArrayList = this.nodes;
        copyOnWriteArrayList.getClass();
        flatMap.doOnComplete(copyOnWriteArrayList::clear).subscribe(r1 -> {
        }, th -> {
            clearReconfigureInProgress();
            this.eventBus.publish(new ReconfigurationErrorDetectedEvent(context(), th));
        }, () -> {
            clearReconfigureInProgress();
            this.eventBus.publish(new ReconfigurationCompletedEvent(Duration.ofNanos(System.nanoTime() - nanoTime), this.coreContext));
        });
    }

    private void clearReconfigureInProgress() {
        this.reconfigureInProgress.set(false);
        if (this.moreConfigsPending.compareAndSet(true, false)) {
            reconfigure();
        }
    }

    private Mono<Void> reconfigureGlobal(GlobalConfig globalConfig) {
        return Mono.defer(() -> {
            return globalConfig == null ? Mono.empty() : Flux.fromIterable(globalConfig.portInfos()).flatMap(portInfo -> {
                boolean tlsEnabled = this.coreContext.environment().securityConfig().tlsEnabled();
                Set<Map.Entry<ServiceType, Integer>> set = null;
                Optional<String> alternateAddress = this.coreContext.alternateAddress();
                String str = null;
                if (alternateAddress.isPresent()) {
                    AlternateAddress alternateAddress2 = portInfo.alternateAddresses().get(alternateAddress.get());
                    str = alternateAddress2.hostname();
                    set = tlsEnabled ? alternateAddress2.sslServices().entrySet() : alternateAddress2.services().entrySet();
                }
                if (set == null || set.isEmpty()) {
                    set = tlsEnabled ? portInfo.sslPorts().entrySet() : portInfo.ports().entrySet();
                }
                String str2 = str;
                Set<Map.Entry<ServiceType, Integer>> set2 = set;
                return Flux.merge(new Publisher[]{Flux.fromIterable(set2).flatMap(entry -> {
                    return ensureServiceAt(portInfo.identifier(), (ServiceType) entry.getKey(), ((Integer) entry.getValue()).intValue(), Optional.empty(), Optional.ofNullable(str2)).onErrorResume(th -> {
                        this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, portInfo.hostname(), (ServiceType) entry.getKey(), th));
                        return Mono.empty();
                    });
                }), Flux.fromIterable(Arrays.asList(ServiceType.values())).filter(serviceType -> {
                    Iterator it = set2.iterator();
                    while (it.hasNext()) {
                        if (((Map.Entry) it.next()).getKey() == serviceType) {
                            return false;
                        }
                    }
                    return true;
                }).flatMap(serviceType2 -> {
                    return removeServiceFrom(portInfo.identifier(), serviceType2, Optional.empty()).onErrorResume(th -> {
                        this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, portInfo.hostname(), serviceType2, th));
                        return Mono.empty();
                    });
                })});
            }).then();
        });
    }

    private Mono<Void> reconfigureBuckets(Flux<BucketConfig> flux) {
        return flux.flatMap(bucketConfig -> {
            return Flux.fromIterable(bucketConfig.nodes()).flatMap(nodeInfo -> {
                boolean tlsEnabled = this.coreContext.environment().securityConfig().tlsEnabled();
                Set<Map.Entry<ServiceType, Integer>> set = null;
                Optional<String> alternateAddress = this.coreContext.alternateAddress();
                String str = null;
                if (alternateAddress.isPresent()) {
                    AlternateAddress alternateAddress2 = nodeInfo.alternateAddresses().get(alternateAddress.get());
                    str = alternateAddress2.hostname();
                    set = tlsEnabled ? alternateAddress2.sslServices().entrySet() : alternateAddress2.services().entrySet();
                }
                if (CbCollections.isNullOrEmpty(set)) {
                    set = tlsEnabled ? nodeInfo.sslServices().entrySet() : nodeInfo.services().entrySet();
                }
                String str2 = str;
                Set<Map.Entry<ServiceType, Integer>> set2 = set;
                return Flux.merge(new Publisher[]{Flux.fromIterable(set2).flatMap(entry -> {
                    return ensureServiceAt(nodeInfo.identifier(), (ServiceType) entry.getKey(), ((Integer) entry.getValue()).intValue(), ((ServiceType) entry.getKey()).scope() == ServiceScope.BUCKET ? Optional.of(bucketConfig.name()) : Optional.empty(), Optional.ofNullable(str2)).onErrorResume(th -> {
                        this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, nodeInfo.hostname(), (ServiceType) entry.getKey(), th));
                        return Mono.empty();
                    });
                }), Flux.fromIterable(Arrays.asList(ServiceType.values())).filter(serviceType -> {
                    Iterator it = set2.iterator();
                    while (it.hasNext()) {
                        if (((Map.Entry) it.next()).getKey() == serviceType) {
                            return false;
                        }
                    }
                    return true;
                }).flatMap(serviceType2 -> {
                    return removeServiceFrom(nodeInfo.identifier(), serviceType2, serviceType2.scope() == ServiceScope.BUCKET ? Optional.of(bucketConfig.name()) : Optional.empty()).onErrorResume(th -> {
                        this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, nodeInfo.hostname(), serviceType2, th));
                        return Mono.empty();
                    });
                })});
            });
        }).then();
    }

    private static Locator locator(ServiceType serviceType) {
        switch (serviceType) {
            case KV:
                return KEY_VALUE_LOCATOR;
            case MANAGER:
                return MANAGER_LOCATOR;
            case QUERY:
                return QUERY_LOCATOR;
            case ANALYTICS:
                return ANALYTICS_LOCATOR;
            case SEARCH:
                return SEARCH_LOCATOR;
            case VIEWS:
                return VIEWS_LOCATOR;
            default:
                throw new IllegalStateException("Unsupported ServiceType: " + serviceType);
        }
    }
}
