package org.apache.flink.runtime.checkpoint;

import java.io.StringWriter;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.JobInitializationMetrics;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.class */
public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultCheckpointStatsTracker.class);
    private static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper();
    private final ReentrantLock statsReadWriteLock;
    private final CheckpointStatsCounts counts;
    private final CompletedCheckpointStatsSummary summary;
    private final CheckpointStatsHistory history;
    private final JobManagerJobMetricGroup metricGroup;
    private Optional<JobInitializationMetricsBuilder> jobInitializationMetricsBuilder;

    @Nullable
    private final CheckpointStatsListener checkpointStatsListener;
    private volatile CheckpointStatsSnapshot latestSnapshot;
    private volatile boolean dirty;

    @Nullable
    private volatile CompletedCheckpointStats latestCompletedCheckpoint;

    @VisibleForTesting
    static final String NUMBER_OF_CHECKPOINTS_METRIC = "totalNumberOfCheckpoints";

    @VisibleForTesting
    static final String NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC = "numberOfInProgressCheckpoints";

    @VisibleForTesting
    static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC = "numberOfCompletedCheckpoints";

    @VisibleForTesting
    static final String NUMBER_OF_FAILED_CHECKPOINTS_METRIC = "numberOfFailedCheckpoints";

    @VisibleForTesting
    static final String LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC = "lastCheckpointRestoreTimestamp";

    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC = "lastCheckpointSize";

    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC = "lastCheckpointFullSize";

    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC = "lastCheckpointDuration";

    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_PROCESSED_DATA_METRIC = "lastCheckpointProcessedData";

    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_PERSISTED_DATA_METRIC = "lastCheckpointPersistedData";

    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC = "lastCheckpointExternalPath";

    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_ID_METRIC = "lastCompletedCheckpointId";

    @VisibleForTesting
    static final String LATEST_CHECKPOINT_COMPLETED_TIMESTAMP = "lastCheckpointCompletedTimestamp";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$CheckpointsCounter.class */
    public class CheckpointsCounter implements Gauge<Long> {
        private CheckpointsCounter() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m55getValue() {
            return Long.valueOf(DefaultCheckpointStatsTracker.this.counts.getTotalNumberOfCheckpoints());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$CompletedCheckpointsCounter.class */
    public class CompletedCheckpointsCounter implements Gauge<Long> {
        private CompletedCheckpointsCounter() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m56getValue() {
            return Long.valueOf(DefaultCheckpointStatsTracker.this.counts.getNumberOfCompletedCheckpoints());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$FailedCheckpointsCounter.class */
    public class FailedCheckpointsCounter implements Gauge<Long> {
        private FailedCheckpointsCounter() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m57getValue() {
            return Long.valueOf(DefaultCheckpointStatsTracker.this.counts.getNumberOfFailedCheckpoints());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$InProgressCheckpointsCounter.class */
    public class InProgressCheckpointsCounter implements Gauge<Integer> {
        private InProgressCheckpointsCounter() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Integer m58getValue() {
            return Integer.valueOf(DefaultCheckpointStatsTracker.this.counts.getNumberOfInProgressCheckpoints());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$LatestCheckpointCompletedTimestampGauge.class */
    public class LatestCheckpointCompletedTimestampGauge implements Gauge<Long> {
        private LatestCheckpointCompletedTimestampGauge() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m59getValue() {
            CompletedCheckpointStats completedCheckpointStats = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completedCheckpointStats != null) {
                return Long.valueOf(completedCheckpointStats.getLatestAckTimestamp());
            }
            return -1L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$LatestCompletedCheckpointDurationGauge.class */
    public class LatestCompletedCheckpointDurationGauge implements Gauge<Long> {
        private LatestCompletedCheckpointDurationGauge() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m60getValue() {
            CompletedCheckpointStats completedCheckpointStats = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completedCheckpointStats != null) {
                return Long.valueOf(completedCheckpointStats.getEndToEndDuration());
            }
            return -1L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge.class */
    public class LatestCompletedCheckpointExternalPathGauge implements Gauge<String> {
        private LatestCompletedCheckpointExternalPathGauge() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public String m61getValue() {
            CompletedCheckpointStats completedCheckpointStats = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            return (completedCheckpointStats == null || completedCheckpointStats.getExternalPath() == null) ? "n/a" : completedCheckpointStats.getExternalPath();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$LatestCompletedCheckpointFullSizeGauge.class */
    public class LatestCompletedCheckpointFullSizeGauge implements Gauge<Long> {
        private LatestCompletedCheckpointFullSizeGauge() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m62getValue() {
            CompletedCheckpointStats completedCheckpointStats = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completedCheckpointStats != null) {
                return Long.valueOf(completedCheckpointStats.getStateSize());
            }
            return -1L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$LatestCompletedCheckpointIdGauge.class */
    public class LatestCompletedCheckpointIdGauge implements Gauge<Long> {
        private LatestCompletedCheckpointIdGauge() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m63getValue() {
            CompletedCheckpointStats completedCheckpointStats = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completedCheckpointStats != null) {
                return Long.valueOf(completedCheckpointStats.getCheckpointId());
            }
            return -1L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$LatestCompletedCheckpointPersistedDataGauge.class */
    public class LatestCompletedCheckpointPersistedDataGauge implements Gauge<Long> {
        private LatestCompletedCheckpointPersistedDataGauge() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m64getValue() {
            CompletedCheckpointStats completedCheckpointStats = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completedCheckpointStats != null) {
                return Long.valueOf(completedCheckpointStats.getPersistedData());
            }
            return -1L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$LatestCompletedCheckpointProcessedDataGauge.class */
    public class LatestCompletedCheckpointProcessedDataGauge implements Gauge<Long> {
        private LatestCompletedCheckpointProcessedDataGauge() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m65getValue() {
            CompletedCheckpointStats completedCheckpointStats = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completedCheckpointStats != null) {
                return Long.valueOf(completedCheckpointStats.getProcessedData());
            }
            return -1L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$LatestCompletedCheckpointSizeGauge.class */
    public class LatestCompletedCheckpointSizeGauge implements Gauge<Long> {
        private LatestCompletedCheckpointSizeGauge() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m66getValue() {
            CompletedCheckpointStats completedCheckpointStats = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completedCheckpointStats != null) {
                return Long.valueOf(completedCheckpointStats.getCheckpointedSize());
            }
            return -1L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker$LatestRestoredCheckpointTimestampGauge.class */
    public class LatestRestoredCheckpointTimestampGauge implements Gauge<Long> {
        private LatestRestoredCheckpointTimestampGauge() {
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m67getValue() {
            return (Long) DefaultCheckpointStatsTracker.this.jobInitializationMetricsBuilder.map((v0) -> {
                return v0.getStartTs();
            }).orElse(-1L);
        }
    }

    public DefaultCheckpointStatsTracker(int i, JobManagerJobMetricGroup jobManagerJobMetricGroup) {
        this(i, jobManagerJobMetricGroup, null);
    }

    public DefaultCheckpointStatsTracker(int i, JobManagerJobMetricGroup jobManagerJobMetricGroup, @Nullable CheckpointStatsListener checkpointStatsListener) {
        this.statsReadWriteLock = new ReentrantLock();
        this.counts = new CheckpointStatsCounts();
        this.summary = new CompletedCheckpointStatsSummary();
        this.jobInitializationMetricsBuilder = Optional.empty();
        Preconditions.checkArgument(i >= 0, "Negative number of remembered checkpoints");
        this.history = new CheckpointStatsHistory(i);
        this.metricGroup = jobManagerJobMetricGroup;
        this.checkpointStatsListener = checkpointStatsListener;
        this.latestSnapshot = new CheckpointStatsSnapshot(this.counts.createSnapshot(), this.summary.createSnapshot(), this.history.createSnapshot(), null);
        registerMetrics(jobManagerJobMetricGroup);
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public CheckpointStatsSnapshot createSnapshot() {
        CheckpointStatsSnapshot checkpointStatsSnapshot = this.latestSnapshot;
        if (this.dirty && this.statsReadWriteLock.tryLock()) {
            try {
                checkpointStatsSnapshot = new CheckpointStatsSnapshot(this.counts.createSnapshot(), this.summary.createSnapshot(), this.history.createSnapshot(), (RestoredCheckpointStats) this.jobInitializationMetricsBuilder.flatMap((v0) -> {
                    return v0.buildRestoredCheckpointStats();
                }).orElse(null));
                this.latestSnapshot = checkpointStatsSnapshot;
                this.dirty = false;
            } finally {
                this.statsReadWriteLock.unlock();
            }
        }
        return checkpointStatsSnapshot;
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public PendingCheckpointStats reportPendingCheckpoint(long j, long j2, CheckpointProperties checkpointProperties, Map<JobVertexID, Integer> map) {
        PendingCheckpointStats pendingCheckpointStats = new PendingCheckpointStats(j, j2, checkpointProperties, map);
        this.statsReadWriteLock.lock();
        try {
            this.counts.incrementInProgressCheckpoints();
            this.history.addInProgressCheckpoint(pendingCheckpointStats);
            this.dirty = true;
            this.statsReadWriteLock.unlock();
            return pendingCheckpointStats;
        } catch (Throwable th) {
            this.statsReadWriteLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public void reportRestoredCheckpoint(long j, CheckpointProperties checkpointProperties, String str, long j2) {
        this.statsReadWriteLock.lock();
        try {
            this.counts.incrementRestoredCheckpoints();
            Preconditions.checkState(this.jobInitializationMetricsBuilder.isPresent(), "JobInitializationMetrics should have been set first, before RestoredCheckpointStats");
            this.jobInitializationMetricsBuilder.get().setRestoredCheckpointStats(j, j2, checkpointProperties, str);
            this.dirty = true;
            this.statsReadWriteLock.unlock();
        } catch (Throwable th) {
            this.statsReadWriteLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public void reportCompletedCheckpoint(CompletedCheckpointStats completedCheckpointStats) {
        this.statsReadWriteLock.lock();
        try {
            this.latestCompletedCheckpoint = completedCheckpointStats;
            this.counts.incrementCompletedCheckpoints();
            this.history.replacePendingCheckpointById(completedCheckpointStats);
            this.summary.updateSummary(completedCheckpointStats);
            this.dirty = true;
            logCheckpointStatistics(completedCheckpointStats);
            if (this.checkpointStatsListener != null) {
                this.checkpointStatsListener.onCompletedCheckpoint();
            }
        } finally {
            this.statsReadWriteLock.unlock();
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public void reportFailedCheckpoint(FailedCheckpointStats failedCheckpointStats) {
        this.statsReadWriteLock.lock();
        try {
            this.counts.incrementFailedCheckpoints();
            this.history.replacePendingCheckpointById(failedCheckpointStats);
            this.dirty = true;
            logCheckpointStatistics(failedCheckpointStats);
            if (this.checkpointStatsListener != null) {
                this.checkpointStatsListener.onFailedCheckpoint();
            }
        } finally {
            this.statsReadWriteLock.unlock();
        }
    }

    private void logCheckpointStatistics(AbstractCheckpointStats abstractCheckpointStats) {
        try {
            this.metricGroup.addSpan(Span.builder(CheckpointStatsTracker.class, "Checkpoint").setStartTsMillis(abstractCheckpointStats.getTriggerTimestamp()).setEndTsMillis(abstractCheckpointStats.getLatestAckTimestamp()).setAttribute("checkpointId", abstractCheckpointStats.getCheckpointId()).setAttribute("fullSize", abstractCheckpointStats.getStateSize()).setAttribute("checkpointedSize", abstractCheckpointStats.getCheckpointedSize()).setAttribute("checkpointStatus", abstractCheckpointStats.getStatus().name()).setAttribute("isUnaligned", Boolean.toString(abstractCheckpointStats.isUnalignedCheckpoint())).setAttribute("checkpointType", abstractCheckpointStats.getProperties().getCheckpointType().getName()));
            if (LOG.isDebugEnabled()) {
                StringWriter stringWriter = new StringWriter();
                MAPPER.writeValue(stringWriter, CheckpointStatistics.generateCheckpointStatistics(abstractCheckpointStats, true));
                LOG.debug("CheckpointStatistics (for jobID={}, checkpointId={}) dump = {} ", new Object[]{this.metricGroup.jobId(), Long.valueOf(abstractCheckpointStats.checkpointId), stringWriter.toString()});
            }
        } catch (Exception e) {
            LOG.warn("Fail to log CheckpointStatistics", e);
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public void reportFailedCheckpointsWithoutInProgress() {
        this.statsReadWriteLock.lock();
        try {
            this.counts.incrementFailedCheckpointsWithoutInProgress();
            this.dirty = true;
            if (this.checkpointStatsListener != null) {
                this.checkpointStatsListener.onFailedCheckpoint();
            }
        } finally {
            this.statsReadWriteLock.unlock();
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public PendingCheckpointStats getPendingCheckpointStats(long j) {
        this.statsReadWriteLock.lock();
        try {
            AbstractCheckpointStats checkpointById = this.history.getCheckpointById(j);
            return checkpointById instanceof PendingCheckpointStats ? (PendingCheckpointStats) checkpointById : null;
        } finally {
            this.statsReadWriteLock.unlock();
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public void reportIncompleteStats(long j, ExecutionAttemptID executionAttemptID, CheckpointMetrics checkpointMetrics) {
        this.statsReadWriteLock.lock();
        try {
            AbstractCheckpointStats checkpointById = this.history.getCheckpointById(j);
            if (checkpointById instanceof PendingCheckpointStats) {
                ((PendingCheckpointStats) checkpointById).reportSubtaskStats(executionAttemptID.getJobVertexId(), new SubtaskStateStats(executionAttemptID.getSubtaskIndex(), System.currentTimeMillis(), checkpointMetrics.getBytesPersistedOfThisCheckpoint(), checkpointMetrics.getTotalBytesPersisted(), checkpointMetrics.getSyncDurationMillis(), checkpointMetrics.getAsyncDurationMillis(), checkpointMetrics.getBytesProcessedDuringAlignment(), checkpointMetrics.getBytesPersistedDuringAlignment(), checkpointMetrics.getAlignmentDurationNanos() / 1000000, checkpointMetrics.getCheckpointStartDelayNanos() / 1000000, checkpointMetrics.getUnalignedCheckpoint(), false));
                this.dirty = true;
            }
        } finally {
            this.statsReadWriteLock.unlock();
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public void reportInitializationStarted(Set<ExecutionAttemptID> set, long j) {
        this.jobInitializationMetricsBuilder = Optional.of(new JobInitializationMetricsBuilder(set, j));
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
    public void reportInitializationMetrics(ExecutionAttemptID executionAttemptID, SubTaskInitializationMetrics subTaskInitializationMetrics) {
        this.statsReadWriteLock.lock();
        try {
            try {
                if (!this.jobInitializationMetricsBuilder.isPresent()) {
                    LOG.warn("Attempted to report SubTaskInitializationMetrics [{}] without jobInitializationMetricsBuilder present", subTaskInitializationMetrics);
                    this.statsReadWriteLock.unlock();
                    return;
                }
                JobInitializationMetricsBuilder jobInitializationMetricsBuilder = this.jobInitializationMetricsBuilder.get();
                jobInitializationMetricsBuilder.reportInitializationMetrics(executionAttemptID, subTaskInitializationMetrics);
                if (jobInitializationMetricsBuilder.isComplete()) {
                    traceInitializationMetrics(jobInitializationMetricsBuilder.build());
                }
                this.statsReadWriteLock.unlock();
            } catch (Exception e) {
                LOG.warn("Failed to log SubTaskInitializationMetrics [{}]", subTaskInitializationMetrics, e);
                this.statsReadWriteLock.unlock();
            }
        } catch (Throwable th) {
            this.statsReadWriteLock.unlock();
            throw th;
        }
    }

    private void traceInitializationMetrics(JobInitializationMetrics jobInitializationMetrics) {
        SpanBuilder attribute = Span.builder(CheckpointStatsTracker.class, "JobInitialization").setStartTsMillis(jobInitializationMetrics.getStartTs()).setEndTsMillis(jobInitializationMetrics.getEndTs()).setAttribute("initializationStatus", jobInitializationMetrics.getStatus().name());
        Iterator<JobInitializationMetrics.SumMaxDuration> it = jobInitializationMetrics.getDurationMetrics().values().iterator();
        while (it.hasNext()) {
            setDurationSpanAttribute(attribute, it.next());
        }
        if (jobInitializationMetrics.getCheckpointId() != -1) {
            attribute.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
        }
        if (jobInitializationMetrics.getStateSize() != -1) {
            attribute.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
        }
        this.metricGroup.addSpan(attribute);
    }

    private void setDurationSpanAttribute(SpanBuilder spanBuilder, JobInitializationMetrics.SumMaxDuration sumMaxDuration) {
        spanBuilder.setAttribute("max" + sumMaxDuration.getName(), sumMaxDuration.getMax());
        spanBuilder.setAttribute("sum" + sumMaxDuration.getName(), sumMaxDuration.getSum());
    }

    private void registerMetrics(MetricGroup metricGroup) {
        metricGroup.gauge(NUMBER_OF_CHECKPOINTS_METRIC, new CheckpointsCounter());
        metricGroup.gauge(NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC, new InProgressCheckpointsCounter());
        metricGroup.gauge(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC, new CompletedCheckpointsCounter());
        metricGroup.gauge(NUMBER_OF_FAILED_CHECKPOINTS_METRIC, new FailedCheckpointsCounter());
        metricGroup.gauge(LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC, new LatestRestoredCheckpointTimestampGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC, new LatestCompletedCheckpointSizeGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC, new LatestCompletedCheckpointFullSizeGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC, new LatestCompletedCheckpointDurationGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_PROCESSED_DATA_METRIC, new LatestCompletedCheckpointProcessedDataGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_PERSISTED_DATA_METRIC, new LatestCompletedCheckpointPersistedDataGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC, new LatestCompletedCheckpointExternalPathGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_ID_METRIC, new LatestCompletedCheckpointIdGauge());
        metricGroup.gauge(LATEST_CHECKPOINT_COMPLETED_TIMESTAMP, new LatestCheckpointCompletedTimestampGauge());
    }
}
