package io.confluent.ksql.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.KsqlException;
import java.io.File;
import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/internal/StorageUtilizationMetricsReporter.class */
public class StorageUtilizationMetricsReporter implements org.apache.kafka.common.metrics.MetricsReporter {
    private static final String METRIC_GROUP = "ksqldb_utilization";
    private static final String TASK_STORAGE_USED_BYTES = "task_storage_used_bytes";
    private Map<String, Map<String, TaskStorageMetric>> metricsSeen;
    private Metrics metricRegistry;
    private static final Logger LOGGER = LoggerFactory.getLogger(StorageUtilizationMetricsReporter.class);
    private static Map<String, String> customTags = new HashMap();
    private static final AtomicInteger numberStatefulTasks = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/internal/StorageUtilizationMetricsReporter$TaskStorageMetric.class */
    public static class TaskStorageMetric {
        final MetricName metricName;
        private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap();

        TaskStorageMetric(MetricName metricName) {
            this.metricName = metricName;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void add(KafkaMetric kafkaMetric) {
            this.metrics.put(kafkaMetric.metricName(), kafkaMetric);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void remove(KafkaMetric kafkaMetric) {
            this.metrics.remove(kafkaMetric.metricName());
        }

        public BigInteger getValue() {
            BigInteger bigInteger = BigInteger.ZERO;
            Iterator<KafkaMetric> it = this.metrics.values().iterator();
            while (it.hasNext()) {
                bigInteger = bigInteger.add((BigInteger) it.next().metricValue());
            }
            return bigInteger;
        }
    }

    public void init(List<KafkaMetric> list) {
    }

    public synchronized void configure(Map<String, ?> map) {
        this.metricRegistry = (Metrics) Objects.requireNonNull(map.get("ksql.internal.metrics"));
        this.metricsSeen = new HashMap();
    }

    public static void configureShared(File file, Metrics metrics, Map<String, String> map) {
        customTags = ImmutableMap.copyOf(map);
        LOGGER.info("Adding node level storage usage gauges");
        MetricName metricName = metrics.metricName("node_storage_free_bytes", METRIC_GROUP, customTags);
        MetricName metricName2 = metrics.metricName("node_storage_total_bytes", METRIC_GROUP, customTags);
        MetricName metricName3 = metrics.metricName("node_storage_used_bytes", METRIC_GROUP, customTags);
        MetricName metricName4 = metrics.metricName("storage_utilization", METRIC_GROUP, customTags);
        MetricName metricName5 = metrics.metricName("max_used_task_storage_bytes", METRIC_GROUP, customTags);
        MetricName metricName6 = metrics.metricName("num_stateful_tasks", METRIC_GROUP, customTags);
        metrics.addMetric(metricName, (metricConfig, j) -> {
            return Long.valueOf(file.getFreeSpace());
        });
        metrics.addMetric(metricName2, (metricConfig2, j2) -> {
            return Long.valueOf(file.getTotalSpace());
        });
        metrics.addMetric(metricName3, (metricConfig3, j3) -> {
            return Long.valueOf(file.getTotalSpace() - file.getFreeSpace());
        });
        metrics.addMetric(metricName4, (metricConfig4, j4) -> {
            return Double.valueOf((file.getTotalSpace() - file.getFreeSpace()) / file.getTotalSpace());
        });
        metrics.addMetric(metricName5, (metricConfig5, j5) -> {
            return getMaxTaskUsage(metrics);
        });
        metrics.addMetric(metricName6, (metricConfig6, j6) -> {
            return Integer.valueOf(numberStatefulTasks.get());
        });
    }

    public void metricChange(KafkaMetric kafkaMetric) {
        if (kafkaMetric.metricName().name().equals("total-sst-files-size")) {
            handleNewSstFilesSizeMetric(kafkaMetric, (String) kafkaMetric.metricName().tags().getOrDefault(MetricsTagUtils.KSQL_TASK_ID_TAG, ""), getQueryId(kafkaMetric));
        }
    }

    public void metricRemoval(KafkaMetric kafkaMetric) {
        if (kafkaMetric.metricName().name().equals("total-sst-files-size")) {
            String queryId = getQueryId(kafkaMetric);
            String str = (String) kafkaMetric.metricName().tags().getOrDefault(MetricsTagUtils.KSQL_TASK_ID_TAG, "");
            handleRemovedSstFileSizeMetric(this.metricsSeen.get(queryId).get(str), kafkaMetric, queryId, str);
        }
    }

    public void close() {
    }

    public Set<String> reconfigurableConfigs() {
        return null;
    }

    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
    }

    public void reconfigure(Map<String, ?> map) {
    }

    public void contextChange(MetricsContext metricsContext) {
    }

