package io.confluent.ksql.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/internal/ThroughputMetricsReporter.class */
public class ThroughputMetricsReporter implements org.apache.kafka.common.metrics.MetricsReporter {
    private static final String THROUGHPUT_METRICS_GROUP = "ksql-query-throughput-metrics";
    private Metrics metricRegistry;
    private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputMetricsReporter.class);
    private static final String RECORDS_CONSUMED = "records-consumed-total";
    private static final String BYTES_CONSUMED = "bytes-consumed-total";
    private static final String RECORDS_PRODUCED = "records-produced-total";
    private static final String BYTES_PRODUCED = "bytes-produced-total";
    private static final Set<String> THROUGHPUT_METRIC_NAMES = Utils.mkSet(new String[]{RECORDS_CONSUMED, BYTES_CONSUMED, RECORDS_PRODUCED, BYTES_PRODUCED});
    private static final Map<String, Map<String, Map<MetricName, ThroughputTotalMetric>>> metrics = new HashMap();
    private static final Map<String, String> customTags = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/internal/ThroughputMetricsReporter$ThroughputTotalMetric.class */
    public static class ThroughputTotalMetric extends CumulativeSum {
        final Map<MetricName, KafkaMetric> throughputTotalMetrics = new HashMap();

        ThroughputTotalMetric(KafkaMetric kafkaMetric) {
            add(kafkaMetric);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void add(KafkaMetric kafkaMetric) {
            this.throughputTotalMetrics.put(kafkaMetric.metricName(), kafkaMetric);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void remove(MetricName metricName) {
            this.throughputTotalMetrics.remove(metricName);
        }

        public double measure(MetricConfig metricConfig, long j) {
            return ((Double) this.throughputTotalMetrics.values().stream().map(kafkaMetric -> {
                return (Double) kafkaMetric.metricValue();
            }).reduce((v0, v1) -> {
                return Double.sum(v0, v1);
            }).orElse(Double.valueOf(0.0d))).doubleValue();
        }
    }

    public void init(List<KafkaMetric> list) {
    }

    @VisibleForTesting
    static void reset() {
        metrics.clear();
    }

    public synchronized void configure(Map<String, ?> map) {
        this.metricRegistry = (Metrics) Objects.requireNonNull(map.get("ksql.internal.metrics"));
        customTags.putAll(KsqlConfig.getStringAsMap("ksql.metrics.tags.custom", map));
    }

    public void metricChange(KafkaMetric kafkaMetric) {
        if (THROUGHPUT_METRIC_NAMES.contains(kafkaMetric.metricName().name()) && "stream-topic-metrics".equals(kafkaMetric.metricName().group())) {
            addMetric(kafkaMetric, getQueryId(kafkaMetric), getTopic(kafkaMetric));
        }
    }

    private synchronized void addMetric(KafkaMetric kafkaMetric, String str, String str2) {
        MetricName throughputTotalMetricName = getThroughputTotalMetricName(str, str2, kafkaMetric.metricName());
        LOGGER.debug("Adding metric {}", throughputTotalMetricName);
        if (!metrics.containsKey(str)) {
            metrics.put(str, new HashMap());
        }
        if (!metrics.get(str).containsKey(str2)) {
            metrics.get(str).put(str2, new HashMap());
        }
        ThroughputTotalMetric throughputTotalMetric = metrics.get(str).get(str2).get(throughputTotalMetricName);
        if (throughputTotalMetric != null) {
            throughputTotalMetric.add(kafkaMetric);
            return;
        }
        ThroughputTotalMetric throughputTotalMetric2 = new ThroughputTotalMetric(kafkaMetric);
        metrics.get(str).get(str2).put(throughputTotalMetricName, throughputTotalMetric2);
        this.metricRegistry.addMetric(throughputTotalMetricName, throughputTotalMetric2);
    }

    public void metricRemoval(KafkaMetric kafkaMetric) {
        if (THROUGHPUT_METRIC_NAMES.contains(kafkaMetric.metricName().name()) && "stream-topic-metrics".equals(kafkaMetric.metricName().group())) {
            removeMetric(kafkaMetric, getQueryId(kafkaMetric), getTopic(kafkaMetric));
        }
    }

    private synchronized void removeMetric(KafkaMetric kafkaMetric, String str, String str2) {
        MetricName throughputTotalMetricName = getThroughputTotalMetricName(str, str2, kafkaMetric.metricName());
        LOGGER.debug("Removing metric {}", throughputTotalMetricName);
        if (metrics.containsKey(str) && metrics.get(str).containsKey(str2) && metrics.get(str).get(str2).containsKey(throughputTotalMetricName)) {
            ThroughputTotalMetric throughputTotalMetric = metrics.get(str).get(str2).get(throughputTotalMetricName);
            throughputTotalMetric.remove(kafkaMetric.metricName());
            if (throughputTotalMetric.throughputTotalMetrics.isEmpty()) {
                metrics.get(str).get(str2).remove(throughputTotalMetricName);
                this.metricRegistry.removeMetric(throughputTotalMetricName);
                if (metrics.get(str).get(str2).isEmpty()) {
                    metrics.get(str).remove(str2);
                    if (metrics.get(str).isEmpty()) {
                        metrics.remove(str);
                    }
                }
            }
        }
    }

    public void close() {
    }

    public Set<String> reconfigurableConfigs() {
        return null;
    }

    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
    }

    public void reconfigure(Map<String, ?> map) {
    }

    public void contextChange(MetricsContext metricsContext) {
    }

    private MetricName getThroughputTotalMetricName(String str, String str2, MetricName metricName) {
        return new MetricName(metricName.name(), THROUGHPUT_METRICS_GROUP, metricName.description() + " by this query", getThroughputTotalMetricTags(str, str2, metricName.tags()));
    }

    private String getQueryId(KafkaMetric kafkaMetric) {
        if (kafkaMetric.metricName().tags().containsKey(MetricsTagUtils.KSQL_QUERY_ID_TAG)) {
            return (String) kafkaMetric.metricName().tags().get(MetricsTagUtils.KSQL_QUERY_ID_TAG);
        }
        Matcher matcher = MetricsTagUtils.SHARED_RUNTIME_THREAD_PATTERN.matcher((String) kafkaMetric.metricName().tags().getOrDefault(MetricsTagUtils.KSQL_TASK_ID_TAG, ""));
        if (matcher.find()) {
            return matcher.group(1);
        }
        Matcher matcher2 = MetricsTagUtils.UNSHARED_RUNTIME_THREAD_PATTERN.matcher((String) kafkaMetric.metricName().tags().getOrDefault("thread-id", ""));
        if (matcher2.find()) {
            return matcher2.group(1);
        }
        LOGGER.error("Can't parse query id from metric {}", kafkaMetric.metricName());
        throw new KsqlException("Missing query ID when reporting total throughput metrics");
    }

    private String getTopic(KafkaMetric kafkaMetric) {
        if (kafkaMetric.metricName().tags().containsKey(MetricsTagUtils.KSQL_TOPIC_TAG)) {
            return (String) kafkaMetric.metricName().tags().get(MetricsTagUtils.KSQL_TOPIC_TAG);
        }
        String str = (String) kafkaMetric.metricName().tags().getOrDefault(MetricsTagUtils.KSQL_TOPIC_TAG, "");
        if (!str.equals("")) {
            return str;
        }
        LOGGER.error("Can't parse topic name from metric {}", kafkaMetric);
        throw new KsqlException("Missing topic name when reporting total throughput metrics");
    }

    private Map<String, String> getThroughputTotalMetricTags(String str, String str2, Map<String, String> map) {
        HashMap hashMap = new HashMap(customTags);
        hashMap.putAll(map);
        hashMap.remove(MetricsTagUtils.KSQL_TASK_ID_TAG);
        hashMap.remove("processor-node-id");
        hashMap.put(MetricsTagUtils.KSQL_CONSUMER_GROUP_MEMBER_ID_TAG, (String) hashMap.remove("thread-id"));
        hashMap.put(MetricsTagUtils.KSQL_QUERY_ID_TAG, str);
        return ImmutableMap.copyOf(hashMap);
    }
}
