/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.cnc.apptelemetry;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryCollector;
import com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryCollectorImpl;
import com.couchbase.client.core.cnc.apptelemetry.reporter.AppTelemetryReporter;
import com.couchbase.client.core.cnc.apptelemetry.reporter.AppTelemetryReporterImpl;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.topology.ClusterTopology;
import com.couchbase.client.core.topology.HostAndServicePorts;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.HostAndPort;
import java.io.Closeable;
import java.net.URI;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.util.annotation.Nullable;

@Stability.Internal
public class AppTelemetry
implements Closeable {
    public final AppTelemetryCollector collector;
    public final AppTelemetryReporter reporter;

    private AppTelemetry(AppTelemetryCollector collector, AppTelemetryReporter reporter) {
        this.collector = Objects.requireNonNull(collector);
        this.reporter = Objects.requireNonNull(reporter);
    }

    public static AppTelemetry from(CoreContext ctx) {
        CoreEnvironment env = ctx.environment();
        if (env.appTelemetryDisabled()) {
            return new AppTelemetry(AppTelemetryCollector.NOOP, AppTelemetryReporter.NOOP);
        }
        ConfigurationProvider configurationProvider = ctx.core().configurationProvider();
        AppTelemetryCollectorImpl collector = new AppTelemetryCollectorImpl(configurationProvider.configs(), env.userAgent());
        AppTelemetryReporterImpl reporter = new AppTelemetryReporterImpl(ctx, collector);
        URI appTelemetryUri = env.appTelemetryEndpoint();
        if (appTelemetryUri != null) {
            reporter.updateRemotes(CbCollections.setOf(appTelemetryUri));
        } else {
            boolean tls = env.securityConfig().tlsEnabled();
            AppTelemetry.appTelemetryUris(tls, configurationProvider).doOnNext(reporter::updateRemotes).subscribe();
        }
        return new AppTelemetry(collector, reporter);
    }

    private static Flux<Set<URI>> appTelemetryUris(boolean tls, ConfigurationProvider configurationProvider) {
        return configurationProvider.configs().map(clusterconfig -> AppTelemetry.allNodes(clusterconfig).map(node -> {
            String path = node.appTelemetryPath();
            if (path == null) {
                return null;
            }
            int managerPort = node.port(ServiceType.MANAGER).orElseThrow(() -> new NoSuchElementException("missing manager port?"));
            HostAndPort managerAddress = new HostAndPort(node.host(), managerPort);
            String scheme = tls ? "wss" : "ws";
            return URI.create(scheme + "://" + managerAddress.format() + path);
        }).filter(Objects::nonNull).collect(Collectors.toSet()));
    }

    @Override
    public void close() {
        this.reporter.close();
    }

    private static Stream<HostAndServicePorts> allNodes(ClusterConfig config) {
        return AppTelemetry.allTopologies(config).flatMap(it -> it.nodes().stream());
    }

    private static Stream<ClusterTopology> allTopologies(ClusterConfig config) {
        return Stream.concat(AppTelemetry.streamOfNullable(config.globalTopology()), config.bucketTopologies().stream());
    }

    private static <T> Stream<T> streamOfNullable(@Nullable T item) {
        return item == null ? Stream.empty() : Stream.of(item);
    }
}