    private synchronized void handleNewSstFilesSizeMetric(KafkaMetric kafkaMetric, String str, String str2) {
        TaskStorageMetric taskStorageMetric;
        Map<String, String> queryMetricTags = getQueryMetricTags(str2);
        Map<String, String> taskMetricTags = getTaskMetricTags(queryMetricTags, str);
        LOGGER.debug("Updating disk usage metrics");
        if (!this.metricsSeen.containsKey(str2)) {
            this.metricRegistry.addMetric(this.metricRegistry.metricName("query_storage_used_bytes", METRIC_GROUP, queryMetricTags), (metricConfig, j) -> {
                return computeQueryMetric(str2);
            });
            this.metricsSeen.put(str2, new HashMap());
        }
        if (this.metricsSeen.get(str2).containsKey(str)) {
            taskStorageMetric = this.metricsSeen.get(str2).get(str);
        } else {
            numberStatefulTasks.getAndIncrement();
            taskStorageMetric = new TaskStorageMetric(this.metricRegistry.metricName(TASK_STORAGE_USED_BYTES, METRIC_GROUP, taskMetricTags));
            this.metricsSeen.get(str2).put(str, taskStorageMetric);
            this.metricRegistry.addMetric(taskStorageMetric.metricName, (metricConfig2, j2) -> {
                return taskStorageMetric.getValue();
            });
        }
        taskStorageMetric.add(kafkaMetric);
    }

    private synchronized void handleRemovedSstFileSizeMetric(TaskStorageMetric taskStorageMetric, KafkaMetric kafkaMetric, String str, String str2) {
        taskStorageMetric.remove(kafkaMetric);
        numberStatefulTasks.getAndDecrement();
        if (taskStorageMetric.metrics.size() == 0) {
            this.metricRegistry.removeMetric(taskStorageMetric.metricName);
            this.metricsSeen.get(str).remove(str2);
            if (this.metricsSeen.get(str).size() == 0) {
                this.metricRegistry.removeMetric(this.metricRegistry.metricName("query_storage_used_bytes", METRIC_GROUP, getQueryMetricTags(str)));
            }
        }
    }

    private BigInteger computeQueryMetric(String str) {
        BigInteger bigInteger = BigInteger.ZERO;
        Iterator<Supplier<BigInteger>> it = getGaugesForQuery(str).iterator();
        while (it.hasNext()) {
            bigInteger = bigInteger.add(it.next().get());
        }
        return bigInteger;
    }

    public static synchronized BigInteger getMaxTaskUsage(Metrics metrics) {
        return (BigInteger) ((Map) metrics.metrics().entrySet().stream().filter(entry -> {
            return ((MetricName) entry.getKey()).name().contains(TASK_STORAGE_USED_BYTES);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).values().stream().map(kafkaMetric -> {
            return (BigInteger) kafkaMetric.metricValue();
        }).reduce((v0, v1) -> {
            return v0.max(v1);
        }).orElse(BigInteger.ZERO);
    }

    private synchronized Collection<Supplier<BigInteger>> getGaugesForQuery(String str) {
        return (Collection) this.metricsSeen.get(str).values().stream().map(taskStorageMetric -> {
            taskStorageMetric.getClass();
            return taskStorageMetric::getValue;
        }).collect(Collectors.toList());
    }

    private String getQueryId(KafkaMetric kafkaMetric) {
        Matcher matcher = MetricsTagUtils.SHARED_RUNTIME_THREAD_PATTERN.matcher((String) kafkaMetric.metricName().tags().getOrDefault(MetricsTagUtils.KSQL_TASK_ID_TAG, ""));
        if (matcher.find()) {
            return matcher.group(1);
        }
        Matcher matcher2 = MetricsTagUtils.UNSHARED_RUNTIME_THREAD_PATTERN.matcher((String) kafkaMetric.metricName().tags().getOrDefault("thread-id", ""));
        if (matcher2.find()) {
            return matcher2.group(1);
        }
        LOGGER.error("Can't parse query id from metric {}", kafkaMetric);
        throw new KsqlException("Missing query ID when reporting utilization metrics");
    }

    private Map<String, String> getQueryMetricTags(String str) {
        HashMap hashMap = new HashMap(customTags);
        hashMap.put(MetricsTagUtils.KSQL_QUERY_ID_TAG, str);
        return ImmutableMap.copyOf(hashMap);
    }

    private Map<String, String> getTaskMetricTags(Map<String, String> map, String str) {
        HashMap hashMap = new HashMap(map);
        hashMap.put(MetricsTagUtils.KSQL_TASK_ID_TAG, str);
        return ImmutableMap.copyOf(hashMap);
    }

    @VisibleForTesting
    static void setTags(Map<String, String> map) {
        customTags = map;
    }
}
