package io.confluent.ksql.metrics;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.common.utils.Time;
import io.confluent.ksql.metrics.TopicSensors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;

/* loaded from: input_file:io/confluent/ksql/metrics/StreamsErrorCollector.class */
/**
 * A {@link MetricCollector} that tracks failed-message metrics on a per-topic basis.
 *
 * <p>The first time an error is recorded for a topic, two sensors are registered in the
 * {@link Metrics} registry obtained from the owning {@link MetricCollectors}: a cumulative
 * count ({@link #CONSUMER_FAILED_MESSAGES}) and a rate
 * ({@link #CONSUMER_FAILED_MESSAGES_PER_SEC}). Every recorded error contributes a value of
 * {@code 1.0} to each of them.
 *
 * <p>Instances are obtained through {@link #create(String, MetricCollectors)}, which also
 * registers the collector with the supplied {@link MetricCollectors}.
 */
public final class StreamsErrorCollector implements MetricCollector {
    public static final String CONSUMER_FAILED_MESSAGES = "consumer-failed-messages";
    public static final String CONSUMER_FAILED_MESSAGES_PER_SEC = "consumer-failed-messages-per-sec";

    private final MetricCollectors metricCollectors;
    private final Metrics metrics;
    /** Sensors keyed by topic name; populated lazily by {@link #recordError(String)}. */
    private final Map<String, TopicSensors<Object>> topicSensors = Maps.newConcurrentMap();
    private final Time time;
    /** Identifier returned by {@code MetricCollectors.addCollector}; assigned in {@link #create}. */
    private String id;

    /**
     * Creates a collector and registers it with {@code metricCollectors}.
     *
     * @param str the identifier passed to {@code MetricCollectors.addCollector}; the value
     *     returned by that call becomes this collector's id
     * @param metricCollectors the registry supplying the {@link Metrics} and {@link Time}
     *     instances used by the new collector
     * @return the registered collector
     */
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}, justification = "metrics")
    public static StreamsErrorCollector create(String str, MetricCollectors metricCollectors) {
        final StreamsErrorCollector collector = new StreamsErrorCollector(metricCollectors);
        // The id is only known once the registry has accepted the collector, so it cannot be
        // set inside the constructor.
        collector.id = metricCollectors.addCollector(str, collector);
        return collector;
    }

    @SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}, justification = "metrics")
    private StreamsErrorCollector(MetricCollectors metricCollectors) {
        this.metricCollectors = metricCollectors;
        this.metrics = metricCollectors.getMetrics();
        this.time = metricCollectors.getTime();
    }

    /**
     * Builds the pair of sensors (cumulative count and rate) for a single topic.
     *
     * @param topic the topic the sensors are keyed on
     * @return the sensors wrapped in a {@link TopicSensors} for {@code topic}
     */
    private TopicSensors<Object> buildSensors(String topic) {
        final ArrayList<TopicSensors.SensorMetric<Object>> sensors = new ArrayList<>();
        // Each recorded error counts as exactly one failed message, whatever the payload is.
        sensors.add(buildSensor(topic, CONSUMER_FAILED_MESSAGES, new CumulativeSum(), ignored -> 1.0));
        sensors.add(buildSensor(topic, CONSUMER_FAILED_MESSAGES_PER_SEC, new Rate(), ignored -> 1.0));
        return new TopicSensors<>(topic, sensors);
    }

    /**
     * Registers one sensor with a single metric and wraps it in a {@link TopicSensors.SensorMetric}.
     *
     * @param topic the topic name, used in the sensor name and as the {@code key} tag
     * @param metricNameString the metric name, also embedded in the sensor name
     * @param stat the statistic attached to the sensor
     * @param valueExtractor maps a recorded object to the value fed into the sensor
     * @return a sensor metric whose {@code record} forwards the extracted value to the sensor
     */
    private TopicSensors.SensorMetric<Object> buildSensor(
            String topic,
            String metricNameString,
            MeasurableStat stat,
            final Function<Object, Double> valueExtractor) {
        final String sensorName = "sec-" + topic + "-" + metricNameString + "-" + this.id;
        final MetricName metricName = new MetricName(
                metricNameString,
                ConsumerCollector.CONSUMER_COLLECTOR_METRICS_GROUP_NAME,
                "consumer-" + sensorName,
                ImmutableMap.of("key", topic, "id", this.id));
        final Sensor sensor = this.metrics.sensor(sensorName);
        sensor.add(metricName, stat);
        // NOTE(review): the trailing `true` presumably marks this as an error metric; confirm
        // against the TopicSensors.SensorMetric constructor.
        return new TopicSensors.SensorMetric<Object>(sensor, this.metrics.metrics().get(metricName), this.time, true) {
            @Override // io.confluent.ksql.metrics.TopicSensors.SensorMetric
            public void record(Object obj) {
                sensor.record(valueExtractor.apply(obj));
                super.record(obj);
            }
        };
    }

    /**
     * Unregisters this collector from its {@link MetricCollectors} and closes every topic's
     * sensors against the metrics registry.
     */
    public void cleanup() {
        this.metricCollectors.remove(this.id);
        this.topicSensors.values().forEach(sensors -> sensors.close(this.metrics));
    }

    /**
     * Returns the sum of the values of all error-rate stats across every tracked topic.
     *
     * @return the summed error rate, or {@code 0.0} when no topic has been tracked yet
     */
    @Override // io.confluent.ksql.metrics.MetricCollector
    public double errorRate() {
        return this.topicSensors.values().stream()
                .flatMap(sensors -> sensors.errorRateStats().stream())
                .mapToDouble(TopicSensors.Stat::getValue)
                .sum();
    }

    /**
     * Delegates to {@code MetricUtils.aggregateStat} over all tracked topic sensors.
     *
     * @param str forwarded unchanged as the first argument (presumably the stat name)
     * @param z forwarded unchanged as the second argument (presumably an error-only flag;
     *     verify against {@code MetricUtils})
     * @return the value computed by {@code MetricUtils.aggregateStat}
     */
    @Override // io.confluent.ksql.metrics.MetricCollector
    public double aggregateStat(String str, boolean z) {
        return MetricUtils.aggregateStat(str, z, this.topicSensors.values());
    }

    /**
     * Delegates to {@code MetricUtils.stats} over all tracked topic sensors.
     *
     * @param str forwarded unchanged as the first argument (presumably a topic name; verify
     *     against {@code MetricUtils})
     * @param z forwarded unchanged as the second argument (presumably an error-only flag;
     *     verify against {@code MetricUtils})
     * @return the stats produced by {@code MetricUtils.stats}
     */
    @Override // io.confluent.ksql.metrics.MetricCollector
    public Collection<TopicSensors.Stat> stats(String str, boolean z) {
        return MetricUtils.stats(str, z, this.topicSensors.values());
    }

    /**
     * Records one error against the given topic, creating the topic's sensors on first use.
     *
     * @param str the topic the error is attributed to
     */
    public void recordError(String str) {
        // No payload accompanies an error, hence the null record.
        // NOTE(review): the `true` flag presumably restricts the increment to error metrics;
        // confirm against TopicSensors.increment.
        this.topicSensors.computeIfAbsent(str, this::buildSensors).increment(null, true);
    }
}
