package io.debezium.connector.spanner.metrics;

import io.debezium.connector.spanner.metrics.event.LatencyMetricEvent;
import io.debezium.connector.spanner.metrics.event.MetricEvent;
import io.debezium.connector.spanner.metrics.latency.LatencyCalculator;
import io.debezium.connector.spanner.processor.SourceRecordUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/metrics/MetricsEventPublisher.class */
/**
 * Dispatches {@link MetricEvent}s to at most one subscriber per concrete event class and
 * derives {@link LatencyMetricEvent}s from emitted source records.
 *
 * <p>Subscribers are kept in a {@link ConcurrentHashMap} keyed by the event class they were
 * registered for; lookup on publish uses the event's runtime class, so an event is only
 * delivered to a subscriber registered for exactly that class.
 */
public class MetricsEventPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(MetricsEventPublisher.class);

    /**
     * Latency value above which a debug message is logged for a record.
     * NOTE(review): unit is whatever {@link LatencyCalculator} returns — presumably
     * milliseconds (i.e. 5 minutes); confirm against LatencyCalculator.
     */
    private static final long HIGH_LATENCY_THRESHOLD = 300000L;

    private final Map<Class<? extends MetricEvent>, Consumer<? extends MetricEvent>> subscribes = new ConcurrentHashMap<>();

    /**
     * Delivers the event to the subscriber registered for the event's runtime class, if any.
     * Any {@link Exception} thrown by the subscriber is logged at WARN level and not rethrown.
     *
     * @param t the event to publish
     * @param <T> the static type of the event
     */
    public <T extends MetricEvent> void publishMetricEvent(T t) {
        // Safe: subscribe() only ever stores a Consumer<X> under the key Class<X>, and the
        // lookup key is the runtime class of t, so the consumer found accepts t.
        @SuppressWarnings("unchecked")
        Consumer<T> consumer = (Consumer<T>) this.subscribes.get(t.getClass());
        if (consumer == null) {
            return;
        }
        try {
            consumer.accept(t);
        } catch (Exception e) {
            // A failing metrics subscriber is reported but deliberately not propagated.
            LOGGER.warn("Failed to process metric event: {}", t, e);
        }
    }

    /**
     * Registers the single subscriber for an event class.
     *
     * @param cls the event class to subscribe to
     * @param consumer the callback invoked for events of exactly that class
     * @param <T> the event type
     * @throws IllegalStateException if a subscriber is already registered for {@code cls}
     */
    public <T extends MetricEvent> void subscribe(Class<T> cls, Consumer<T> consumer) {
        // putIfAbsent makes the "already registered?" check and the insert one atomic step,
        // so two concurrent registrations for the same class cannot both succeed.
        if (this.subscribes.putIfAbsent(cls, consumer) != null) {
            throw new IllegalStateException("A subscriber is already registered for " + cls.getName());
        }
    }

    /**
     * For data change records, computes all latency figures via {@link LatencyCalculator},
     * logs a debug message for each one exceeding the high-latency threshold, and publishes
     * them as a {@link LatencyMetricEvent}. Records that are not data change records are ignored.
     *
     * @param sourceRecord the record to inspect
     */
    public void logLatency(SourceRecord sourceRecord) {
        if (!SourceRecordUtils.isDataChangeRecord(sourceRecord)) {
            return;
        }

        Long totalLatency = LatencyCalculator.getTotalLatency(sourceRecord);
        logIfHigh("Published very high total latency for source record {}:{}", sourceRecord, totalLatency);

        Long readToEmitLatency = LatencyCalculator.getReadToEmitLatency(sourceRecord);
        logIfHigh("Published very high readToEmit latency for source record {}:{}", sourceRecord, readToEmitLatency);

        Long spannerLatency = LatencyCalculator.getSpannerLatency(sourceRecord);
        logIfHigh("Published very high spannerLatency latency for source record {}:{}", sourceRecord, spannerLatency);

        Long commitToEmitLatency = LatencyCalculator.getCommitToEmitLatency(sourceRecord);
        logIfHigh("Published very high commitToEmitLatency latency for source record {}:{}", sourceRecord, commitToEmitLatency);

        Long commitToPublishLatency = LatencyCalculator.getCommitToPublishLatency(sourceRecord);
        logIfHigh("Published very high commitToPublishLatency latency for source record {}:{}", sourceRecord, commitToPublishLatency);

        Long emitToPublishLatency = LatencyCalculator.getEmitToPublishLatency(sourceRecord);
        logIfHigh("Published very high emitToPublishLatency latency for source record {}:{}", sourceRecord, emitToPublishLatency);

        Long ownConnectorLatency = LatencyCalculator.getOwnConnectorLatency(sourceRecord);
        logIfHigh("Published very high ownConnectorLatency latency for source record {}:{}", sourceRecord, ownConnectorLatency);

        Long lowWatermarkLag = LatencyCalculator.getLowWatermarkLag(sourceRecord);
        logIfHigh("Published very high lowWatermarkLag latency for source record {}:{}", sourceRecord, lowWatermarkLag);

        // Argument order follows the original call: lowWatermarkLag precedes ownConnectorLatency.
        publishMetricEvent(new LatencyMetricEvent(totalLatency, readToEmitLatency, spannerLatency, commitToEmitLatency,
                commitToPublishLatency, emitToPublishLatency, lowWatermarkLag, ownConnectorLatency));
    }

    /** Logs {@code format} at DEBUG with the record and value when the value is non-null and above the threshold. */
    private static void logIfHigh(String format, SourceRecord sourceRecord, Long latency) {
        if (latency != null && latency > HIGH_LATENCY_THRESHOLD) {
            LOGGER.debug(format, sourceRecord, latency);
        }
    }
}
