package com.couchbase.client.core.cnc.apptelemetry.collector;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.env.UserAgent;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.RequestContext;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.topology.ClusterTopology;
import com.couchbase.client.core.topology.HostAndServicePorts;
import com.couchbase.client.core.topology.NodeIdentifier;
import com.couchbase.client.core.util.CbStrings;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.util.annotation.Nullable;

/**
 * {@link AppTelemetryCollector} implementation that keeps one
 * {@link AppTelemetryMetricSet} per (node, bucket) pair.
 * <p>
 * The collector subscribes to the stream of cluster configs passed to the
 * constructor. On every config it remembers the global topology and discards
 * metric sets whose node is no longer listed in that topology.
 * <p>
 * While paused, the public recording methods return without recording anything.
 */
@Stability.Internal
public final class AppTelemetryCollectorImpl implements AppTelemetryCollector {
    private static final Logger log = LoggerFactory.getLogger(AppTelemetryCollectorImpl.class);

    /** Agent tag attached to every reported metric: {@code <name>/<version>}. */
    private final String agent;

    /** Metric sets keyed by the node (and optional bucket) the request was dispatched to. */
    private final ConcurrentMap<NodeAndBucket, AppTelemetryMetricSet> metricSets = new ConcurrentHashMap<>();

    /** When true, the public recording methods are no-ops. */
    private volatile boolean paused;

    /** Most recent global topology seen on the config stream; null until one arrives (or if the config has none). */
    @Nullable
    private volatile ClusterTopology latestTopology;

    /**
     * Creates a collector and subscribes it to the given config stream.
     *
     * @param flux stream of cluster configs; each element updates the remembered
     *             topology and triggers pruning of metric sets for departed nodes
     * @param userAgent supplies the name and version used to build the "agent" tag
     */
    public AppTelemetryCollectorImpl(Flux<ClusterConfig> flux, UserAgent userAgent) {
        this.agent = userAgent.name() + "/" + userAgent.version();
        flux.doOnNext(clusterConfig -> {
            // Failures here are logged and swallowed so they do not propagate into the config stream.
            try {
                ClusterTopology globalTopology = clusterConfig.globalTopology();
                this.latestTopology = globalTopology;
                prune(globalTopology == null ? Collections.emptyList() : globalTopology.nodes());
            } catch (Exception e) {
                log.warn("PRUNE: Failed to prune App Telemetry metrics", e);
            }
        }).subscribe();
    }

    /**
     * Removes every metric set whose node's canonical id is not among the
     * canonical ids of the given nodes.
     *
     * @param nodes the nodes to keep metrics for; an empty list discards everything
     */
    private void prune(List<HostAndServicePorts> nodes) {
        Set<?> canonicalIds = nodes.stream()
                .map(node -> node.id().canonical())
                .collect(Collectors.toSet());
        log.debug("PRUNE: Canonical addresses of nodes in cluster: {}", RedactableArgument.redactSystem(canonicalIds));

        this.metricSets.keySet().removeIf(nodeAndBucket -> {
            boolean departed = !canonicalIds.contains(nodeAndBucket.nodeId.canonical());
            if (departed) {
                log.info("PRUNE: Discarding App Telemetry metrics for node that is no longer in cluster: {}", RedactableArgument.redactSystem(nodeAndBucket.nodeId));
            }
            return departed;
        });
    }

    /**
     * Pauses or resumes collection. Calling with the current state is a no-op.
     * Transitioning to paused discards all metric sets collected so far.
     *
     * @param paused true to pause, false to resume
     */
    @Override // com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryCollector
    public synchronized void setPaused(boolean paused) {
        if (this.paused == paused) {
            return;
        }
        log.info("{} app telemetry collection.", paused ? "Pausing" : "Resuming");
        this.paused = paused;
        if (paused) {
            // Pruning against an empty node list removes every entry.
            prune(Collections.emptyList());
        }
    }

    /**
     * Records the dispatch latency of the given request and counts it towards
     * the total for its service.
     * <p>
     * Does nothing when the collector is paused, when the request classifier
     * returns null for the request, or when the request context reports no
     * node it was last dispatched to.
     *
     * @param request the request whose latency should be recorded
     */
    @Override // com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryCollector
    public void recordLatency(Request<?> request) {
        if (this.paused) {
            return;
        }
        AppTelemetryRequestType requestType = AppTelemetryRequestClassifier.classify(request);
        if (requestType == null) {
            return;
        }
        RequestContext context = request.context();
        NodeIdentifier node = context.lastDispatchedToNode();
        if (node == null) {
            // Same guard as increment(Request, ...): without a node there is no key to record under.
            return;
        }
        recordLatency(node, request.bucket(), requestType, context.dispatchLatency());
    }

