package io.confluent.ksql.utilization;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.internal.MetricsReporter;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.streams.KafkaStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/utilization/PersistentQuerySaturationMetrics.class */
public class PersistentQuerySaturationMetrics implements Runnable {
    // Logger shared by this class and its nested helper classes.
    private static final Logger LOGGER = LoggerFactory.getLogger(PersistentQuerySaturationMetrics.class);
    // Metric name of the data point reported once per query id of a Kafka Streams instance.
    private static final String QUERY_SATURATION = "node-query-saturation";
    // Metric name of the single node-level data point reported at the end of each run().
    private static final String NODE_QUERY_SATURATION = "max-node-query-saturation";
    // Metric name of the data point reported once per stream thread.
    private static final String QUERY_THREAD_SATURATION = "query-thread-saturation";
    // Name of the Kafka Streams thread metric read as the cumulative blocked time (cast to Double).
    private static final String STREAMS_TOTAL_BLOCKED_TIME = "blocked-time-ns-total";
    // Name of the Kafka Streams thread metric read as the thread start time (cast to Long, epoch millis).
    private static final String STREAMS_THREAD_START_TIME = "thread-start-time";
    // Only metrics whose group equals this value are considered when sampling a Kafka Streams instance.
    private static final String STREAMS_THREAD_METRICS_GROUP = "stream-thread-metrics";
    // Tag key used both to group Kafka Streams metrics by thread and to tag per-thread data points.
    private static final String THREAD_ID = "thread-id";
    // Tag key for the query id on per-query data points ("query-id").
    private static final String QUERY_ID = "query-id";
    // Tags attached to every reported data point; getTags() copies them and adds one extra entry.
    private Map<String, String> customTags;
    // Per-instance saturation state, keyed by query application id; stale keys are dropped in run().
    private final Map<String, KafkaStreamsSaturation> perKafkaStreamsStats;
    // Source of the persistent queries inspected on every run().
    private final KsqlExecutionContext engine;
    // Supplies the timestamp used for all samples and data points of a single run().
    private final Supplier<Instant> time;
    // Sink for data points and for cleanup notifications of metrics that are no longer tracked.
    private final MetricsReporter reporter;
    // Length of the look-back interval over which saturation is computed.
    private final Duration window;
    // Tolerance around the window start within which the oldest retained sample must fall.
    private final Duration sampleMargin;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/utilization/PersistentQuerySaturationMetrics$BlockedTimeSample.class */
    /**
     * Immutable pairing of a sampling instant with the cumulative blocked time read at that
     * instant. Instances are created by the enclosing class only (private constructor).
     */
    public static final class BlockedTimeSample {
        private final Instant timestamp;
        private final double totalBlockedTime;

        /**
         * @param instant the moment the sample was taken
         * @param d the cumulative blocked time observed at {@code instant}
         */
        private BlockedTimeSample(Instant instant, double d) {
            timestamp = instant;
            totalBlockedTime = d;
        }

        /** Renders both fields in the form {@code BlockedTimeSample{timestamp=..., totalBlockedTime=...}}. */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("BlockedTimeSample{");
            text.append("timestamp=").append(timestamp);
            text.append(", totalBlockedTime=").append(totalBlockedTime);
            return text.append('}').toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/utilization/PersistentQuerySaturationMetrics$KafkaStreamsSaturation.class */
    /**
     * Saturation state for one Kafka Streams instance: the ids of the queries it currently serves
     * and one {@link ThreadSaturation} tracker per stream thread seen in its metrics.
     */
    public final class KafkaStreamsSaturation {
        private final Set<QueryId> queryIds;
        private final Map<String, ThreadSaturation> perThreadSaturation;
        private final Duration window;
        private final Duration sampleMargin;

        /**
         * @param duration the measurement window handed to every per-thread tracker; must not be null
         * @param duration2 the sample margin handed to every per-thread tracker; must not be null
         */
        private KafkaStreamsSaturation(Duration duration, Duration duration2) {
            this.window = Objects.requireNonNull(duration, "window");
            this.sampleMargin = Objects.requireNonNull(duration2, "sampleMargin");
            this.queryIds = new HashSet<>();
            this.perThreadSaturation = new HashMap<>();
        }

        /** Logs and reports a single per-thread data point tagged with the thread id. */
        private void reportThreadSaturation(Instant instant, double d, String str, MetricsReporter metricsReporter) {
            PersistentQuerySaturationMetrics.LOGGER.info("Reporting thread saturation {} for {}", Double.valueOf(d), str);
            final Map<String, String> tags =
                    PersistentQuerySaturationMetrics.this.getTags(PersistentQuerySaturationMetrics.THREAD_ID, str);
            metricsReporter.report(ImmutableList.of(new MetricsReporter.DataPoint(
                    instant, PersistentQuerySaturationMetrics.QUERY_THREAD_SATURATION, Double.valueOf(d), tags)));
        }

