/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator.metrics;

import io.confluent.connect.replicator.ReplicatorSourceTaskConfig;
import io.confluent.connect.replicator.metrics.ConfluentReplicatorMetrics;
import io.confluent.connect.replicator.metrics.ConfluentReplicatorMetricsRegistry;
import io.confluent.connect.replicator.metrics.FetchEndOffsetService;
import io.confluent.connect.replicator.metrics.MetricLagAvg;
import io.confluent.connect.replicator.util.Utils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registers and records per-topic-partition metrics for one replicator task.
 *
 * <p>For every {@link TopicPartition} in the task's source assignment a
 * {@code ReplicatorMetricGroup} is created that carries four sensors: message lag,
 * latency, record throughput and byte throughput. {@link #setupMetrics()} must be
 * called before records are passed to
 * {@link #recordTaskTopicPartitionMetrics(SourceRecord, RecordMetadata)};
 * {@link #stopMetrics()} closes the groups and shuts the collaborators down.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ConfluentReplicatorTaskMetricsGroup {
    private static final Logger log = LoggerFactory.getLogger(ConfluentReplicatorTaskMetricsGroup.class);

    // Arguments handed to ConfluentReplicatorMetrics.group(...): the group name followed by
    // alternating name/value strings (presumably metric tags - confirm against group()).
    private static final String METRIC_GROUP_NAME = "confluent-replicator-task-metrics";
    private static final String TASK_TAG = "confluent-replicator-task";
    private static final String TOPIC_PARTITION_TAG = "confluent-replicator-task-topic-partition";
    private static final String REPLICATOR_NAME_TAG = "confluent-replicator-name";
    private static final String TOPIC_NAME_TAG = "confluent-replicator-topic-name";

    // Sensor names; shared by registration and recording so the two cannot drift apart.
    private static final String MESSAGE_LAG_SENSOR = "confluent-replicator-task-topic-partition-message-lag";
    private static final String LATENCY_SENSOR = "confluent-replicator-task-topic-partition-latency";
    private static final String THROUGHPUT_SENSOR = "confluent-replicator-task-topic-partition-throughput";
    private static final String BYTE_THROUGHPUT_SENSOR = "confluent-replicator-task-topic-partition-byte-throughput";

    // Keys read from SourceRecord.sourcePartition() / SourceRecord.sourceOffset().
    private static final String SOURCE_TOPIC_KEY = "topic";
    private static final String SOURCE_PARTITION_KEY = "partition";
    private static final String SOURCE_OFFSET_KEY = "offset";

    private static final String END_OFFSET_GROUP_ID = "confluent-replicator-end-offsets-consumer-group";
    private static final String END_OFFSET_CLIENT_ID = "confluent-replicator-end-offsets-consumer-client";

    private final ReplicatorSourceTaskConfig config;
    private final String taskId;
    private final Collection<TopicPartition> sourceAssignment;
    private final ConfluentReplicatorMetrics replicatorMetrics;
    private final Map<TopicPartition, ConfluentReplicatorMetrics.ReplicatorMetricGroup> topicPartitionReplicatorGroupMap;
    private final String sourceClusterId;
    private final String destClusterId;
    private final String replicatorName;
    // The two fields below may be injected through the constructor or created lazily in setupMetrics().
    private Consumer<byte[], byte[]> endOffsetConsumer;
    private FetchEndOffsetService endOffsetService;

    /**
     * Creates a metrics group whose end-offset consumer and end-offset service are
     * built on demand by {@link #setupMetrics()}.
     *
     * @param config            task configuration; supplies consumer settings and the topic rename format
     * @param taskId            identifier of the task, used in metric group arguments and log messages
     * @param sourceAssignment  source topic partitions handled by this task; must not be {@code null}
     * @param replicatorMetrics metrics registry used to create the per-partition groups
     * @param sourceClusterId   value published as the source cluster id metric
     * @param destClusterId     value published as the destination cluster id metric
     * @param replicatorName    replicator name passed to the metric group
     */
    public ConfluentReplicatorTaskMetricsGroup(ReplicatorSourceTaskConfig config, String taskId, Collection<TopicPartition> sourceAssignment, ConfluentReplicatorMetrics replicatorMetrics, String sourceClusterId, String destClusterId, String replicatorName) {
        this(config, taskId, sourceAssignment, replicatorMetrics, sourceClusterId, destClusterId, null, null, replicatorName);
    }

    /**
     * Creates a metrics group with optionally pre-built collaborators.
     *
     * @param config            task configuration; supplies consumer settings and the topic rename format
     * @param taskId            identifier of the task, used in metric group arguments and log messages
     * @param sourceAssignment  source topic partitions handled by this task; must not be {@code null}
     * @param replicatorMetrics metrics registry used to create the per-partition groups
     * @param sourceClusterId   value published as the source cluster id metric
     * @param destClusterId     value published as the destination cluster id metric
     * @param consumer          consumer handed to the end-offset service, or {@code null} to build one when needed
     * @param endOffsetService  service used to look up end offsets, or {@code null} to create one in {@link #setupMetrics()}
     * @param replicatorName    replicator name passed to the metric group
     */
    public ConfluentReplicatorTaskMetricsGroup(ReplicatorSourceTaskConfig config, String taskId, Collection<TopicPartition> sourceAssignment, ConfluentReplicatorMetrics replicatorMetrics, String sourceClusterId, String destClusterId, Consumer<byte[], byte[]> consumer, FetchEndOffsetService endOffsetService, String replicatorName) {
        this.config = config;
        this.taskId = taskId;
        this.sourceAssignment = sourceAssignment;
        this.replicatorMetrics = replicatorMetrics;
        this.sourceClusterId = sourceClusterId;
        this.destClusterId = destClusterId;
        this.topicPartitionReplicatorGroupMap = new HashMap<>(sourceAssignment.size());
        this.endOffsetConsumer = consumer;
        this.endOffsetService = endOffsetService;
        this.replicatorName = replicatorName;
    }

    /**
     * Registers one metric group per assigned topic partition and makes sure an
     * end-offset service is available.
     *
     * <p>A consumer is only built when the service itself has to be created; if a
     * service was injected, no consumer is opened, because nothing in this class
     * would use or close it.
     */
    public void setupMetrics() {
        log.debug("Setting up recording groups for each TopicPartition for this task...");
        setupMetricGroups();
        if (endOffsetService != null) {
            return;
        }
        if (endOffsetConsumer == null) {
            log.debug("Building endOffsetConsumer...");
            endOffsetConsumer = buildEndOffsetConsumer(config);
        }
        log.debug("Creating EndOffsetService...");
        // NOTE(review): the meaning of 10000L is defined by FetchEndOffsetService's constructor
        // (presumably an interval or timeout in milliseconds) - confirm there.
        endOffsetService = new FetchEndOffsetService(endOffsetConsumer, sourceAssignment, taskId, 10000L);
    }

    /** Creates the metric group and its four sensors for every assigned topic partition. */
    private void setupMetricGroups() {
        for (TopicPartition tp : sourceAssignment) {
            ConfluentReplicatorMetrics.ReplicatorMetricGroup metricGroup = replicatorMetrics.group(
                    METRIC_GROUP_NAME,
                    TASK_TAG, taskId,
                    TOPIC_PARTITION_TAG, tp.toString(),
                    REPLICATOR_NAME_TAG, replicatorName,
                    TOPIC_NAME_TAG, tp.topic());
            // NOTE(review): close() is called on the group before anything is registered on it -
            // presumably to drop metrics left over from an earlier registration with the same
            // arguments; confirm against ReplicatorMetricGroup.close().
            metricGroup.close();

            metricGroup.addImmutableValueMetric(ConfluentReplicatorMetricsRegistry.sourceClusterIdTemplate, sourceClusterId);
            metricGroup.addImmutableValueMetric(ConfluentReplicatorMetricsRegistry.destClusterIdTemplate, destClusterId);
            metricGroup.addImmutableValueMetric(ConfluentReplicatorMetricsRegistry.destTopic, toDestTopic(tp.topic()));

            Sensor lagSensor = metricGroup.sensor(MESSAGE_LAG_SENSOR);
            MetricName lagMetric = metricGroup.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskMessageLagTemplate);
            lagSensor.add(lagMetric, new MetricLagAvg());

            Sensor latencySensor = metricGroup.sensor(LATENCY_SENSOR);
            MetricName latencyMetric = metricGroup.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskLatencyTemplate);
            latencySensor.add(latencyMetric, new Avg());

            Sensor throughputSensor = metricGroup.sensor(THROUGHPUT_SENSOR);
            MetricName throughputMetric = metricGroup.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskThroughputTemplate);
            throughputSensor.add(throughputMetric, new Rate());

            Sensor byteThroughputSensor = metricGroup.sensor(BYTE_THROUGHPUT_SENSOR);
            MetricName byteThroughputMetric = metricGroup.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskByteThroughputTemplate);
            byteThroughputSensor.add(byteThroughputMetric, new Rate());

            log.debug("Initializing MetricGroup for TopicPartition {}", tp);
            topicPartitionReplicatorGroupMap.put(tp, metricGroup);
        }
    }

    /** Applies the configured topic rename format to a source topic name. */
    private String toDestTopic(String sourceTopic) {
        return Utils.renameTopic(config.getTopicRenameFormat(), sourceTopic);
    }

    /**
     * Builds a byte-array consumer from the source consumer settings, overriding
     * group id, client id, auto-commit and offset-reset.
     */
    private Consumer<byte[], byte[]> buildEndOffsetConsumer(ReplicatorSourceTaskConfig config) {
        Map<String, Object> consumerConfig = new HashMap<>(config.getSourceConsumerConfigs());
        consumerConfig.put("group.id", END_OFFSET_GROUP_ID);
        consumerConfig.put("client.id", END_OFFSET_CLIENT_ID);
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("auto.offset.reset", "none");
        return new KafkaConsumer<>(consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    /**
     * Derives the source topic partition from the record's source-partition map.
     *
     * @return the topic partition, or {@code null} when the map is absent or does not
     *         hold a string {@code "topic"} and a numeric {@code "partition"}
     */
    private TopicPartition constructCurrentTopicPartition(SourceRecord record) {
        Map<String, ?> sourcePartition = record.sourcePartition();
        if (sourcePartition == null) {
            return null;
        }
        Object topic = sourcePartition.get(SOURCE_TOPIC_KEY);
        Object partition = sourcePartition.get(SOURCE_PARTITION_KEY);
        // Accept any Number so a partition stored as Long does not fail with ClassCastException.
        if (topic instanceof String && partition instanceof Number) {
            return new TopicPartition((String) topic, ((Number) partition).intValue());
        }
        return null;
    }

    /**
     * Reads the {@code "offset"} entry from the record's source-offset map.
     *
     * @return the offset, or {@code null} when the map or a numeric entry is missing
     */
    private Long getCurrentOffset(SourceRecord record) {
        Map<String, ?> sourceOffset = record.sourceOffset();
        if (sourceOffset == null) {
            return null;
        }
        Object offset = sourceOffset.get(SOURCE_OFFSET_KEY);
        return offset instanceof Number ? Long.valueOf(((Number) offset).longValue()) : null;
    }

    /** Returns how far {@code currentOffset} is behind the partition's end offset, never negative. */
    private long calculateLag(long currentOffset, TopicPartition tp) {
        log.trace("Calculating lag for current record...");
        long endOffset = endOffsetService.getEndOffset(tp);
        return endOffset > currentOffset ? endOffset - currentOffset : 0L;
    }

    /** Returns the wall-clock milliseconds elapsed since {@code timestamp}, never negative. */
    private long calculateLatency(long timestamp) {
        log.trace("Calculating latency for current record...");
        long now = System.currentTimeMillis();
        return now > timestamp ? now - timestamp : 0L;
    }

    /** Sums the serialized key and value sizes, treating negative sizes as zero. */
    private double calculateByteBasedThroughput(RecordMetadata recordMetadata) {
        int keySize = Math.max(recordMetadata.serializedKeySize(), 0);
        int valueSize = Math.max(recordMetadata.serializedValueSize(), 0);
        // Add in double precision: the int sum of two large sizes could overflow.
        return (double) keySize + valueSize;
    }

    /**
     * Records the metrics for one replicated record.
     *
     * <p>A value of {@code 1.0} is always recorded on the throughput sensor. The other
     * sensors are only updated when their input is available: byte throughput needs
     * {@code recordMetadata}, lag needs a numeric {@code "offset"} in the record's
     * source offset, and latency needs a record timestamp. If the record's topic
     * partition cannot be determined, or no metric group is registered for it, a
     * warning is logged and nothing is recorded.
     *
     * @param record         the source record that was replicated
     * @param recordMetadata producer metadata for the record; may be {@code null}
     */
    public void recordTaskTopicPartitionMetrics(SourceRecord record, RecordMetadata recordMetadata) {
        TopicPartition tp = constructCurrentTopicPartition(record);
        if (tp == null) {
            log.warn("Couldn't record metrics for task {}. Can't find TopicPartition associated with the SourceRecord", taskId);
            return;
        }
        ConfluentReplicatorMetrics.ReplicatorMetricGroup group = topicPartitionReplicatorGroupMap.get(tp);
        if (group == null) {
            log.warn("Couldn't record metrics for topic partition {} for task {}. Can't find MetricGroup associated with this TopicPartition.", tp, taskId);
            return;
        }
        log.trace("Recording metrics for task {} with TopicPartition {}...", taskId, tp);

        log.trace("Recording throughput metric for task {} with TopicPartition {}", taskId, tp);
        group.recordMetrics(group.sensorPrefix + THROUGHPUT_SENSOR, 1.0);

        if (recordMetadata != null) {
            double byteBasedThroughput = calculateByteBasedThroughput(recordMetadata);
            log.trace("Recording byte based throughput of {} for task {} with TopicPartition {}", byteBasedThroughput, taskId, tp);
            group.recordMetrics(group.sensorPrefix + BYTE_THROUGHPUT_SENSOR, byteBasedThroughput);
        }

        Long currentOffset = getCurrentOffset(record);
        if (currentOffset != null) {
            long lag = calculateLag(currentOffset, tp);
            log.trace("Recording lag of {} messages for task {} with TopicPartition {}", lag, taskId, tp);
            group.recordMetrics(group.sensorPrefix + MESSAGE_LAG_SENSOR, lag);
        } else if (record.sourceOffset() != null) {
            // The offset map exists but carries no usable "offset"; skip lag instead of failing the caller.
            log.debug("Skipping lag metric for task {} with TopicPartition {}: source offset has no numeric 'offset' entry", taskId, tp);
        }

        Long timestamp = record.timestamp();
        if (timestamp != null) {
            long latency = calculateLatency(timestamp);
            log.trace("Recording latency of {} ms for task {} with TopicPartition {}", latency, taskId, tp);
            group.recordMetrics(group.sensorPrefix + LATENCY_SENSOR, latency);
        }
        log.trace("Successfully recorded metrics for task {} with TopicPartition {}", taskId, tp);
    }

    /**
     * Returns the live, mutable map of registered metric groups keyed by topic partition.
     *
     * @return the internal map; it is not copied
     */
    public Map<TopicPartition, ConfluentReplicatorMetrics.ReplicatorMetricGroup> getTopicPartitionReplicatorGroupMap() {
        return this.topicPartitionReplicatorGroupMap;
    }

    /**
     * Closes every metric group, then stops the metrics registry and shuts down the
     * end-offset service.
     *
     * <p>Each step runs even if an earlier one throws; the exception still propagates.
     * NOTE(review): the end-offset consumer is not closed here - presumably
     * {@code FetchEndOffsetService.shutdown()} owns it; confirm.
     */
    public void stopMetrics() {
        log.debug("Closing each MetricGroup for this task...");
        try {
            closeMetricGroups();
        } finally {
            try {
                if (replicatorMetrics != null) {
                    log.debug("Shutting down replicatorMetrics for task {}", taskId);
                    replicatorMetrics.stop();
                }
            } finally {
                if (endOffsetService != null) {
                    log.debug("Shutting down endOffsetService for task {}", taskId);
                    endOffsetService.shutdown();
                }
            }
        }
    }

    /** Closes every registered metric group; the map itself is left populated. */
    private void closeMetricGroups() {
        for (ConfluentReplicatorMetrics.ReplicatorMetricGroup group : topicPartitionReplicatorGroupMap.values()) {
            log.debug("Closing MetricGroup {}", group.groupId);
            group.close();
        }
    }
}

