package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;

import com.google.cloud.Timestamp;
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.class */
public class PostProcessingMetricsDoFn extends DoFn<DataChangeRecord, DataChangeRecord> implements Serializable {
    private static final long serialVersionUID = -1515578871387565606L;
    private static final Logger LOG = LoggerFactory.getLogger(PostProcessingMetricsDoFn.class);
    private static final long COMMITTED_TO_EMITTED_THRESHOLD_MS = 100000;
    private static final long STREAM_THRESHOLD_MS = 5000;
    private final ChangeStreamMetrics metrics;

    public PostProcessingMetricsDoFn(ChangeStreamMetrics changeStreamMetrics) {
        this.metrics = changeStreamMetrics;
    }

    @DoFn.ProcessElement
    public void processElement(@DoFn.Element DataChangeRecord dataChangeRecord, DoFn.OutputReceiver<DataChangeRecord> outputReceiver) {
        Instant instant = new Instant(dataChangeRecord.getCommitTimestamp().toSqlTimestamp().getTime());
        this.metrics.incDataRecordCounter();
        measureCommitTimestampToEmittedMillis(dataChangeRecord);
        measureStreamMillis(dataChangeRecord);
        outputReceiver.outputWithTimestamp(dataChangeRecord, instant);
    }

    private void measureCommitTimestampToEmittedMillis(DataChangeRecord dataChangeRecord) {
        Duration duration = new Duration(dataChangeRecord.getCommitTimestamp().toSqlTimestamp().getTime(), Timestamp.now().toSqlTimestamp().getTime());
        long millis = duration.getMillis();
        this.metrics.updateDataRecordCommittedToEmitted(duration);
        if (millis > COMMITTED_TO_EMITTED_THRESHOLD_MS) {
            LOG.debug("Data record took {}ms to be emitted: {}", Long.valueOf(millis), dataChangeRecord.getMetadata());
        }
    }

    private void measureStreamMillis(DataChangeRecord dataChangeRecord) {
        ChangeStreamRecordMetadata metadata = dataChangeRecord.getMetadata();
        long millis = new Duration(metadata.getRecordStreamStartedAt().toSqlTimestamp().getTime(), metadata.getRecordStreamEndedAt().toSqlTimestamp().getTime()).getMillis();
        if (millis > STREAM_THRESHOLD_MS) {
            LOG.debug("Data record took {}ms to be streamed: {}", Long.valueOf(millis), metadata);
        }
    }
}
