package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/** Tests for {@link DefaultCheckpointStatsTracker}. */
class DefaultCheckpointStatsTrackerTest {

    /**
     * JUnit 5 extension obtained from {@link TestingUtils#defaultExecutorExtension()}; its executor
     * is handed to the execution graph builders used by the tests in this class.
     */
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /** Explicit no-argument constructor; it performs no initialization. */
    DefaultCheckpointStatsTrackerTest() {
    }

    /**
     * Completes a single checkpoint on a tracker whose first constructor argument is {@code 0}
     * (presumably the history size) and checks that the history iterator is empty while the
     * counts, the summary statistics and the latest completed checkpoint are still reported.
     */
    @Test
    void testTrackerWithoutHistory() throws Exception {
        final JobVertexID vertexId = new JobVertexID();
        final ScheduledExecutorService executor = EXECUTOR_RESOURCE.getExecutor();
        final ExecutionJobVertex vertex =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId, 3, 256)
                        .build(executor)
                        .getJobVertex(vertexId);

        final DefaultCheckpointStatsTracker tracker =
                new DefaultCheckpointStatsTracker(
                        0, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());

        final PendingCheckpointStats pending =
                tracker.reportPendingCheckpoint(
                        0L,
                        1L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        Collections.singletonMap(vertexId, vertex.getParallelism()));

        // Report stats for all three subtasks of the vertex.
        for (int subtaskIndex = 0; subtaskIndex < 3; subtaskIndex++) {
            pending.reportSubtaskStats(vertexId, createSubtaskStats(subtaskIndex));
        }