        /** Logs and reports the same saturation value once for every tracked query id. */
        private void reportQuerySaturation(Instant instant, double d, MetricsReporter metricsReporter) {
            this.queryIds.forEach(queryId -> {
                PersistentQuerySaturationMetrics.LOGGER.info("Reporting query saturation {} for {}", Double.valueOf(d), queryId);
                final Map<String, String> tags = PersistentQuerySaturationMetrics.this.getTags(
                        PersistentQuerySaturationMetrics.QUERY_ID, queryId.toString());
                metricsReporter.report(ImmutableList.of(new MetricsReporter.DataPoint(
                        instant, PersistentQuerySaturationMetrics.QUERY_SATURATION, Double.valueOf(d), tags)));
            });
        }

        /**
         * Replaces the tracked query ids with {@code list}, first asking the reporter to clean up
         * the per-query metric of every id that is tracked now but absent from {@code list}.
         */
        private void updateQueryIds(List<QueryId> list, MetricsReporter metricsReporter) {
            final Set<QueryId> incoming = new HashSet<>(list);
            for (final QueryId dropped : Sets.difference(this.queryIds, incoming)) {
                metricsReporter.cleanup(
                        PersistentQuerySaturationMetrics.QUERY_SATURATION,
                        ImmutableMap.of(PersistentQuerySaturationMetrics.QUERY_ID, dropped.toString()));
            }
            this.queryIds.clear();
            this.queryIds.addAll(list);
        }

        /**
         * Indexes the instance's thread-level metrics as {@code thread-id -> (metric name -> metric)}.
         * Only metrics in the stream-thread group are kept; on a duplicate metric name within a
         * thread the first metric encountered wins.
         */
        private Map<String, Map<String, Metric>> metricsByThread(KafkaStreams kafkaStreams) {
            return kafkaStreams.metrics().values().stream()
                    .filter(m -> m.metricName().group().equals(PersistentQuerySaturationMetrics.STREAMS_THREAD_METRICS_GROUP))
                    .collect(Collectors.groupingBy(
                            m -> m.metricName().tags().get(PersistentQuerySaturationMetrics.THREAD_ID),
                            Collectors.toMap(m -> m.metricName().name(), m -> m, (first, second) -> first)));
        }

        /**
         * Samples every stream thread of {@code kafkaStreams}, reports per-thread and per-query
         * saturation, and forgets threads that no longer appear in the metrics.
         *
         * @param instant timestamp used for the samples and the reported data points
         * @param list ids of the queries currently served by this instance
         * @param kafkaStreams the instance whose thread metrics are read
         * @param metricsReporter sink for data points and cleanup notifications
         * @return the value selected across threads via {@code compareSaturation}, starting from
         *         {@code 0.0}; empty once any thread yields no measurement
         */
        public Optional<Double> measure(Instant instant, List<QueryId> list, KafkaStreams kafkaStreams, MetricsReporter metricsReporter) {
            updateQueryIds(list, metricsReporter);
            final Map<String, Map<String, Metric>> byThread = metricsByThread(kafkaStreams);
            Optional<Double> saturation = Optional.of(Double.valueOf(0.0d));
            for (final Map.Entry<String, Map<String, Metric>> entry : byThread.entrySet()) {
                final String threadName = entry.getKey();
                final Map<String, Metric> threadMetrics = entry.getValue();
                final boolean hasRequiredMetrics =
                        threadMetrics.containsKey(PersistentQuerySaturationMetrics.STREAMS_TOTAL_BLOCKED_TIME)
                        && threadMetrics.containsKey(PersistentQuerySaturationMetrics.STREAMS_THREAD_START_TIME);
                if (!hasRequiredMetrics) {
                    PersistentQuerySaturationMetrics.LOGGER.info("Missing required metrics for thread: {}", threadName);
                    continue;
                }
                final double totalBlocked =
                        (Double) threadMetrics.get(PersistentQuerySaturationMetrics.STREAMS_TOTAL_BLOCKED_TIME).metricValue();
                final long threadStart =
                        (Long) threadMetrics.get(PersistentQuerySaturationMetrics.STREAMS_THREAD_START_TIME).metricValue();
                // A tracker is created lazily the first time a thread id shows up.
                final ThreadSaturation tracker = this.perThreadSaturation.computeIfAbsent(threadName, name -> {
                    PersistentQuerySaturationMetrics.LOGGER.debug("Adding saturation for new thread: {}", name);
                    return new ThreadSaturation(threadName, threadStart, this.window, this.sampleMargin);
                });
                PersistentQuerySaturationMetrics.LOGGER.debug(
                        "Record thread {} sample {} {}", threadName, Double.valueOf(totalBlocked), Long.valueOf(threadStart));
                final Optional<Double> threadSaturation =
                        tracker.measure(instant, new BlockedTimeSample(instant, totalBlocked));
                PersistentQuerySaturationMetrics.LOGGER.debug(
                        "Measured value for thread {}: {}", threadName, threadSaturation.map(Object::toString).orElse(""));
                threadSaturation.ifPresent(
                        value -> reportThreadSaturation(instant, value.doubleValue(), threadName, metricsReporter));
                // Keep the running value only when it compares strictly greater than this thread's.
                if (PersistentQuerySaturationMetrics.compareSaturation(saturation, threadSaturation) <= 0) {
                    saturation = threadSaturation;
                }
            }
            saturation.ifPresent(value -> reportQuerySaturation(instant, value.doubleValue(), metricsReporter));
            // Iterate over a copy of the key set so trackers can be removed while looping.
            final Set<String> trackedThreads = new HashSet<>(this.perThreadSaturation.keySet());
            for (final String staleThread : Sets.difference(trackedThreads, byThread.keySet())) {
                this.perThreadSaturation.remove(staleThread);
                metricsReporter.cleanup(
                        PersistentQuerySaturationMetrics.QUERY_THREAD_SATURATION,
                        ImmutableMap.of(PersistentQuerySaturationMetrics.THREAD_ID, staleThread));
            }
            return saturation;
        }

