package org.apache.flink.runtime.checkpoint;

import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.class */
class CheckpointCoordinatorTriggeringTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;

    @TempDir
    private Path temporaryFolder;

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest$TestingMasterHook.class */
    private static class TestingMasterHook implements MasterTriggerRestoreHook<String> {
        private final SimpleVersionedSerializer<String> serializer;
        private final CompletableFuture<String> checkpointFuture;
        private final OneShotLatch triggerCheckpointLatch;

        private TestingMasterHook(CompletableFuture<String> completableFuture) {
            this(completableFuture, new OneShotLatch());
        }

        private TestingMasterHook(CompletableFuture<String> completableFuture, OneShotLatch oneShotLatch) {
            this.serializer = new CheckpointCoordinatorTestingUtils.StringSerializer();
            this.checkpointFuture = completableFuture;
            this.triggerCheckpointLatch = oneShotLatch;
        }

        public String getIdentifier() {
            return "testing master hook";
        }

        @Nullable
        public CompletableFuture<String> triggerCheckpoint(long j, long j2, Executor executor) {
            this.triggerCheckpointLatch.trigger();
            return this.checkpointFuture;
        }

        public void restoreCheckpoint(long j, @Nullable String str) {
        }

        @Nullable
        public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
            return this.serializer;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest$UnstableCheckpointIDCounter.class */
    private static class UnstableCheckpointIDCounter implements CheckpointIDCounter {
        private final Predicate<Long> checkpointFailurePredicate;
        private long id = 0;

        public UnstableCheckpointIDCounter(Predicate<Long> predicate) {
            this.checkpointFailurePredicate = (Predicate) Preconditions.checkNotNull(predicate);
        }

        public void start() {
        }

        public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
            return FutureUtils.completedVoidFuture();
        }

        public long getAndIncrement() {
            Predicate<Long> predicate = this.checkpointFailurePredicate;
            long j = this.id;
            this.id = j + 1;
            if (predicate.test(Long.valueOf(j))) {
                throw new RuntimeException("CheckpointIDCounter#getAndIncrement fails by design");
            }
            return this.id;
        }

        public long get() {
            return this.id;
        }

        public void setCount(long j) {
        }
    }

    CheckpointCoordinatorTriggeringTest() {
    }

    @BeforeEach
    void setUp() {
        this.manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    void testPeriodicTriggering() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
            JobVertexID jobVertexID = new JobVertexID();
            ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
            ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
            CheckpointCoordinator build2 = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10L).setCheckpointTimeout(200000L).setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build(build);
            build2.startCheckpointScheduler();
            for (int i = 0; i < 5; i++) {
                this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            }
            checkRecordedTriggeredCheckpoints(5, currentTimeMillis, checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId));
            build2.stopCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assertions.assertThat(checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId).size()).isEqualTo(5);
            checkpointRecorderTaskManagerGateway.resetCount();
            build2.startCheckpointScheduler();
            for (int i2 = 0; i2 < 5; i2++) {
                this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            }
            checkRecordedTriggeredCheckpoints(5, currentTimeMillis, checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId));
            build2.stopCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assertions.assertThat(checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId).size()).isEqualTo(5);
            build2.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assertions.fail(e.getMessage());
        }
    }

    private void checkRecordedTriggeredCheckpoints(int i, long j, List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> list) {
        Assertions.assertThat(list).hasSize(i);
        long j2 = -1;
        long j3 = -1;
        for (CheckpointCoordinatorTestingUtils.TriggeredCheckpoint triggeredCheckpoint : list) {
            Assertions.assertThat(triggeredCheckpoint.checkpointId).as("Trigger checkpoint id should be in increase order", new Object[0]).isGreaterThan(j2);
            Assertions.assertThat(triggeredCheckpoint.timestamp).as("Trigger checkpoint timestamp should be in increase order", new Object[0]).isGreaterThanOrEqualTo(j3);
            Assertions.assertThat(triggeredCheckpoint.timestamp).as("Trigger checkpoint timestamp should be larger than the start time", new Object[0]).isGreaterThanOrEqualTo(j);
            j2 = triggeredCheckpoint.checkpointId;
            j3 = triggeredCheckpoint.timestamp;
        }
    }

    @Test
    void testTriggeringFullSnapshotAfterJobmasterFailover() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CompletedCheckpoint takeSavepoint = takeSavepoint(build, attemptId);
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(build, standaloneCompletedCheckpointStore, standaloneCheckpointIDCounter);
        createCheckpointCoordinator.restoreSavepoint(SavepointRestoreSettings.forPath(takeSavepoint.getExternalPointer(), true, RecoveryClaimMode.NO_CLAIM), build.getAllVertices(), getClass().getClassLoader());
        createCheckpointCoordinator.shutdown();
        checkpointRecorderTaskManagerGateway.resetCount();
        CheckpointCoordinator createCheckpointCoordinator2 = createCheckpointCoordinator(build, standaloneCompletedCheckpointStore, standaloneCheckpointIDCounter);
        createCheckpointCoordinator2.restoreLatestCheckpointedStateToAll(new HashSet(build.getAllVertices().values()), true);
        createCheckpointCoordinator2.startCheckpointScheduler();
        CompletableFuture triggerCheckpoint = createCheckpointCoordinator2.triggerCheckpoint(true);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        createCheckpointCoordinator2.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, 2L), TASK_MANAGER_LOCATION_INFO);
        triggerCheckpoint.get();
        Assertions.assertThat(checkpointRecorderTaskManagerGateway.getOnlyTriggeredCheckpoint(attemptId).checkpointOptions.getCheckpointType()).isEqualTo(CheckpointType.FULL_CHECKPOINT);
    }

    @Test
    void testTriggeringFullCheckpoints() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CompletedCheckpoint takeSavepoint = takeSavepoint(build, attemptId);
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(build, new StandaloneCompletedCheckpointStore(1), new StandaloneCheckpointIDCounter());
        createCheckpointCoordinator.restoreSavepoint(SavepointRestoreSettings.forPath(takeSavepoint.getExternalPointer(), true, RecoveryClaimMode.NO_CLAIM), build.getAllVertices(), getClass().getClassLoader());
        takeSavepoint(build, attemptId, createCheckpointCoordinator, 2);
        Assertions.assertThat(createCheckpointCoordinator.getRecentExpiredCheckpoints().isEmpty()).isTrue();
        createCheckpointCoordinator.startCheckpointScheduler();
        checkpointRecorderTaskManagerGateway.resetCount();
        CompletableFuture triggerCheckpoint = createCheckpointCoordinator.triggerCheckpoint(true);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        createCheckpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, 3L), TASK_MANAGER_LOCATION_INFO);
        triggerCheckpoint.get();
        Assertions.assertThat(checkpointRecorderTaskManagerGateway.getOnlyTriggeredCheckpoint(attemptId).checkpointOptions.getCheckpointType()).isEqualTo(CheckpointType.FULL_CHECKPOINT);
    }

    @Test
    void testTriggeringCheckpointsWithNullCheckpointType() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()), new StandaloneCompletedCheckpointStore(1), new StandaloneCheckpointIDCounter());
        createCheckpointCoordinator.startCheckpointScheduler();
        checkpointRecorderTaskManagerGateway.resetCount();
        Assertions.assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
            createCheckpointCoordinator.triggerCheckpoint((CheckpointType) null);
        });
    }

    @Test
    void testTriggeringCheckpointsWithIncrementalCheckpointType() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(build, new StandaloneCompletedCheckpointStore(1), new StandaloneCheckpointIDCounter());
        createCheckpointCoordinator.startCheckpointScheduler();
        checkpointRecorderTaskManagerGateway.resetCount();
        CompletableFuture triggerCheckpoint = createCheckpointCoordinator.triggerCheckpoint(CheckpointType.INCREMENTAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        createCheckpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, 1L), TASK_MANAGER_LOCATION_INFO);
        triggerCheckpoint.get();
        Assertions.assertThat(checkpointRecorderTaskManagerGateway.getOnlyTriggeredCheckpoint(attemptId).checkpointOptions.getCheckpointType()).isEqualTo(CheckpointType.CHECKPOINT);
    }

    @Test
    void testTriggeringCheckpointsWithFullCheckpointType() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(build, new StandaloneCompletedCheckpointStore(1), new StandaloneCheckpointIDCounter());
        createCheckpointCoordinator.startCheckpointScheduler();
        checkpointRecorderTaskManagerGateway.resetCount();
        CompletableFuture triggerCheckpoint = createCheckpointCoordinator.triggerCheckpoint(CheckpointType.FULL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        createCheckpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, 1L), TASK_MANAGER_LOCATION_INFO);
        triggerCheckpoint.get();
        Assertions.assertThat(checkpointRecorderTaskManagerGateway.getOnlyTriggeredCheckpoint(attemptId).checkpointOptions.getCheckpointType()).isEqualTo(CheckpointType.FULL_CHECKPOINT);
    }

    @Test
    void testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CompletedCheckpoint takeSavepoint = takeSavepoint(build, attemptId);
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(build, new StandaloneCompletedCheckpointStore(1), new StandaloneCheckpointIDCounter());
        createCheckpointCoordinator.restoreSavepoint(SavepointRestoreSettings.forPath(takeSavepoint.getExternalPointer(), true, RecoveryClaimMode.NO_CLAIM), build.getAllVertices(), getClass().getClassLoader());
        takeSavepoint(build, attemptId, createCheckpointCoordinator, 2);
        createCheckpointCoordinator.startCheckpointScheduler();
        checkpointRecorderTaskManagerGateway.resetCount();
        CompletableFuture triggerCheckpoint = createCheckpointCoordinator.triggerCheckpoint(CheckpointType.INCREMENTAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        createCheckpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, 3L), TASK_MANAGER_LOCATION_INFO);
        triggerCheckpoint.get();
        Assertions.assertThat(checkpointRecorderTaskManagerGateway.getOnlyTriggeredCheckpoint(attemptId).checkpointOptions.getCheckpointType()).isEqualTo(CheckpointType.FULL_CHECKPOINT);
    }

    private CompletedCheckpoint takeSavepoint(ExecutionGraph executionGraph, ExecutionAttemptID executionAttemptID) throws Exception {
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(executionGraph, new StandaloneCompletedCheckpointStore(1), new StandaloneCheckpointIDCounter());
        CompletedCheckpoint takeSavepoint = takeSavepoint(executionGraph, executionAttemptID, createCheckpointCoordinator, 1);
        createCheckpointCoordinator.shutdown();
        return takeSavepoint;
    }

    private CompletedCheckpoint takeSavepoint(ExecutionGraph executionGraph, ExecutionAttemptID executionAttemptID, CheckpointCoordinator checkpointCoordinator, int i) throws Exception {
        CompletableFuture triggerSavepoint = checkpointCoordinator.triggerSavepoint(TempDirUtils.newFolder(this.temporaryFolder).getPath(), SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), executionAttemptID, i), TASK_MANAGER_LOCATION_INFO);
        return (CompletedCheckpoint) triggerSavepoint.get();
    }

    private CheckpointCoordinator createCheckpointCoordinator(ExecutionGraph executionGraph, StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore, CheckpointIDCounter checkpointIDCounter) throws Exception {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10000L).setCheckpointTimeout(200000L).setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setCompletedCheckpointStore(standaloneCompletedCheckpointStore).setCheckpointIDCounter(checkpointIDCounter).setTimer(this.manuallyTriggeredScheduledExecutor).build(executionGraph);
    }

    @Test
    void testMinTimeBetweenCheckpointsInterval() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator build2 = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(12L).setCheckpointTimeout(200000L).setMinPauseBetweenCheckpoints(50L).setMaxConcurrentCheckpoints(1).build()).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build(build);
        try {
            build2.startCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assertions.assertThat(Long.valueOf(checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId).get(0).checkpointId)).isOne();
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(build.getJobID(), attemptId, 1L);
            long nanoTime = System.nanoTime();
            build2.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
            checkpointRecorderTaskManagerGateway.resetCount();
            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            while (checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId).isEmpty()) {
                Thread.sleep(12L);
                this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            }
            Long valueOf = Long.valueOf(checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId).get(0).checkpointId);
            long nanoTime2 = System.nanoTime();
            Assertions.assertThat(valueOf).isEqualTo(2L);
            long j = (nanoTime2 - nanoTime) / 1000000;
            if (j + 1 < 50) {
                Assertions.fail("checkpoint came too early: delay was " + j + " but should have been at least 50");
            }
        } finally {
            build2.stopCheckpointScheduler();
            build2.shutdown();
        }
    }

    @Test
    void testStopPeriodicScheduler() throws Exception {
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            triggerPeriodicCheckpoint.get();
            Assertions.fail("The triggerCheckpoint call expected an exception");
        } catch (ExecutionException e) {
            Assertions.assertThat(ExceptionUtils.findThrowable(e, CheckpointException.class)).isPresent().map((v0) -> {
                return v0.getCheckpointFailureReason();
            }).get().isEqualTo(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
        }
        CompletableFuture triggerCheckpoint = createCheckpointCoordinator.triggerCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (String) null, false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(triggerCheckpoint).isNotCompletedExceptionally();
    }

    @Test
    void testWithNoOpCheckpointStatsTracker() throws Exception {
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID(), 2, 2).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ((ExecutionVertex) build.getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt().markFinished();
        CheckpointCoordinator build2 = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer(this.manuallyTriggeredScheduledExecutor).setCheckpointStatsTracker(NoOpCheckpointStatsTracker.INSTANCE).setAllowCheckpointsAfterTasksFinished(true).build(build);
        triggerNonPeriodicCheckpoint(build2);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(build2.getNumberOfPendingCheckpoints()).as("The PendingCheckpoint was added. The PendingCheckpointStats should have been initialized.", new Object[0]).isEqualTo(1);
    }

    @Test
    void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator();
        createCheckpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        createCheckpointCoordinator.shutdown();
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            triggerPeriodicCheckpoint.get();
            Assertions.fail("Should not reach here");
        } catch (ExecutionException e) {
            Assertions.assertThat(ExceptionUtils.findThrowable(e, CheckpointException.class)).isPresent().map((v0) -> {
                return v0.getCheckpointFailureReason();
            }).get().isEqualTo(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
        }
    }

    @Test
    void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(build);
        createCheckpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        Assertions.assertThat(createCheckpointCoordinator.isTriggering()).isTrue();
        Assertions.assertThat(createCheckpointCoordinator.getTriggerRequestQueue()).isEmpty();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint2 = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        Assertions.assertThat(createCheckpointCoordinator.isTriggering()).isTrue();
        Assertions.assertThat(createCheckpointCoordinator.getTriggerRequestQueue()).hasSize(1);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(triggerPeriodicCheckpoint).isNotCompletedExceptionally();
        Assertions.assertThat(triggerPeriodicCheckpoint2).isNotCompletedExceptionally();
        Assertions.assertThat(createCheckpointCoordinator.isTriggering()).isFalse();
        Assertions.assertThat(createCheckpointCoordinator.getTriggerRequestQueue()).isEmpty();
        Assertions.assertThat(checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId)).hasSize(2);
    }

    @Test
    void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator build2 = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointIDCounter(new UnstableCheckpointIDCounter(l -> {
            return l.longValue() == 0;
        })).setTimer(this.manuallyTriggeredScheduledExecutor).build(build);
        build2.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint = triggerNonPeriodicCheckpoint(build2);
        Assertions.assertThat(build2.isTriggering()).isTrue();
        Assertions.assertThat(build2.getTriggerRequestQueue()).isEmpty();
        CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint2 = triggerNonPeriodicCheckpoint(build2);
        CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint3 = triggerNonPeriodicCheckpoint(build2);
        Assertions.assertThat(build2.isTriggering()).isTrue();
        Assertions.assertThat(build2.getTriggerRequestQueue()).hasSize(2);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(triggerNonPeriodicCheckpoint).isCompletedExceptionally();
        Assertions.assertThat(triggerNonPeriodicCheckpoint2).isNotCompletedExceptionally();
        Assertions.assertThat(triggerNonPeriodicCheckpoint3).isNotCompletedExceptionally();
        Assertions.assertThat(build2.isTriggering()).isFalse();
        Assertions.assertThat(build2.getTriggerRequestQueue()).isEmpty();
        Assertions.assertThat(checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId).size()).isEqualTo(2);
    }

    @Test
    void testTriggerCheckpointRequestCancelled() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator(build);
        CompletableFuture completableFuture = new CompletableFuture();
        createCheckpointCoordinator.addMasterHook(new TestingMasterHook(completableFuture));
        createCheckpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(createCheckpointCoordinator.isTriggering()).isTrue();
        this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.CheckpointCanceller.class);
        Assertions.assertThat(createCheckpointCoordinator.isTriggering()).isTrue();
        try {
            triggerPeriodicCheckpoint.get();
            Assertions.fail("Should not reach here");
        } catch (ExecutionException e) {
            Assertions.assertThat(ExceptionUtils.findThrowable(e, CheckpointException.class)).isPresent().map((v0) -> {
                return v0.getCheckpointFailureReason();
            }).get().isEqualTo(CheckpointFailureReason.CHECKPOINT_EXPIRED);
        }
        Assertions.assertThat(createCheckpointCoordinator.getRecentExpiredCheckpoints().size()).isEqualTo(1);
        completableFuture.complete("finish master hook");
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(createCheckpointCoordinator.isTriggering()).isFalse();
        Assertions.assertThat(checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId)).isEmpty();
        Assertions.assertThat(createCheckpointCoordinator.getTriggerRequestQueue()).isEmpty();
    }

    @Test
    void testTriggerCheckpointInitializationFailed() throws Exception {
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointIDCounter(new UnstableCheckpointIDCounter(l -> {
            return l.longValue() == 0;
        })).setTimer(this.manuallyTriggeredScheduledExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        build.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(build);
        Assertions.assertThat(build.isTriggering()).isTrue();
        Assertions.assertThat(build.getTriggerRequestQueue()).isEmpty();
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            triggerPeriodicCheckpoint.get();
            Assertions.fail("This checkpoint should fail through UnstableCheckpointIDCounter");
        } catch (ExecutionException e) {
            Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
            Assertions.assertThat(findThrowable.isPresent()).isTrue();
            Assertions.assertThat(((CheckpointException) findThrowable.get()).getCheckpointFailureReason()).isEqualTo(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
        }
        Assertions.assertThat(build.isTriggering()).isFalse();
        Assertions.assertThat(build.getTriggerRequestQueue()).isEmpty();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint2 = triggerPeriodicCheckpoint(build);
        Assertions.assertThat(build.isTriggering()).isTrue();
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(triggerPeriodicCheckpoint2).isNotCompletedExceptionally();
        Assertions.assertThat(build.isTriggering()).isFalse();
        Assertions.assertThat(build.getTriggerRequestQueue()).isEmpty();
    }

    @Test
    void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway checkpointRecorderTaskManagerGateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionAttemptID attemptId = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTaskManagerGateway(checkpointRecorderTaskManagerGateway).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator createCheckpointCoordinator = createCheckpointCoordinator();
        CompletableFuture completableFuture = new CompletableFuture();
        createCheckpointCoordinator.addMasterHook(new TestingMasterHook(completableFuture));
        createCheckpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint = triggerPeriodicCheckpoint(createCheckpointCoordinator);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(createCheckpointCoordinator.isTriggering()).isTrue();
        completableFuture.completeExceptionally(new Exception("by design"));
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(createCheckpointCoordinator.isTriggering()).isFalse();
        try {
            triggerPeriodicCheckpoint.get();
            Assertions.fail("Should not reach here");
        } catch (ExecutionException e) {
            Assertions.assertThat(ExceptionUtils.findThrowable(e, CheckpointException.class)).isPresent().map((v0) -> {
                return v0.getCheckpointFailureReason();
            }).get().isEqualTo(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
        }
        Assertions.assertThat(checkpointRecorderTaskManagerGateway.getTriggeredCheckpoints(attemptId)).isEmpty();
        Assertions.assertThat(createCheckpointCoordinator.getTriggerRequestQueue()).isEmpty();
    }

    @Test
    void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer(new ScheduledExecutorServiceAdapter(newSingleThreadScheduledExecutor)).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().build()).build(new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(new DirectScheduledExecutorService())).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()));
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch = new OneShotLatch();
        build.addMasterHook(new TestingMasterHook(completableFuture, oneShotLatch));
        try {
            build.triggerCheckpoint(false);
            CompletableFuture triggerCheckpoint = build.triggerCheckpoint(false);
            oneShotLatch.await();
            completableFuture.complete("Completed");
            build.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
            try {
                triggerCheckpoint.get();
                Assertions.fail("Expected the second checkpoint to fail.");
            } catch (ExecutionException e) {
                Assertions.assertThat(ExceptionUtils.stripExecutionException(e)).isInstanceOf(CheckpointException.class);
            }
            build.shutdown();
            ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, new ExecutorService[]{newSingleThreadScheduledExecutor});
        } catch (Throwable th) {
            build.shutdown();
            ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, new ExecutorService[]{newSingleThreadScheduledExecutor});
            throw th;
        }
    }

    private CheckpointCoordinator createCheckpointCoordinator() throws Exception {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer(this.manuallyTriggeredScheduledExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
    }

    private CheckpointCoordinator createCheckpointCoordinator(ExecutionGraph executionGraph) throws Exception {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer(this.manuallyTriggeredScheduledExecutor).build(executionGraph);
    }

    private CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint(CheckpointCoordinator checkpointCoordinator) {
        return checkpointCoordinator.triggerCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (String) null, true);
    }

    private CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint(CheckpointCoordinator checkpointCoordinator) {
        return checkpointCoordinator.triggerCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (String) null, false);
    }
}