    /**
     * Records a latency sample in the histogram for the given request type and
     * increments the TOTAL counter for the type's service.
     */
    private void recordLatency(NodeIdentifier nodeIdentifier, @Nullable String bucket, AppTelemetryRequestType requestType, long latency) {
        AppTelemetryMetricSet metricSet = this.metricSets.computeIfAbsent(
                new NodeAndBucket(nodeIdentifier, bucket),
                key -> new AppTelemetryMetricSet());
        metricSet.histograms.get(requestType).record(latency);
        increment(nodeIdentifier, bucket, requestType.service, AppTelemetryCounterType.TOTAL);
    }

    /**
     * Increments the given counter for the node, bucket and service of the request.
     * <p>
     * Does nothing when the collector is paused, when the request classifier
     * returns null for the request, or when the request context reports no
     * node it was last dispatched to.
     *
     * @param request the request the event relates to
     * @param counterType which counter to increment
     */
    @Override // com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryCollector
    public void increment(Request<?> request, AppTelemetryCounterType counterType) {
        if (this.paused || AppTelemetryRequestClassifier.classify(request) == null) {
            return;
        }
        NodeIdentifier node = request.context().lastDispatchedToNode();
        if (node == null) {
            return;
        }
        increment(node, request.bucket(), request.serviceType(), counterType);
    }

    /**
     * Increments the given counter for the service, and additionally the TOTAL
     * counter when the given counter is not TOTAL itself. Services that have no
     * counters in the metric set are ignored.
     */
    private void increment(NodeIdentifier nodeIdentifier, @Nullable String bucket, ServiceType serviceType, AppTelemetryCounterType counterType) {
        AppTelemetryMetricSet metricSet = this.metricSets.computeIfAbsent(
                new NodeAndBucket(nodeIdentifier, bucket),
                key -> new AppTelemetryMetricSet());
        Map<AppTelemetryCounterType, AppTelemetryCounter> serviceCounters = metricSet.counters.get(serviceType);
        if (serviceCounters == null) {
            return;
        }
        serviceCounters.get(counterType).increment();
        if (counterType != AppTelemetryCounterType.TOTAL) {
            serviceCounters.get(AppTelemetryCounterType.TOTAL).increment();
        }
    }

    /**
     * Reports every metric of every metric set to the given sink.
     * <p>
     * Each metric is reported with a shared, insertion-ordered tag map that
     * starts with "agent" and "node_uuid", followed by whatever the
     * {@link NodeAndBucket} key writes. Metric sets whose node cannot be found
     * in the latest topology are skipped. All metrics in one call share the
     * same timestamp.
     *
     * @param consumer receives the reported output
     */
    @Override // com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryCollector
    public synchronized void reportTo(Consumer<? super CharSequence> consumer) {
        long currentTimeMillis = System.currentTimeMillis();
        this.metricSets.forEach((nodeAndBucket, metricSet) -> {
            String nodeUuid = getNodeUuid(nodeAndBucket);
            if (nodeUuid == null) {
                return;
            }
            Map<String, String> commonTags = new LinkedHashMap<>();
            commonTags.put("agent", this.agent);
            commonTags.put("node_uuid", nodeUuid);
            nodeAndBucket.writeTo(commonTags);
            metricSet.forEachMetric(metric -> metric.reportTo(consumer, commonTags, currentTimeMillis));
        });
    }

    /**
     * Looks up the UUID of the key's node in the latest topology.
     *
     * @return the node's UUID (a null UUID is passed through
     *         {@code CbStrings.nullToEmpty}), or null when there is no topology
     *         yet or the node is not part of it
     */
    @Nullable
    private String getNodeUuid(NodeAndBucket nodeAndBucket) {
        ClusterTopology topology = this.latestTopology;
        if (topology == null) {
            return null;
        }
        return topology.nodes().stream()
                .filter(node -> node.id().equals(nodeAndBucket.nodeId))
                .findFirst()
                .map(node -> CbStrings.nullToEmpty(node.uuid()))
                .orElse(null);
    }
}
