package org.apache.flink.connector.kafka.source.metrics;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.kafka.MetricUtil;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.class */
public class KafkaSourceReaderMetrics {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReaderMetrics.class);
    public static final String KAFKA_SOURCE_READER_METRIC_GROUP = "KafkaSourceReader";
    public static final String TOPIC_GROUP = "topic";
    public static final String PARTITION_GROUP = "partition";
    public static final String CURRENT_OFFSET_METRIC_GAUGE = "currentOffset";
    public static final String COMMITTED_OFFSET_METRIC_GAUGE = "committedOffset";
    public static final String COMMITS_SUCCEEDED_METRIC_COUNTER = "commitsSucceeded";
    public static final String COMMITS_FAILED_METRIC_COUNTER = "commitsFailed";
    public static final String KAFKA_CONSUMER_METRIC_GROUP = "KafkaConsumer";
    public static final String CONSUMER_FETCH_MANAGER_GROUP = "consumer-fetch-manager-metrics";
    public static final String BYTES_CONSUMED_TOTAL = "bytes-consumed-total";
    public static final String RECORDS_LAG = "records-lag";
    public static final long INITIAL_OFFSET = -1;
    private final SourceReaderMetricGroup sourceReaderMetricGroup;
    private final MetricGroup kafkaSourceReaderMetricGroup;
    private final Counter commitsSucceeded;
    private final Counter commitsFailed;
    private final Map<TopicPartition, Offset> offsets = new HashMap();

    @Nullable
    private ConcurrentMap<TopicPartition, Metric> recordsLagMetrics;

    @Nullable
    private Metric bytesConsumedTotalMetric;
    private long latestBytesConsumedTotal;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics$Offset.class */
    public static class Offset {
        long currentOffset;
        long committedOffset;

        Offset(long j, long j2) {
            this.currentOffset = j;
            this.committedOffset = j2;
        }
    }

    public KafkaSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
        this.sourceReaderMetricGroup = sourceReaderMetricGroup;
        this.kafkaSourceReaderMetricGroup = sourceReaderMetricGroup.addGroup(KAFKA_SOURCE_READER_METRIC_GROUP);
        this.commitsSucceeded = this.kafkaSourceReaderMetricGroup.counter(COMMITS_SUCCEEDED_METRIC_COUNTER);
        this.commitsFailed = this.kafkaSourceReaderMetricGroup.counter(COMMITS_FAILED_METRIC_COUNTER);
    }

    public void registerKafkaConsumerMetrics(KafkaConsumer<?, ?> kafkaConsumer) {
        Map metrics = kafkaConsumer.metrics();
        if (metrics == null) {
            LOG.warn("Consumer implementation does not support metrics");
        } else {
            MetricGroup addGroup = this.kafkaSourceReaderMetricGroup.addGroup(KAFKA_CONSUMER_METRIC_GROUP);
            metrics.forEach((metricName, metric) -> {
                addGroup.gauge(metricName.name(), () -> {
                    return metric.metricValue();
                });
            });
        }
    }

    public void registerTopicPartition(TopicPartition topicPartition) {
        this.offsets.put(topicPartition, new Offset(-1L, -1L));
        registerOffsetMetricsForTopicPartition(topicPartition);
    }

    public void recordCurrentOffset(TopicPartition topicPartition, long j) {
        checkTopicPartitionTracked(topicPartition);
        this.offsets.get(topicPartition).currentOffset = j;
    }

    public void recordCommittedOffset(TopicPartition topicPartition, long j) {
        checkTopicPartitionTracked(topicPartition);
        this.offsets.get(topicPartition).committedOffset = j;
    }

    public void recordSucceededCommit() {
        this.commitsSucceeded.inc();
    }

    public void recordFailedCommit() {
        this.commitsFailed.inc();
    }

    public void registerNumBytesIn(KafkaConsumer<?, ?> kafkaConsumer) {
        try {
            this.bytesConsumedTotalMetric = MetricUtil.getKafkaMetric(kafkaConsumer.metrics(), entry -> {
                return ((MetricName) entry.getKey()).group().equals(CONSUMER_FETCH_MANAGER_GROUP) && ((MetricName) entry.getKey()).name().equals(BYTES_CONSUMED_TOTAL) && !((MetricName) entry.getKey()).tags().containsKey(TOPIC_GROUP);
            });
        } catch (IllegalStateException e) {
            LOG.warn(String.format("Error when getting Kafka consumer metric \"%s\". I/O metric \"%s\" will not be reported. ", BYTES_CONSUMED_TOTAL, "numBytesIn"), e);
        }
    }

    public void maybeAddRecordsLagMetric(KafkaConsumer<?, ?> kafkaConsumer, TopicPartition topicPartition) {
        if (this.recordsLagMetrics == null) {
            this.recordsLagMetrics = new ConcurrentHashMap();
            this.sourceReaderMetricGroup.setPendingRecordsGauge(() -> {
                long j = 0;
                Iterator<Metric> it = this.recordsLagMetrics.values().iterator();
                while (it.hasNext()) {
                    j += ((Double) it.next().metricValue()).longValue();
                }
                return Long.valueOf(j);
            });
        }
        this.recordsLagMetrics.computeIfAbsent(topicPartition, topicPartition2 -> {
            return getRecordsLagMetric(kafkaConsumer.metrics(), topicPartition);
        });
    }

    public void removeRecordsLagMetric(TopicPartition topicPartition) {
        if (this.recordsLagMetrics != null) {
            this.recordsLagMetrics.remove(topicPartition);
        }
    }

    public void updateNumBytesInCounter() {
        if (this.bytesConsumedTotalMetric != null) {
            long longValue = ((Number) this.bytesConsumedTotalMetric.metricValue()).longValue();
            this.sourceReaderMetricGroup.getIOMetricGroup().getNumBytesInCounter().inc(longValue - this.latestBytesConsumedTotal);
            this.latestBytesConsumedTotal = longValue;
        }
    }

    private void registerOffsetMetricsForTopicPartition(TopicPartition topicPartition) {
        MetricGroup addGroup = this.kafkaSourceReaderMetricGroup.addGroup(TOPIC_GROUP, topicPartition.topic()).addGroup(PARTITION_GROUP, String.valueOf(topicPartition.partition()));
        addGroup.gauge(CURRENT_OFFSET_METRIC_GAUGE, () -> {
            return Long.valueOf(this.offsets.getOrDefault(topicPartition, new Offset(-1L, -1L)).currentOffset);
        });
        addGroup.gauge(COMMITTED_OFFSET_METRIC_GAUGE, () -> {
            return Long.valueOf(this.offsets.getOrDefault(topicPartition, new Offset(-1L, -1L)).committedOffset);
        });
    }

    private void checkTopicPartitionTracked(TopicPartition topicPartition) {
        if (!this.offsets.containsKey(topicPartition)) {
            throw new IllegalArgumentException(String.format("TopicPartition %s is not tracked", topicPartition));
        }
    }

    @Nullable
    private Metric getRecordsLagMetric(Map<MetricName, ? extends Metric> map, TopicPartition topicPartition) {
        try {
            String replace = topicPartition.topic().replace('.', '_');
            String valueOf = String.valueOf(topicPartition.partition());
            return MetricUtil.getKafkaMetric(map, entry -> {
                MetricName metricName = (MetricName) entry.getKey();
                Map tags = metricName.tags();
                return metricName.group().equals(CONSUMER_FETCH_MANAGER_GROUP) && metricName.name().equals(RECORDS_LAG) && tags.containsKey(TOPIC_GROUP) && ((String) tags.get(TOPIC_GROUP)).equals(replace) && tags.containsKey(PARTITION_GROUP) && ((String) tags.get(PARTITION_GROUP)).equals(valueOf);
            });
        } catch (IllegalStateException e) {
            LOG.warn(String.format("Error when getting Kafka consumer metric \"%s\" for partition \"%s\". Metric \"%s\" may not be reported correctly. ", RECORDS_LAG, topicPartition, "pendingRecords"), e);
            return null;
        }
    }
}