        final String noExternalPointer = null;
        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(noExternalPointer));

        final CheckpointStatsSnapshot snapshot = tracker.createSnapshot();

        // Nothing is kept in the history ...
        Assertions.assertThat(snapshot.getHistory().getCheckpoints().iterator()).isExhausted();

        // ... but the counts are updated,
        final CheckpointStatsCounts counts = snapshot.getCounts();
        Assertions.assertThat(counts.getNumberOfCompletedCheckpoints()).isOne();
        Assertions.assertThat(counts.getTotalNumberOfCheckpoints()).isOne();

        // ... as are the summary statistics,
        final CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats();
        Assertions.assertThat(summary.getStateSizeStats().getCount()).isOne();
        Assertions.assertThat(summary.getEndToEndDurationStats().getCount()).isOne();

        // ... and the latest completed checkpoint.
        Assertions.assertThat(snapshot.getHistory().getLatestCompletedCheckpoint()).isNotNull();
        Assertions.assertThat(snapshot.getHistory().getLatestCompletedCheckpoint().getCheckpointId())
                .isZero();
    }

    /**
     * Drives a tracker through one completed checkpoint (id 0), one failed checkpoint (id 1), one
     * completed savepoint (id 2), one checkpoint that stays pending (id 3) and one restore, then
     * verifies the counts, the summary statistics, the history iteration order (ids 3, 2, 1, 0)
     * and the "latest" entries exposed by the snapshot.
     */
    @Test
    void testCheckpointTracking() throws Exception {
        final JobVertexID jobVertexID = new JobVertexID();
        final ExecutionJobVertex jobVertex =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID, 3, 256)
                        .build(EXECUTOR_RESOURCE.getExecutor())
                        .getJobVertex(jobVertexID);
        final Map<JobVertexID, Integer> vertexToDop =
                Collections.singletonMap(jobVertexID, jobVertex.getParallelism());

        final DefaultCheckpointStatsTracker tracker =
                new DefaultCheckpointStatsTracker(
                        10, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());

        final String noExternalPointer = null;
        final Throwable noFailureCause = null;

        // Checkpoint 0: stats reported for all three subtasks, then completed.
        final PendingCheckpointStats completed1 =
                tracker.reportPendingCheckpoint(
                        0L,
                        1L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        vertexToDop);
        completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(0));
        completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
        completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
        tracker.reportCompletedCheckpoint(completed1.toCompletedCheckpointStats(noExternalPointer));

        // Checkpoint 1: failed without any subtask stats.
        final PendingCheckpointStats failed =
                tracker.reportPendingCheckpoint(
                        1L,
                        1L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        vertexToDop);
        tracker.reportFailedCheckpoint(failed.toFailedCheckpoint(12L, noFailureCause));

        // Checkpoint 2: a savepoint, completed.
        final PendingCheckpointStats savepoint =
                tracker.reportPendingCheckpoint(
                        2L,
                        1L,
                        CheckpointProperties.forSavepoint(true, SavepointFormatType.CANONICAL),
                        vertexToDop);
        savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(0));
        savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
        savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
        tracker.reportCompletedCheckpoint(savepoint.toCompletedCheckpointStats(noExternalPointer));

        // Checkpoint 3: left in progress.
        tracker.reportPendingCheckpoint(
                3L,
                1L,
                CheckpointProperties.forCheckpoint(
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                vertexToDop);

        // Restore from checkpoint 81.
        final RestoredCheckpointStats restored =
                new RestoredCheckpointStats(
                        81L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        123L,
                        noExternalPointer,
                        42L);
        tracker.reportInitializationStarted(Collections.emptySet(), 123L);
        reportRestoredCheckpoint(tracker, restored);

        final CheckpointStatsSnapshot snapshot = tracker.createSnapshot();

        // Counts before the additional failure is reported.
        final CheckpointStatsCounts counts = snapshot.getCounts();
        Assertions.assertThat(counts.getTotalNumberOfCheckpoints()).isEqualTo(4L);
        Assertions.assertThat(counts.getNumberOfInProgressCheckpoints()).isOne();
        Assertions.assertThat(counts.getNumberOfCompletedCheckpoints()).isEqualTo(2L);
        Assertions.assertThat(counts.getNumberOfFailedCheckpoints()).isOne();

        // The test expects this call to raise the total and failed counts by one while leaving
        // the in-progress count untouched.
        tracker.reportFailedCheckpointsWithoutInProgress();

        final CheckpointStatsCounts countsAfterFailure = tracker.createSnapshot().getCounts();
        Assertions.assertThat(countsAfterFailure.getTotalNumberOfCheckpoints()).isEqualTo(5L);
        Assertions.assertThat(countsAfterFailure.getNumberOfInProgressCheckpoints()).isOne();
        Assertions.assertThat(countsAfterFailure.getNumberOfCompletedCheckpoints()).isEqualTo(2L);
        Assertions.assertThat(countsAfterFailure.getNumberOfFailedCheckpoints()).isEqualTo(2L);

        // Summary statistics of the first snapshot: two completed entries.
        final CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats();
        Assertions.assertThat(summary.getStateSizeStats().getCount()).isEqualTo(2L);
        Assertions.assertThat(summary.getEndToEndDurationStats().getCount()).isEqualTo(2L);

        // History of the first snapshot, iterated from checkpoint 3 down to checkpoint 0.
        final Iterator<AbstractCheckpointStats> history =
                snapshot.getHistory().getCheckpoints().iterator();

        Assertions.assertThat(history).hasNext();
        final AbstractCheckpointStats inProgressStats = history.next();
        Assertions.assertThat(inProgressStats.getCheckpointId()).isEqualTo(3L);
        Assertions.assertThat(inProgressStats.getStatus().isInProgress()).isTrue();

        Assertions.assertThat(history).hasNext();
        final AbstractCheckpointStats savepointStats = history.next();
        Assertions.assertThat(savepointStats.getCheckpointId()).isEqualTo(2L);
        Assertions.assertThat(savepointStats.getStatus().isCompleted()).isTrue();

        Assertions.assertThat(history).hasNext();
        final AbstractCheckpointStats failedStats = history.next();
        Assertions.assertThat(failedStats.getCheckpointId()).isOne();
        Assertions.assertThat(failedStats.getStatus().isFailed()).isTrue();

        Assertions.assertThat(history).hasNext();
        final AbstractCheckpointStats completedStats = history.next();
        Assertions.assertThat(completedStats.getCheckpointId()).isZero();
        Assertions.assertThat(completedStats.getStatus().isCompleted()).isTrue();

        Assertions.assertThat(history).isExhausted();

        // Latest entries; the savepoint is expected under "latest savepoint", not under
        // "latest completed checkpoint".
        Assertions.assertThat(snapshot.getHistory().getLatestCompletedCheckpoint().getCheckpointId())
                .isEqualTo(completed1.getCheckpointId());
        Assertions.assertThat(snapshot.getHistory().getLatestSavepoint().getCheckpointId())
                .isEqualTo(savepoint.getCheckpointId());
        Assertions.assertThat(snapshot.getHistory().getLatestFailedCheckpoint().getCheckpointId())
                .isEqualTo(failed.getCheckpointId());
        Assertions.assertThat(snapshot.getLatestRestoredCheckpoint()).isEqualTo(restored);
    }

    /**
     * Completing the pending checkpoint must fire the completed callback once and the failed
     * callback never.
     */
    @Test
    void testCheckpointStatsListenerOnCompletedCheckpoint() {
        final BiConsumer<CheckpointStatsTracker, PendingCheckpointStats> completePending =
                (tracker, pending) ->
                        tracker.reportCompletedCheckpoint(
                                pending.toCompletedCheckpointStats("random-external-pointer"));
        testCheckpointStatsListener(completePending, 1, 0);
    }

    /**
     * Failing the pending checkpoint must fire the failed callback once and the completed
     * callback never.
     */
    @Test
    void testCheckpointStatsListenerOnFailedCheckpoint() {
        final BiConsumer<CheckpointStatsTracker, PendingCheckpointStats> failPending =
                (tracker, pending) -> {
                    final Throwable noCause = null;
                    tracker.reportFailedCheckpoint(
                            pending.toFailedCheckpoint(System.currentTimeMillis(), noCause));
                };
        testCheckpointStatsListener(failPending, 0, 1);
    }

    /**
     * Reports one pending checkpoint on a tracker that was created with a counting {@link
     * CheckpointStatsListener}, lets {@code reportAction} finish that checkpoint and asserts how
     * often each listener callback was invoked.
     *
     * @param reportAction completes or fails the given pending checkpoint on the given tracker
     * @param expectedCompletedCalls expected number of {@code onCompletedCheckpoint()} calls
     * @param expectedFailedCalls expected number of {@code onFailedCheckpoint()} calls
     */
    private void testCheckpointStatsListener(
            BiConsumer<CheckpointStatsTracker, PendingCheckpointStats> reportAction,
            int expectedCompletedCalls,
            int expectedFailedCalls) {
        final AtomicInteger completedCalls = new AtomicInteger();
        final AtomicInteger failedCalls = new AtomicInteger();

        final DefaultCheckpointStatsTracker tracker =
                new DefaultCheckpointStatsTracker(
                        10,
                        UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
                        new CheckpointStatsListener() {
                            @Override
                            public void onCompletedCheckpoint() {
                                completedCalls.incrementAndGet();
                            }

                            @Override
                            public void onFailedCheckpoint() {
                                failedCalls.incrementAndGet();
                            }
                        });

        final JobVertexID jobVertexID = new JobVertexID();
        final PendingCheckpointStats pending =
                tracker.reportPendingCheckpoint(
                        0L,
                        1L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        Collections.singletonMap(jobVertexID, 1));
        pending.reportSubtaskStats(jobVertexID, createSubtaskStats(0));

        reportAction.accept(tracker, pending);

        Assertions.assertThat(completedCalls).hasValue(expectedCompletedCalls);
        Assertions.assertThat(failedCalls).hasValue(expectedFailedCalls);
    }

    /**
     * Checks when {@code createSnapshot()} yields a snapshot that differs from the previous one:
     * after a pending checkpoint received subtask stats, after it completed and after a restore
     * was reported. Two consecutive calls without anything reported in between must return equal
     * snapshots.
     */
    @Test
    void testCreateSnapshot() {
        final JobVertexID vertexId = new JobVertexID();
        final DefaultCheckpointStatsTracker tracker =
                new DefaultCheckpointStatsTracker(
                        10, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());

        final CheckpointStatsSnapshot initial = tracker.createSnapshot();

        // A pending checkpoint with reported subtask stats changes the snapshot.
        final PendingCheckpointStats pending =
                tracker.reportPendingCheckpoint(
                        0L,
                        1L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        Collections.singletonMap(vertexId, 1));
        pending.reportSubtaskStats(vertexId, createSubtaskStats(0));

        final CheckpointStatsSnapshot afterPending = tracker.createSnapshot();
        Assertions.assertThat(afterPending).isNotEqualTo(initial);
        // Nothing was reported in between, so the snapshot must be equal.
        Assertions.assertThat(tracker.createSnapshot()).isEqualTo(afterPending);

        // Completing the checkpoint changes the snapshot again.
        final String noExternalPointer = null;
        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(noExternalPointer));

        final CheckpointStatsSnapshot afterCompletion = tracker.createSnapshot();
        Assertions.assertThat(afterCompletion).isNotEqualTo(afterPending);

        // So does a restore.
        tracker.reportInitializationStarted(Collections.emptySet(), 0L);
        final RestoredCheckpointStats restored =
                new RestoredCheckpointStats(
                        12L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        12L,
                        noExternalPointer,
                        42L);
        reportRestoredCheckpoint(tracker, restored);

        final CheckpointStatsSnapshot afterRestore = tracker.createSnapshot();
        Assertions.assertThat(afterRestore).isNotEqualTo(afterCompletion);
        Assertions.assertThat(tracker.createSnapshot()).isEqualTo(afterRestore);
    }

    /**
     * Verifies that completing a checkpoint adds exactly one span to the metric group and that the
     * span carries the {@code checkpointId}, {@code checkpointType} and {@code isUnaligned}
     * attributes. The scenario runs twice, with the boolean passed to {@code createSubtaskStats}
     * set to {@code false} and then {@code true}; {@code isUnaligned} is expected to follow it.
     */
    @Test
    public void testSpanCreation() throws Exception {
        final JobVertexID jobVertexID = new JobVertexID();
        final List<Span> reportedSpans = new ArrayList<>();

        final DefaultCheckpointStatsTracker tracker =
                new DefaultCheckpointStatsTracker(
                        10,
                        new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
                            @Override
                            public void addSpan(SpanBuilder spanBuilder) {
                                reportedSpans.add(spanBuilder.build());
                            }
                        });

        final String noExternalPointer = null;

        // Checkpoint 42, flag false.
        final PendingCheckpointStats firstPending =
                tracker.reportPendingCheckpoint(
                        42L,
                        1L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        Collections.singletonMap(jobVertexID, 1));
        firstPending.reportSubtaskStats(jobVertexID, createSubtaskStats(0, false));
        tracker.reportCompletedCheckpoint(
                firstPending.toCompletedCheckpointStats(noExternalPointer));

        Assertions.assertThat(reportedSpans).hasSize(1);
        final Span firstSpan = Iterables.getOnlyElement(reportedSpans);
        Assertions.assertThat(firstSpan.getAttributes().get("checkpointId")).isEqualTo(42L);
        Assertions.assertThat(firstSpan.getAttributes().get("checkpointType"))
                .isEqualTo("Checkpoint");
        Assertions.assertThat(firstSpan.getAttributes().get("isUnaligned")).isEqualTo("false");

        // Start from a clean slate so that only the second checkpoint's span is inspected.
        reportedSpans.clear();

        // Checkpoint 43, flag true.
        final PendingCheckpointStats secondPending =
                tracker.reportPendingCheckpoint(
                        43L,
                        1L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        Collections.singletonMap(jobVertexID, 1));
        secondPending.reportSubtaskStats(jobVertexID, createSubtaskStats(0, true));
        tracker.reportCompletedCheckpoint(
                secondPending.toCompletedCheckpointStats(noExternalPointer));

        Assertions.assertThat(reportedSpans).hasSize(1);
        final Span secondSpan = Iterables.getOnlyElement(reportedSpans);
        Assertions.assertThat(secondSpan.getAttributes().get("checkpointId")).isEqualTo(43L);
        Assertions.assertThat(secondSpan.getAttributes().get("checkpointType"))
                .isEqualTo("Checkpoint");
        Assertions.assertThat(secondSpan.getAttributes().get("isUnaligned")).isEqualTo("true");
    }

    /**
     * Verifies that an initialization span is added only after every execution attempt announced
     * through {@code reportInitializationStarted} has reported its initialization metrics, and
     * that the span carries the timestamps and attributes of the restored checkpoint. The
     * scenario runs twice on the same tracker, restoring checkpoint 42 and then checkpoint 44.
     */
    @Test
    public void testInitializationSpanCreation() throws Exception {
        final List<Span> reportedSpans = new ArrayList<>();

        final DefaultCheckpointStatsTracker tracker =
                new DefaultCheckpointStatsTracker(
                        10,
                        new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
                            @Override
                            public void addSpan(SpanBuilder spanBuilder) {
                                reportedSpans.add(spanBuilder.build());
                            }
                        });

        final String noExternalPath = null;

        // First restore: two attempts, checkpoint 42.
        final ExecutionAttemptID firstAttempt = ExecutionAttemptID.randomId();
        final ExecutionAttemptID secondAttempt = ExecutionAttemptID.randomId();
        tracker.reportInitializationStarted(
                new HashSet<>(Arrays.asList(firstAttempt, secondAttempt)), 100L);
        reportRestoredCheckpoint(
                tracker,
                new RestoredCheckpointStats(
                        42L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
                        100L,
                        noExternalPath,
                        1024L));

        // Only one of two attempts has reported: no span yet.
        reportCompletedSubtaskInitialization(tracker, firstAttempt);
        Assertions.assertThat(reportedSpans).isEmpty();

        reportCompletedSubtaskInitialization(tracker, secondAttempt);
        Assertions.assertThat(reportedSpans).hasSize(1);
        final Span firstSpan = Iterables.getOnlyElement(reportedSpans);
        Assertions.assertThat(firstSpan.getStartTsMillis()).isEqualTo(100L);
        Assertions.assertThat(firstSpan.getEndTsMillis()).isEqualTo(215L);
        Assertions.assertThat(firstSpan.getAttributes().get("checkpointId")).isEqualTo(42L);
        Assertions.assertThat(firstSpan.getAttributes().get("fullSize")).isEqualTo(1024L);

        reportedSpans.clear();

        // Second restore on the same tracker: two new attempts, checkpoint 44.
        final ExecutionAttemptID thirdAttempt = ExecutionAttemptID.randomId();
        final ExecutionAttemptID fourthAttempt = ExecutionAttemptID.randomId();
        tracker.reportInitializationStarted(
                new HashSet<>(Arrays.asList(thirdAttempt, fourthAttempt)), 100L);
        reportRestoredCheckpoint(
                tracker,
                new RestoredCheckpointStats(
                        44L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
                        100L,
                        noExternalPath,
                        1024L));

        reportCompletedSubtaskInitialization(tracker, thirdAttempt);
        Assertions.assertThat(reportedSpans).isEmpty();

        reportCompletedSubtaskInitialization(tracker, fourthAttempt);
        Assertions.assertThat(reportedSpans).hasSize(1);
        Assertions.assertThat(
                        Iterables.getOnlyElement(reportedSpans).getAttributes().get("checkpointId"))
                .isEqualTo(44L);
    }

    /**
     * Builds initialization metrics with status {@code COMPLETED} (builder created with 110,
     * built with 215, four fixed duration metrics) and reports them to {@code tracker} for
     * {@code attemptId}.
     */
    private void reportCompletedSubtaskInitialization(
            DefaultCheckpointStatsTracker tracker, ExecutionAttemptID attemptId) {
        final SubTaskInitializationMetricsBuilder metricsBuilder =
                new SubTaskInitializationMetricsBuilder(110L)
                        .setStatus(InitializationStatus.COMPLETED);
        metricsBuilder.addDurationMetric("MailboxStartDurationMs", 10L);
        metricsBuilder.addDurationMetric("ReadOutputDataDurationMs", 20L);
        metricsBuilder.addDurationMetric("InitializeStateDurationMs", 30L);
        metricsBuilder.addDurationMetric("GateRestoreDurationMs", 40L);
        tracker.reportInitializationMetrics(attemptId, metricsBuilder.build(215L));
    }

    /**
     * Verifies that constructing a tracker registers exactly the 13 expected gauges on the metric
     * group it is given.
     */
    @Test
    void testMetricsRegistration() {
        final List<String> registeredGaugeNames = new ArrayList<>();

        // Only construction matters here: the gauges are registered by the constructor, so the
        // tracker instance itself is not needed afterwards.
        new DefaultCheckpointStatsTracker(
                0,
                new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
                    @Override
                    public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
                        if (gauge != null) {
                            registeredGaugeNames.add(name);
                        }
                        return gauge;
                    }
                });

        Assertions.assertThat(registeredGaugeNames)
                .containsAll(
                        Arrays.asList(
                                "totalNumberOfCheckpoints",
                                "numberOfInProgressCheckpoints",
                                "numberOfCompletedCheckpoints",
                                "numberOfFailedCheckpoints",
                                "lastCheckpointRestoreTimestamp",
                                "lastCheckpointSize",
                                "lastCheckpointFullSize",
                                "lastCheckpointDuration",
                                "lastCheckpointProcessedData",
                                "lastCheckpointPersistedData",
                                "lastCheckpointExternalPath",
                                "lastCompletedCheckpointId",
                                "lastCheckpointCompletedTimestamp"));
        // Guards against additional, unexpected gauges.
        Assertions.assertThat(registeredGaugeNames).hasSize(13);
    }

    /**
     * Captures every gauge the tracker registers and verifies their values in five stages: right
     * after construction, after a pending checkpoint, after that checkpoint completed, after a
     * second checkpoint failed, after a restore, and after a third checkpoint completed without an
     * external path.
     */
    @Test
    void testMetricsAreUpdated() throws Exception {
        final Map<String, Gauge<?>> registeredGauges = new HashMap<>();
        final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup metricGroup =
                new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
                    @Override
                    public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
                        registeredGauges.put(name, gauge);
                        return gauge;
                    }
                };

        final JobVertexID jobVertexID = new JobVertexID();
        // NOTE(review): the graph built here is never used afterwards; the call is kept so the
        // test executes the same steps as before.
        new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(jobVertexID)
                .build(EXECUTOR_RESOURCE.getExecutor());

        final DefaultCheckpointStatsTracker tracker =
                new DefaultCheckpointStatsTracker(0, metricGroup);

        Assertions.assertThat(registeredGauges).hasSize(13);

        final Gauge<Long> numCheckpoints = gaugeNamed(registeredGauges, "totalNumberOfCheckpoints");
        final Gauge<Integer> numInProgressCheckpoints =
                gaugeNamed(registeredGauges, "numberOfInProgressCheckpoints");
        final Gauge<Long> numCompletedCheckpoints =
                gaugeNamed(registeredGauges, "numberOfCompletedCheckpoints");
        final Gauge<Long> numFailedCheckpoints =
                gaugeNamed(registeredGauges, "numberOfFailedCheckpoints");
        final Gauge<Long> latestRestoreTimestamp =
                gaugeNamed(registeredGauges, "lastCheckpointRestoreTimestamp");
        final Gauge<Long> latestCompletedSize = gaugeNamed(registeredGauges, "lastCheckpointSize");
        final Gauge<Long> latestCompletedFullSize =
                gaugeNamed(registeredGauges, "lastCheckpointFullSize");
        final Gauge<Long> latestCompletedDuration =
                gaugeNamed(registeredGauges, "lastCheckpointDuration");
        final Gauge<Long> latestProcessedData =
                gaugeNamed(registeredGauges, "lastCheckpointProcessedData");
        final Gauge<Long> latestPersistedData =
                gaugeNamed(registeredGauges, "lastCheckpointPersistedData");
        final Gauge<String> latestCompletedExternalPath =
                gaugeNamed(registeredGauges, "lastCheckpointExternalPath");
        final Gauge<Long> latestCompletedId =
                gaugeNamed(registeredGauges, "lastCompletedCheckpointId");
        final Gauge<Long> latestCompletedTimestamp =
                gaugeNamed(registeredGauges, "lastCheckpointCompletedTimestamp");

        // Stage 0: initial values.
        Assertions.assertThat(numCheckpoints.getValue()).isZero();
        Assertions.assertThat(numInProgressCheckpoints.getValue()).isZero();
        Assertions.assertThat(numCompletedCheckpoints.getValue()).isZero();
        Assertions.assertThat(numFailedCheckpoints.getValue()).isZero();
        Assertions.assertThat(latestRestoreTimestamp.getValue()).isEqualTo(-1L);
        Assertions.assertThat(latestCompletedSize.getValue()).isEqualTo(-1L);
        Assertions.assertThat(latestCompletedFullSize.getValue()).isEqualTo(-1L);
        Assertions.assertThat(latestCompletedDuration.getValue()).isEqualTo(-1L);
        Assertions.assertThat(latestProcessedData.getValue()).isEqualTo(-1L);
        Assertions.assertThat(latestPersistedData.getValue()).isEqualTo(-1L);
        Assertions.assertThat(latestCompletedExternalPath.getValue()).isEqualTo("n/a");
        Assertions.assertThat(latestCompletedId.getValue()).isEqualTo(-1L);
        Assertions.assertThat(latestCompletedTimestamp.getValue()).isEqualTo(-1L);

        // Stage 1: checkpoint 0 becomes pending.
        final PendingCheckpointStats pending =
                tracker.reportPendingCheckpoint(
                        0L,
                        0L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        Collections.singletonMap(jobVertexID, 1));

        Assertions.assertThat(numCheckpoints.getValue()).isOne();
        Assertions.assertThat(numInProgressCheckpoints.getValue()).isOne();
        Assertions.assertThat(numCompletedCheckpoints.getValue()).isZero();
        Assertions.assertThat(numFailedCheckpoints.getValue()).isZero();

        // Values fed into the subtask stats; the assertions below expect them to surface through
        // the gauges. With the trigger timestamp of 0 used above, the test expects both the
        // duration gauge and the completed-timestamp gauge to equal ackTimestamp.
        final long ackTimestamp = 11231230L;
        final long checkpointedSize = 123L;
        final long fullCheckpointSize = 12381238L;
        final long processedData = 4242L;
        final long persistedData = 4444L;
        final long ignored = 0L;
        final String externalPath = "myexternalpath";

        final SubtaskStateStats subtaskStats =
                new SubtaskStateStats(
                        0,
                        ackTimestamp,
                        checkpointedSize,
                        fullCheckpointSize,
                        ignored,
                        ignored,
                        processedData,
                        persistedData,
                        ignored,
                        ignored,
                        false,
                        true);
        Assertions.assertThat(pending.reportSubtaskStats(jobVertexID, subtaskStats)).isTrue();

        // Stage 2: checkpoint 0 completes.
        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));

        Assertions.assertThat(numCheckpoints.getValue()).isOne();
        Assertions.assertThat(numInProgressCheckpoints.getValue()).isZero();
        Assertions.assertThat(numCompletedCheckpoints.getValue()).isOne();
        Assertions.assertThat(numFailedCheckpoints.getValue()).isZero();
        Assertions.assertThat(latestRestoreTimestamp.getValue()).isEqualTo(-1L);
        Assertions.assertThat(latestCompletedSize.getValue()).isEqualTo(checkpointedSize);
        Assertions.assertThat(latestCompletedFullSize.getValue()).isEqualTo(fullCheckpointSize);
        Assertions.assertThat(latestProcessedData.getValue()).isEqualTo(processedData);
        Assertions.assertThat(latestPersistedData.getValue()).isEqualTo(persistedData);
        Assertions.assertThat(latestCompletedDuration.getValue()).isEqualTo(ackTimestamp);
        Assertions.assertThat(latestCompletedExternalPath.getValue()).isEqualTo(externalPath);
        Assertions.assertThat(latestCompletedId.getValue()).isZero();
        Assertions.assertThat(latestCompletedTimestamp.getValue()).isEqualTo(ackTimestamp);

        // Stage 3: checkpoint 1 fails.
        final Throwable noFailureCause = null;
        final PendingCheckpointStats nextPending =
                tracker.reportPendingCheckpoint(
                        1L,
                        11L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        Collections.singletonMap(jobVertexID, 1));
        tracker.reportFailedCheckpoint(nextPending.toFailedCheckpoint(1230123L, noFailureCause));

        Assertions.assertThat(numCheckpoints.getValue()).isEqualTo(2L);
        Assertions.assertThat(numInProgressCheckpoints.getValue()).isZero();
        Assertions.assertThat(numCompletedCheckpoints.getValue()).isOne();
        Assertions.assertThat(numFailedCheckpoints.getValue()).isOne();
        // A failure must not change the id of the last completed checkpoint.
        Assertions.assertThat(latestCompletedId.getValue()).isZero();

        // Stage 4: restore.
        final long restoreTimestamp = 183419283L;
        final String noExternalPath = null;
        final RestoredCheckpointStats restored =
                new RestoredCheckpointStats(
                        1L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        restoreTimestamp,
                        noExternalPath,
                        42L);
        tracker.reportInitializationStarted(Collections.emptySet(), restoreTimestamp);
        reportRestoredCheckpoint(tracker, restored);

        Assertions.assertThat(numCheckpoints.getValue()).isEqualTo(2L);
        Assertions.assertThat(numInProgressCheckpoints.getValue()).isZero();
        Assertions.assertThat(numCompletedCheckpoints.getValue()).isOne();
        Assertions.assertThat(numFailedCheckpoints.getValue()).isOne();
        Assertions.assertThat(latestCompletedId.getValue()).isZero();
        Assertions.assertThat(latestRestoreTimestamp.getValue()).isEqualTo(restoreTimestamp);

        // Stage 5: checkpoint 2 completes without an external path.
        final PendingCheckpointStats thirdPending =
                tracker.reportPendingCheckpoint(
                        2L,
                        5000L,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        Collections.singletonMap(jobVertexID, 1));
        thirdPending.reportSubtaskStats(jobVertexID, subtaskStats);
        tracker.reportCompletedCheckpoint(thirdPending.toCompletedCheckpointStats(noExternalPath));

        Assertions.assertThat(latestCompletedId.getValue()).isEqualTo(2L);
        Assertions.assertThat(latestCompletedExternalPath.getValue()).isEqualTo("n/a");
    }

    /**
     * Looks up a captured gauge by name and narrows it to the value type the caller expects.
     *
     * <p>The cast is unchecked; a wrong expectation only shows up as a {@link ClassCastException}
     * when the gauge value is read.
     */
    @SuppressWarnings("unchecked")
    private static <T> Gauge<T> gaugeNamed(Map<String, Gauge<?>> gauges, String name) {
        return (Gauge<T>) gauges.get(name);
    }

    /**
     * Creates subtask stats for subtask index {@code i} by delegating to {@link
     * #createSubtaskStats(int, boolean)} with the flag set to {@code false}.
     */
    private SubtaskStateStats createSubtaskStats(int i) {
        final boolean flag = false;
        return createSubtaskStats(i, flag);
    }

    /**
     * Creates subtask stats for subtask index {@code i} with all nine numeric values set to zero.
     *
     * @param i first constructor argument (the subtask index at every call site)
     * @param z second-to-last constructor argument; presumably the unaligned-checkpoint flag,
     *     since {@code testSpanCreation} expects the span's {@code isUnaligned} attribute to
     *     follow it
     */
    private SubtaskStateStats createSubtaskStats(int i, boolean z) {
        final long zero = 0L;
        return new SubtaskStateStats(
                i, zero, zero, zero, zero, zero, zero, zero, zero, zero, z, true);
    }

    /**
     * Unpacks {@code restoredCheckpointStats} and reports it through {@code
     * CheckpointStatsTracker#reportRestoredCheckpoint}. Only the checkpoint id, properties,
     * external path and state size are forwarded.
     */
    private void reportRestoredCheckpoint(CheckpointStatsTracker checkpointStatsTracker, RestoredCheckpointStats restoredCheckpointStats) {
        final long checkpointId = restoredCheckpointStats.getCheckpointId();
        final CheckpointProperties properties = restoredCheckpointStats.getProperties();
        final String externalPath = restoredCheckpointStats.getExternalPath();
        final long stateSize = restoredCheckpointStats.getStateSize();
        checkpointStatsTracker.reportRestoredCheckpoint(
                checkpointId, properties, externalPath, stateSize);
    }
}