        /** Asks the reporter to clean up the metric of every tracked thread and every tracked query id. */
        public void cleanup(MetricsReporter metricsReporter) {
            for (final String threadName : this.perThreadSaturation.keySet()) {
                metricsReporter.cleanup(
                        PersistentQuerySaturationMetrics.QUERY_THREAD_SATURATION,
                        ImmutableMap.of(PersistentQuerySaturationMetrics.THREAD_ID, threadName));
            }
            for (final QueryId queryId : this.queryIds) {
                metricsReporter.cleanup(
                        PersistentQuerySaturationMetrics.QUERY_SATURATION,
                        ImmutableMap.of(PersistentQuerySaturationMetrics.QUERY_ID, queryId.toString()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/utilization/PersistentQuerySaturationMetrics$ThreadSaturation.class */
    /**
     * Sliding-window saturation tracker for a single stream thread. It retains the blocked-time
     * samples that are no older than {@code window + sampleMargin} and derives, from the oldest
     * retained sample and the newest one, the fraction of the observed interval that the thread
     * was not blocked.
     */
    public static final class ThreadSaturation {
        private final String threadName;
        private final List<BlockedTimeSample> samples;
        private final Instant startTime;
        private final Duration window;
        private final Duration sampleMargin;

        /**
         * @param str the thread name, used in log messages; must not be null
         * @param j the thread start time in epoch milliseconds
         * @param duration the measurement window; must not be null
         * @param duration2 the tolerance around the window start; must not be null
         */
        private ThreadSaturation(String str, long j, Duration duration, Duration duration2) {
            this.samples = new LinkedList<>();
            this.threadName = Objects.requireNonNull(str, "threadName");
            this.startTime = Instant.ofEpochMilli(j);
            this.window = Objects.requireNonNull(duration, "window");
            this.sampleMargin = Objects.requireNonNull(duration2, "sampleMargin");
        }

        /** Returns true when {@code instant} lies strictly between {@code instant2} and {@code instant3}. */
        private boolean inRange(Instant instant, Instant instant2, Instant instant3) {
            return instant.isAfter(instant2) && instant.isBefore(instant3);
        }

        /**
         * Records {@code blockedTimeSample} and computes the saturation over the current window.
         *
         * @param instant the current time; the window is {@code [instant - window, instant]}
         * @param blockedTimeSample the newest sample
         * @return {@code (observed - blocked) / observed}, where blocked is capped at the observed
         *         interval; empty when no measurement can be made: the oldest retained sample is
         *         not within the margin around the window start and the thread did not start
         *         inside the window, no sample is retained, or the observed interval has zero
         *         length
         */
        public Optional<Double> measure(Instant instant, BlockedTimeSample blockedTimeSample) {
            final Instant windowStart = instant.minus(this.window);
            final Instant earliest = instant.minus(this.window.plus(this.sampleMargin));
            final Instant latest = instant.minus(this.window.minus(this.sampleMargin));
            PersistentQuerySaturationMetrics.LOGGER.debug(
                    "{}: record and measure with now {},  window {} ({} : {})",
                    new Object[]{this.threadName, instant, windowStart, earliest, latest});
            this.samples.add(blockedTimeSample);
            this.samples.removeIf(sample -> sample.timestamp.isBefore(earliest));
            if (this.samples.isEmpty()) {
                // The new sample itself was older than the retention cutoff, so there is nothing
                // to measure against; previously this surfaced as an IndexOutOfBoundsException.
                PersistentQuerySaturationMetrics.LOGGER.debug("{}: no samples retained", this.threadName);
                return Optional.empty();
            }
            final BlockedTimeSample startSample = this.samples.get(0);
            final boolean startedInsideWindow = this.startTime.isAfter(windowStart);
            if (!startedInsideWindow && !inRange(startSample.timestamp, earliest, latest)) {
                return Optional.empty();
            }
            PersistentQuerySaturationMetrics.LOGGER.debug("{}: start sample {}", this.threadName, startSample);
            // A counter that appears to go backwards is treated as no blocked time.
            double blocked = Math.max(blockedTimeSample.totalBlockedTime - startSample.totalBlockedTime, 0.0d);
            Instant observedStart = startSample.timestamp;
            if (startedInsideWindow) {
                PersistentQuerySaturationMetrics.LOGGER.debug(
                        "{}: start time {} is after window start", this.threadName, this.startTime);
                // The part of the window before the thread existed is counted as blocked time.
                blocked += Duration.between(windowStart, this.startTime).toNanos();
                observedStart = windowStart;
            }
            final long observedNanos = Duration.between(observedStart, blockedTimeSample.timestamp).toNanos();
            if (observedNanos == 0L) {
                // Dividing by a zero-length interval would produce NaN; report "no measurement".
                PersistentQuerySaturationMetrics.LOGGER.debug("{}: observed interval is empty", this.threadName);
                return Optional.empty();
            }
            final double observed = observedNanos;
            return Optional.of(Double.valueOf((observed - Math.min(blocked, observed)) / observed));
        }
    }

    /**
     * Creates a collector that timestamps every run with {@link Instant#now()}.
     *
     * @param ksqlEngine the engine whose persistent queries are inspected
     * @param metricsReporter sink for data points and cleanup notifications
     * @param duration the measurement window
     * @param duration2 the sample margin around the window start
     * @param map tags attached to every reported data point
     */
    public PersistentQuerySaturationMetrics(KsqlEngine ksqlEngine, MetricsReporter metricsReporter, Duration duration, Duration duration2, Map<String, String> map) {
        this(() -> Instant.now(), ksqlEngine, metricsReporter, duration, duration2, map);
    }

    /**
     * Creates a collector with an injectable time source. Every argument must be non-null; a
     * {@link NullPointerException} naming the offending argument is thrown otherwise.
     *
     * @param supplier supplies the timestamp used for each run
     * @param ksqlExecutionContext the context whose persistent queries are inspected
     * @param metricsReporter sink for data points and cleanup notifications
     * @param duration the measurement window
     * @param duration2 the sample margin around the window start
     * @param map tags attached to every reported data point (stored by reference, not copied)
     */
    @VisibleForTesting
    PersistentQuerySaturationMetrics(Supplier<Instant> supplier, KsqlExecutionContext ksqlExecutionContext, MetricsReporter metricsReporter, Duration duration, Duration duration2, Map<String, String> map) {
        this.perKafkaStreamsStats = new HashMap<>();
        // Arguments are validated in declaration order, so the first null one is the one reported.
        this.time = Objects.requireNonNull(supplier, "time");
        this.engine = Objects.requireNonNull(ksqlExecutionContext, "engine");
        this.reporter = Objects.requireNonNull(metricsReporter, "reporter");
        this.window = Objects.requireNonNull(duration, "window");
        this.sampleMargin = Objects.requireNonNull(duration2, "sampleMargin");
        this.customTags = Objects.requireNonNull(map, "customTags");
    }

    /**
     * Performs one collection pass: groups the engine's persistent queries by query application
     * id, measures each group, reports the node-level value selected via
     * {@code compareSaturation} (or {@code 0.0} when there are no queries), and finally cleans up
     * and drops the state of application ids that no longer have a query.
     *
     * <p>Any {@link RuntimeException} is logged and rethrown.
     * NOTE(review): if this runs on a scheduled executor, the rethrow may stop later runs; confirm.
     */
    @Override
    public void run() {
        final Instant now = this.time.get();
        try {
            final List<PersistentQueryMetadata> queries = this.engine.getPersistentQueries();
            final Map<String, List<PersistentQueryMetadata>> byApplicationId = queries.stream()
                    .collect(Collectors.groupingBy(PersistentQueryMetadata::getQueryApplicationId));
            final Optional<Double> nodeSaturation = byApplicationId.entrySet().stream()
                    .map(group -> measure(now, group.getKey(), group.getValue()))
                    .max(PersistentQuerySaturationMetrics::compareSaturation)
                    .orElse(Optional.of(Double.valueOf(0.0d)));
            nodeSaturation.ifPresent(value -> report(now, value.doubleValue()));

            final Set<String> activeIds = queries.stream()
                    .map(PersistentQueryMetadata::getQueryApplicationId)
                    .collect(Collectors.toSet());
            // Work on a copy of the key set so entries can be removed while iterating.
            final Set<String> trackedIds = new HashSet<>(this.perKafkaStreamsStats.keySet());
            for (final String staleId : Sets.difference(trackedIds, activeIds)) {
                this.perKafkaStreamsStats.get(staleId).cleanup(this.reporter);
                this.perKafkaStreamsStats.remove(staleId);
            }
        } catch (RuntimeException e) {
            LOGGER.error("Error collecting saturation", e);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Orders two saturation measurements. A missing value ranks above every present value, so
     * taking the maximum with this ordering yields an empty result as soon as any operand is
     * empty. Two missing values compare as equal, which keeps the ordering consistent with the
     * {@link java.util.Comparator} contract when this method is used as a comparator.
     *
     * @param optional the left-hand measurement
     * @param optional2 the right-hand measurement
     * @return a positive number when {@code optional} ranks higher, a negative number when
     *         {@code optional2} ranks higher, and {@code 0} when they rank equally
     */
    public static int compareSaturation(Optional<Double> optional, Optional<Double> optional2) {
        if (!optional.isPresent()) {
            // Missing vs missing is a tie; missing vs present ranks the missing side higher.
            return optional2.isPresent() ? 1 : 0;
        }
        if (!optional2.isPresent()) {
            return -1;
        }
        return Double.compare(optional.get().doubleValue(), optional2.get().doubleValue());
    }

    /**
     * Measures the Kafka Streams instance shared by the queries of one application id.
     *
     * @param instant timestamp used for samples and reported data points
     * @param str the query application id; the per-instance state is created on first use
     * @param list the persistent queries that share this application id
     * @return {@code 0.0} when none of the queries exposes a Kafka Streams instance, otherwise
     *         the result of measuring the first non-null instance
     */
    private Optional<Double> measure(Instant instant, String str, List<PersistentQueryMetadata> list) {
        final KafkaStreamsSaturation stats = this.perKafkaStreamsStats.computeIfAbsent(
                str, applicationId -> new KafkaStreamsSaturation(this.window, this.sampleMargin));
        final Optional<KafkaStreams> streams = list.stream()
                .filter(query -> query.getKafkaStreams() != null)
                .map(PersistentQueryMetadata::getKafkaStreams)
                .findFirst();
        if (!streams.isPresent()) {
            return Optional.of(Double.valueOf(0.0d));
        }
        return stats.measure(
                instant,
                list.stream().map(PersistentQueryMetadata::getQueryId).collect(Collectors.toList()),
                streams.get(),
                this.reporter);
    }

    /**
     * Logs and reports the node-level saturation as a single data point carrying the custom tags.
     *
     * @param instant timestamp of the data point
     * @param d the node-level saturation value
     */
    private void report(Instant instant, double d) {
        final Double saturation = Double.valueOf(d);
        LOGGER.info("reporting node-level saturation {}", saturation);
        final MetricsReporter.DataPoint nodePoint =
                new MetricsReporter.DataPoint(instant, NODE_QUERY_SATURATION, saturation, this.customTags);
        this.reporter.report(ImmutableList.of(nodePoint));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the tag set for one data point: a fresh mutable copy of the custom tags plus one
     * extra entry. An existing custom tag with the same key is overwritten in the copy.
     *
     * @param str the key of the extra tag
     * @param str2 the value of the extra tag
     * @return a new map; the custom tags themselves are left untouched
     */
    public Map<String, String> getTags(String str, String str2) {
        final Map<String, String> tags = new HashMap<>(this.customTags);
        tags.put(str, str2);
        return tags;
    }
}
