/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class CheckpointCoordinatorTriggeringTest
extends TestLogger {
    /** Location string supplied with every acknowledge message the tests hand to a coordinator. */
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    /** Timer injected into the coordinators under test; the tests run its queued tasks explicitly. */
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
    /** JUnit rule whose freshly created folders are used as savepoint target directories. */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Gives every test its own, empty manually triggered executor. */
    @Before
    public void setUp() throws Exception {
        manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /**
     * Verifies that the periodic scheduler triggers one checkpoint per timer round while it is
     * started, triggers nothing further once it is stopped, and can be started again afterwards.
     *
     * <p>Exceptions are propagated to JUnit rather than being turned into
     * {@code fail(e.getMessage())}, so the cause and stack trace appear in the test report. The
     * coordinator is shut down even when an assertion fails.
     *
     * @throws Exception if building the graph/coordinator or driving the scheduler fails
     */
    @Test
    public void testPeriodicTriggering() throws Exception {
        final int expectedTriggers = 5;
        final long start = System.currentTimeMillis();

        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .setTaskManagerGateway(gateway)
                        .build();
        ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionAttemptID attemptID = vertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(10L)
                        .setCheckpointTimeout(200000L)
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build();
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(graph)
                        .setCheckpointCoordinatorConfiguration(checkpointCoordinatorConfiguration)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build();

        try {
            runSchedulerRoundAndVerify(checkpointCoordinator, gateway, attemptID, expectedTriggers, start);
            // Reset the recorder so the second round is counted on its own.
            gateway.resetCount();
            runSchedulerRoundAndVerify(checkpointCoordinator, gateway, attemptID, expectedTriggers, start);
        } finally {
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * Starts the scheduler, fires the timer {@code expectedTriggers} times, checks the recorded
     * triggers, then stops the scheduler and checks that one more timer round adds nothing.
     */
    private void runSchedulerRoundAndVerify(
            CheckpointCoordinator checkpointCoordinator,
            CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway,
            ExecutionAttemptID attemptID,
            int expectedTriggers,
            long start)
            throws Exception {
        checkpointCoordinator.startCheckpointScheduler();
        for (int i = 0; i < expectedTriggers; ++i) {
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
        }
        this.checkRecordedTriggeredCheckpoints(
                expectedTriggers, start, gateway.getTriggeredCheckpoints(attemptID));

        checkpointCoordinator.stopCheckpointScheduler();
        // With the scheduler stopped, another timer round must not record a new trigger.
        this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(expectedTriggers, gateway.getTriggeredCheckpoints(attemptID).size());
    }

    /**
     * Asserts that exactly {@code numTrigger} checkpoints were recorded, that their ids are
     * strictly increasing, that their timestamps never decrease, and that no timestamp lies
     * before {@code start}.
     *
     * @param numTrigger expected number of recorded triggers
     * @param start lower bound (inclusive) for every recorded timestamp
     * @param checkpoints recorded triggers, in recording order
     */
    private void checkRecordedTriggeredCheckpoints(int numTrigger, long start, List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> checkpoints) {
        Assert.assertEquals(numTrigger, checkpoints.size());

        long previousId = -1L;
        long previousTimestamp = -1L;
        for (final CheckpointCoordinatorTestingUtils.TriggeredCheckpoint recorded : checkpoints) {
            final long currentId = recorded.checkpointId;
            final long currentTimestamp = recorded.timestamp;

            Assert.assertTrue(
                    "Trigger checkpoint id should be in increase order",
                    currentId > previousId);
            Assert.assertTrue(
                    "Trigger checkpoint timestamp should be in increase order",
                    currentTimestamp >= previousTimestamp);
            Assert.assertTrue(
                    "Trigger checkpoint timestamp should be larger than the start time",
                    currentTimestamp >= start);

            previousId = currentId;
            previousTimestamp = currentTimestamp;
        }
    }

    /**
     * Restores a coordinator from a savepoint in {@code NO_CLAIM} mode, replaces it with a second
     * coordinator sharing the same store and id counter, and checks that the first checkpoint
     * triggered by the replacement is a {@code FULL_CHECKPOINT}.
     */
    @Test
    public void testTriggeringFullSnapshotAfterJobmasterFailover() throws Exception {
        final CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final JobVertexID vertexId = new JobVertexID();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(recorder)
                        .build();
        final ExecutionVertex firstSubtask = executionGraph.getJobVertex(vertexId).getTaskVertices()[0];
        final ExecutionAttemptID attempt = firstSubtask.getCurrentExecutionAttempt().getAttemptId();

        final CompletedCheckpoint initialSavepoint = takeSavepoint(executionGraph, attempt);
        final StandaloneCompletedCheckpointStore sharedStore = new StandaloneCompletedCheckpointStore(1);
        final StandaloneCheckpointIDCounter sharedCounter = new StandaloneCheckpointIDCounter();

        // First coordinator: restore from the savepoint, then go away.
        CheckpointCoordinator coordinator =
                createCheckpointCoordinator(executionGraph, sharedStore, sharedCounter);
        coordinator.restoreSavepoint(
                SavepointRestoreSettings.forPath(
                        initialSavepoint.getExternalPointer(), true, RestoreMode.NO_CLAIM),
                executionGraph.getAllVertices(),
                getClass().getClassLoader());
        coordinator.shutdown();
        recorder.resetCount();

        // Second coordinator: same store and counter as the first one.
        coordinator = createCheckpointCoordinator(executionGraph, sharedStore, sharedCounter);
        coordinator.restoreLatestCheckpointedStateToAll(
                new HashSet<>(executionGraph.getAllVertices().values()), true);
        coordinator.startCheckpointScheduler();

        final CompletableFuture<?> pendingCheckpoint = coordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), attempt, 2L),
                TASK_MANAGER_LOCATION_INFO);
        pendingCheckpoint.get();

        Assert.assertThat(
                recorder.getOnlyTriggeredCheckpoint(attempt).checkpointOptions.getCheckpointType(),
                Matchers.is(CheckpointType.FULL_CHECKPOINT));
    }

    /**
     * Restores a coordinator from a savepoint in {@code NO_CLAIM} mode, takes a further savepoint
     * with it, and checks that the checkpoint triggered afterwards is a {@code FULL_CHECKPOINT}.
     */
    @Test
    public void testTriggeringFullCheckpoints() throws Exception {
        final CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final JobVertexID vertexId = new JobVertexID();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(recorder)
                        .build();
        final ExecutionVertex firstSubtask = executionGraph.getJobVertex(vertexId).getTaskVertices()[0];
        final ExecutionAttemptID attempt = firstSubtask.getCurrentExecutionAttempt().getAttemptId();

        final CompletedCheckpoint initialSavepoint = takeSavepoint(executionGraph, attempt);
        final StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        final StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        final CheckpointCoordinator coordinator =
                createCheckpointCoordinator(executionGraph, store, idCounter);

        coordinator.restoreSavepoint(
                SavepointRestoreSettings.forPath(
                        initialSavepoint.getExternalPointer(), true, RestoreMode.NO_CLAIM),
                executionGraph.getAllVertices(),
                getClass().getClassLoader());

        // A savepoint taken after the restore; it is acknowledged under id 2.
        takeSavepoint(executionGraph, attempt, coordinator, 2);

        coordinator.startCheckpointScheduler();
        recorder.resetCount();

        final CompletableFuture<?> pendingCheckpoint = coordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), attempt, 3L),
                TASK_MANAGER_LOCATION_INFO);
        pendingCheckpoint.get();

        Assert.assertThat(
                recorder.getOnlyTriggeredCheckpoint(attempt).checkpointOptions.getCheckpointType(),
                Matchers.is(CheckpointType.FULL_CHECKPOINT));
    }

    /**
     * Takes a savepoint (acknowledged under id 1) with a throw-away coordinator that has its own
     * store and id counter, and shuts that coordinator down afterwards.
     *
     * @param graph execution graph the temporary coordinator is built on
     * @param attemptID attempt whose acknowledgement completes the savepoint
     * @return the completed savepoint
     */
    private CompletedCheckpoint takeSavepoint(ExecutionGraph graph, ExecutionAttemptID attemptID) throws Exception {
        final CheckpointCoordinator temporaryCoordinator =
                createCheckpointCoordinator(
                        graph,
                        new StandaloneCompletedCheckpointStore(1),
                        new StandaloneCheckpointIDCounter());
        final CompletedCheckpoint result = takeSavepoint(graph, attemptID, temporaryCoordinator, 1);
        temporaryCoordinator.shutdown();
        return result;
    }

    /**
     * Triggers a canonical-format savepoint into a new temporary folder, runs the queued executor
     * tasks, acknowledges the savepoint for {@code attemptID} and waits for the result.
     *
     * @param graph execution graph providing the job id for the acknowledgement
     * @param attemptID attempt acknowledging the savepoint
     * @param checkpointCoordinator coordinator that takes the savepoint
     * @param savepointId checkpoint id used in the acknowledge message
     * @return the completed savepoint
     */
    private CompletedCheckpoint takeSavepoint(ExecutionGraph graph, ExecutionAttemptID attemptID, CheckpointCoordinator checkpointCoordinator, int savepointId) throws Exception {
        final String targetDirectory = temporaryFolder.newFolder().getPath();
        final CompletableFuture<?> pendingSavepoint =
                checkpointCoordinator.triggerSavepoint(targetDirectory, SavepointFormatType.CANONICAL);
        manuallyTriggeredScheduledExecutor.triggerAll();

        final AcknowledgeCheckpoint acknowledgement =
                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, (long) savepointId);
        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgement, TASK_MANAGER_LOCATION_INFO);

        return (CompletedCheckpoint) pendingSavepoint.get();
    }

    /**
     * Builds a coordinator on {@code graph} with the given store and id counter, a 10000 ms
     * checkpoint interval, a 200000 ms timeout, an effectively unlimited number of concurrent
     * checkpoints, and the test's manually triggered executor as timer.
     */
    private CheckpointCoordinator createCheckpointCoordinator(ExecutionGraph graph, StandaloneCompletedCheckpointStore checkpointStore, CheckpointIDCounter checkpointIDCounter) throws Exception {
        final CheckpointCoordinatorConfiguration configuration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(10000L)
                        .setCheckpointTimeout(200000L)
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build();

        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setExecutionGraph(graph)
                .setCheckpointCoordinatorConfiguration(configuration)
                .setCompletedCheckpointStore(checkpointStore)
                .setCheckpointIDCounter(checkpointIDCounter)
                .setTimer(manuallyTriggeredScheduledExecutor)
                .build();
    }

    /**
     * Checks that, with a 50 ms minimum pause and a 12 ms checkpoint interval, the checkpoint
     * following an acknowledged one is not triggered earlier than the minimum pause after the
     * acknowledgement (with 1 ms of tolerance).
     */
    @Test
    public void testMinTimeBetweenCheckpointsInterval() throws Exception {
        final long delay = 50L;
        final long checkpointInterval = 12L;

        final JobVertexID vertexId = new JobVertexID();
        final CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(recorder)
                        .build();
        final ExecutionVertex firstSubtask = executionGraph.getJobVertex(vertexId).getTaskVertices()[0];
        final ExecutionAttemptID attempt = firstSubtask.getCurrentExecutionAttempt().getAttemptId();

        final CheckpointCoordinatorConfiguration configuration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(checkpointInterval)
                        .setCheckpointTimeout(200000L)
                        .setMinPauseBetweenCheckpoints(delay)
                        .setMaxConcurrentCheckpoints(1)
                        .build();
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(executionGraph)
                        .setCheckpointCoordinatorConfiguration(configuration)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();

        try {
            coordinator.startCheckpointScheduler();
            manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            manuallyTriggeredScheduledExecutor.triggerAll();

            final long firstId = recorder.getTriggeredCheckpoints(attempt).get(0).checkpointId;
            Assert.assertEquals(1L, firstId);

            final AcknowledgeCheckpoint acknowledgement =
                    new AcknowledgeCheckpoint(executionGraph.getJobID(), attempt, 1L);
            // The pause is measured from just before the acknowledgement is delivered.
            final long acknowledgedAtNanos = System.nanoTime();
            coordinator.receiveAcknowledgeMessage(acknowledgement, TASK_MANAGER_LOCATION_INFO);
            recorder.resetCount();

            // Keep firing the timer, sleeping one interval between rounds, until a trigger shows up.
            while (true) {
                manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                manuallyTriggeredScheduledExecutor.triggerAll();
                if (!recorder.getTriggeredCheckpoints(attempt).isEmpty()) {
                    break;
                }
                Thread.sleep(checkpointInterval);
            }

            final long secondId = recorder.getTriggeredCheckpoints(attempt).get(0).checkpointId;
            final long triggeredAtNanos = System.nanoTime();
            Assert.assertEquals(2L, secondId);

            final long delayMillis = (triggeredAtNanos - acknowledgedAtNanos) / 1000000L;
            if (delayMillis + 1L < delay) {
                Assert.fail("checkpoint came too early: delay was " + delayMillis + " but should have been at least " + delay);
            }
        } finally {
            coordinator.stopCheckpointScheduler();
            coordinator.shutdown();
        }
    }

    /**
     * Checks that, on a coordinator whose scheduler was never started, a periodic trigger request
     * fails with {@code PERIODIC_SCHEDULER_SHUTDOWN} while a non-periodic request does not
     * complete exceptionally.
     */
    @Test
    public void testStopPeriodicScheduler() throws Exception {
        final CheckpointCoordinator coordinator = createCheckpointCoordinator();

        final CompletableFuture<CompletedCheckpoint> periodicRequest = triggerPeriodicCheckpoint(coordinator);
        manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            periodicRequest.get();
            Assert.fail("The triggerCheckpoint call expected an exception");
        } catch (ExecutionException expected) {
            final Optional<CheckpointException> checkpointFailure =
                    ExceptionUtils.findThrowable(expected, CheckpointException.class);
            Assert.assertTrue(checkpointFailure.isPresent());
            Assert.assertEquals(
                    CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
                    checkpointFailure.get().getCheckpointFailureReason());
        }

        final CheckpointProperties properties =
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        final CompletableFuture<CompletedCheckpoint> nonPeriodicRequest =
                coordinator.triggerCheckpoint(properties, null, false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(nonPeriodicRequest.isCompletedExceptionally());
    }

    /**
     * Checks that a periodic trigger request whose coordinator is shut down before the queued
     * executor tasks run fails with {@code CHECKPOINT_COORDINATOR_SHUTDOWN}.
     */
    @Test
    public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
        final CheckpointCoordinator coordinator = createCheckpointCoordinator();
        coordinator.startCheckpointScheduler();

        final CompletableFuture<CompletedCheckpoint> request = triggerPeriodicCheckpoint(coordinator);
        coordinator.shutdown();
        manuallyTriggeredScheduledExecutor.triggerAll();

        try {
            request.get();
            Assert.fail("Should not reach here");
        } catch (ExecutionException expected) {
            final Optional<CheckpointException> checkpointFailure =
                    ExceptionUtils.findThrowable(expected, CheckpointException.class);
            Assert.assertTrue(checkpointFailure.isPresent());
            Assert.assertEquals(
                    CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN,
                    checkpointFailure.get().getCheckpointFailureReason());
        }
    }

    /**
     * Checks that a second trigger request issued while the first is still being triggered is
     * queued, and that both requests are sent to the task once the executor tasks run.
     */
    @Test
    public void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
        final JobVertexID vertexId = new JobVertexID();
        final CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(recorder)
                        .build();
        final ExecutionVertex firstSubtask = executionGraph.getJobVertex(vertexId).getTaskVertices()[0];
        final ExecutionAttemptID attempt = firstSubtask.getCurrentExecutionAttempt().getAttemptId();

        final CheckpointCoordinator coordinator = createCheckpointCoordinator(executionGraph);
        coordinator.startCheckpointScheduler();

        // First request: triggering starts, nothing is queued.
        final CompletableFuture<CompletedCheckpoint> firstRequest = triggerPeriodicCheckpoint(coordinator);
        Assert.assertTrue(coordinator.isTriggering());
        Assert.assertEquals(0, coordinator.getTriggerRequestQueue().size());

        // Second request: still triggering the first one, so this one waits in the queue.
        final CompletableFuture<CompletedCheckpoint> secondRequest = triggerPeriodicCheckpoint(coordinator);
        Assert.assertTrue(coordinator.isTriggering());
        Assert.assertEquals(1, coordinator.getTriggerRequestQueue().size());

        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertFalse(firstRequest.isCompletedExceptionally());
        Assert.assertFalse(secondRequest.isCompletedExceptionally());
        Assert.assertFalse(coordinator.isTriggering());
        Assert.assertEquals(0, coordinator.getTriggerRequestQueue().size());
        Assert.assertEquals(2, recorder.getTriggeredCheckpoints(attempt).size());
    }

    /**
     * Checks that when the first of three non-periodic requests fails (the id counter throws for
     * id 0), the two queued requests are still executed and sent to the task.
     */
    @Test
    public void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
        final JobVertexID vertexId = new JobVertexID();
        final CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(recorder)
                        .build();
        final ExecutionVertex firstSubtask = executionGraph.getJobVertex(vertexId).getTaskVertices()[0];
        final ExecutionAttemptID attempt = firstSubtask.getCurrentExecutionAttempt().getAttemptId();

        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(executionGraph)
                        .setCheckpointIDCounter(new UnstableCheckpointIDCounter(id -> id == 0L))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();
        coordinator.startCheckpointScheduler();

        final CompletableFuture<CompletedCheckpoint> firstRequest = triggerNonPeriodicCheckpoint(coordinator);
        Assert.assertTrue(coordinator.isTriggering());
        Assert.assertEquals(0, coordinator.getTriggerRequestQueue().size());

        final CompletableFuture<CompletedCheckpoint> secondRequest = triggerNonPeriodicCheckpoint(coordinator);
        final CompletableFuture<CompletedCheckpoint> thirdRequest = triggerNonPeriodicCheckpoint(coordinator);
        Assert.assertTrue(coordinator.isTriggering());
        Assert.assertEquals(2, coordinator.getTriggerRequestQueue().size());

        manuallyTriggeredScheduledExecutor.triggerAll();

        // Only the first request fails; the queued ones go through.
        Assert.assertTrue(firstRequest.isCompletedExceptionally());
        Assert.assertFalse(secondRequest.isCompletedExceptionally());
        Assert.assertFalse(thirdRequest.isCompletedExceptionally());
        Assert.assertFalse(coordinator.isTriggering());
        Assert.assertEquals(0, coordinator.getTriggerRequestQueue().size());
        Assert.assertEquals(2, recorder.getTriggeredCheckpoints(attempt).size());
    }

    /**
     * Checks that a checkpoint whose master hook has not finished fails with
     * {@code CHECKPOINT_EXPIRED} after the non-periodic scheduled tasks run (presumably the
     * timeout canceller), and that completing the hook afterwards ends the triggering phase
     * without anything being sent to the task.
     */
    @Test
    public void testTriggerCheckpointRequestCancelled() throws Exception {
        final JobVertexID vertexId = new JobVertexID();
        final CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(recorder)
                        .build();
        final ExecutionVertex firstSubtask = executionGraph.getJobVertex(vertexId).getTaskVertices()[0];
        final ExecutionAttemptID attempt = firstSubtask.getCurrentExecutionAttempt().getAttemptId();

        final CheckpointCoordinator coordinator = createCheckpointCoordinator(executionGraph);
        final CompletableFuture<String> hookResult = new CompletableFuture<>();
        coordinator.addMasterHook(new TestingMasterHook(hookResult));
        coordinator.startCheckpointScheduler();

        final CompletableFuture<CompletedCheckpoint> request = triggerPeriodicCheckpoint(coordinator);

        // The hook future is still open, so the coordinator stays in the triggering phase.
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertTrue(coordinator.isTriggering());
        manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
        Assert.assertTrue(coordinator.isTriggering());

        try {
            request.get();
            Assert.fail("Should not reach here");
        } catch (ExecutionException expected) {
            final Optional<CheckpointException> checkpointFailure =
                    ExceptionUtils.findThrowable(expected, CheckpointException.class);
            Assert.assertTrue(checkpointFailure.isPresent());
            Assert.assertEquals(
                    CheckpointFailureReason.CHECKPOINT_EXPIRED,
                    checkpointFailure.get().getCheckpointFailureReason());
        }

        hookResult.complete("finish master hook");
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertFalse(coordinator.isTriggering());
        Assert.assertEquals(0, recorder.getTriggeredCheckpoints(attempt).size());
        Assert.assertEquals(0, coordinator.getTriggerRequestQueue().size());
    }

    /**
     * Checks that a trigger request failing during initialization (the id counter throws for
     * id 0) is reported as {@code TRIGGER_CHECKPOINT_FAILURE}, and that a subsequent request
     * goes through with the coordinator left in a clean state.
     */
    @Test
    public void testTriggerCheckpointInitializationFailed() throws Exception {
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointIDCounter(new UnstableCheckpointIDCounter(id -> id == 0L))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();
        coordinator.startCheckpointScheduler();

        final CompletableFuture<CompletedCheckpoint> failingRequest = triggerPeriodicCheckpoint(coordinator);
        Assert.assertTrue(coordinator.isTriggering());
        Assert.assertEquals(0, coordinator.getTriggerRequestQueue().size());

        manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            failingRequest.get();
            Assert.fail("This checkpoint should fail through UnstableCheckpointIDCounter");
        } catch (ExecutionException expected) {
            final Optional<CheckpointException> checkpointFailure =
                    ExceptionUtils.findThrowable(expected, CheckpointException.class);
            Assert.assertTrue(checkpointFailure.isPresent());
            Assert.assertEquals(
                    CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
                    checkpointFailure.get().getCheckpointFailureReason());
        }
        Assert.assertFalse(coordinator.isTriggering());
        Assert.assertEquals(0, coordinator.getTriggerRequestQueue().size());

        // The counter only fails for id 0, so the next request is expected to pass.
        final CompletableFuture<CompletedCheckpoint> followUpRequest = triggerPeriodicCheckpoint(coordinator);
        Assert.assertTrue(coordinator.isTriggering());
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertFalse(followUpRequest.isCompletedExceptionally());
        Assert.assertFalse(coordinator.isTriggering());
        Assert.assertEquals(0, coordinator.getTriggerRequestQueue().size());
    }

    /**
     * Checks that a checkpoint whose master hook completes exceptionally fails with
     * {@code TRIGGER_CHECKPOINT_FAILURE}, leaves the triggering phase, and sends nothing to the
     * task.
     *
     * <p>The coordinator is built on the graph that carries the recording gateway. Otherwise the
     * final "no checkpoint was triggered on the task" assertion would inspect a gateway the
     * coordinator never talks to and could not fail.
     */
    @Test
    public void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .setTaskManagerGateway(gateway)
                        .build();
        ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionAttemptID attemptID = vertex.getCurrentExecutionAttempt().getAttemptId();

        // Use the graph wired to the recording gateway so the gateway assertion below is meaningful.
        CheckpointCoordinator checkpointCoordinator = this.createCheckpointCoordinator(graph);
        CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>();
        checkpointCoordinator.addMasterHook(new TestingMasterHook(masterHookCheckpointFuture));
        checkpointCoordinator.startCheckpointScheduler();

        CompletableFuture<CompletedCheckpoint> onCompletionPromise =
                this.triggerPeriodicCheckpoint(checkpointCoordinator);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        // The hook future is still open, so the coordinator is waiting in the triggering phase.
        Assert.assertTrue(checkpointCoordinator.isTriggering());

        masterHookCheckpointFuture.completeExceptionally(new Exception("by design"));
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(checkpointCoordinator.isTriggering());

        try {
            onCompletionPromise.get();
            Assert.fail("Should not reach here");
        } catch (ExecutionException e) {
            Optional<CheckpointException> checkpointExceptionOptional =
                    ExceptionUtils.findThrowable(e, CheckpointException.class);
            Assert.assertTrue(checkpointExceptionOptional.isPresent());
            Assert.assertEquals(
                    CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
                    checkpointExceptionOptional.get().getCheckpointFailureReason());
        }

        Assert.assertEquals(0, gateway.getTriggeredCheckpoints(attemptID).size());
        Assert.assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
    }

    /**
     * Issues two trigger requests on a coordinator backed by a real single-threaded scheduler,
     * waits until the master hook has been invoked, completes the hook, aborts the pending
     * checkpoints, and expects the second request to fail with a {@link CheckpointException}.
     * The coordinator and the scheduler are always shut down at the end.
     */
    @Test
    public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(new ScheduledExecutorServiceAdapter(timerService))
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder().build())
                        .build();

        final CompletableFuture<String> hookResult = new CompletableFuture<>();
        final OneShotLatch hookInvoked = new OneShotLatch();
        coordinator.addMasterHook(new TestingMasterHook(hookResult, hookInvoked));

        try {
            coordinator.triggerCheckpoint(false);
            final CompletableFuture<?> secondRequest = coordinator.triggerCheckpoint(false);

            // Block until the hook's triggerCheckpoint has been called on the timer thread.
            hookInvoked.await();
            hookResult.complete("Completed");

            coordinator.abortPendingCheckpoints(
                    new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));

            try {
                secondRequest.get();
                Assert.fail("Expected the second checkpoint to fail.");
            } catch (ExecutionException ee) {
                Assert.assertThat(
                        ExceptionUtils.stripExecutionException(ee),
                        CoreMatchers.instanceOf(CheckpointException.class));
            }
        } finally {
            coordinator.shutdown();
            ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, new ExecutorService[] {timerService});
        }
    }

    /** Builds a coordinator with builder defaults and the test's manually triggered executor as timer. */
    private CheckpointCoordinator createCheckpointCoordinator() throws Exception {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer(manuallyTriggeredScheduledExecutor)
                .build();
    }

    /**
     * Builds a coordinator on {@code graph} with builder defaults otherwise and the test's
     * manually triggered executor as timer.
     */
    private CheckpointCoordinator createCheckpointCoordinator(ExecutionGraph graph) throws Exception {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setExecutionGraph(graph)
                .setTimer(manuallyTriggeredScheduledExecutor)
                .build();
    }

    /**
     * Requests a never-retained checkpoint with the periodic flag set and no external location.
     *
     * @return the future returned by the coordinator for this request
     */
    private CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint(CheckpointCoordinator checkpointCoordinator) {
        final CheckpointProperties properties =
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        return checkpointCoordinator.triggerCheckpoint(properties, null, true);
    }

    /**
     * Requests a never-retained checkpoint with the periodic flag cleared and no external location.
     *
     * @return the future returned by the coordinator for this request
     */
    private CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint(CheckpointCoordinator checkpointCoordinator) {
        final CheckpointProperties properties =
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        return checkpointCoordinator.triggerCheckpoint(properties, null, false);
    }

    /**
     * Checkpoint id counter that throws from {@link #getAndIncrement()} whenever the supplied
     * predicate accepts the id being handed out. The counter is advanced even when it throws.
     */
    private static class UnstableCheckpointIDCounter
    implements CheckpointIDCounter {
        /** Decides, per pre-increment id, whether {@code getAndIncrement} should fail. */
        private final Predicate<Long> failOn;
        /** Current counter value; starts at zero. */
        private long current = 0L;

        public UnstableCheckpointIDCounter(Predicate<Long> checkpointFailurePredicate) {
            this.failOn = Preconditions.checkNotNull(checkpointFailurePredicate);
        }

        /** No-op. */
        public void start() {
        }

        /** Nothing to release; returns an already completed future. */
        public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
            return FutureUtils.completedVoidFuture();
        }

        /**
         * Advances the counter and tests the predicate against the value before the increment.
         * Note that, on success, the value after the increment is returned.
         */
        public long getAndIncrement() {
            final long handedOut = current;
            current = handedOut + 1L;
            if (failOn.test(handedOut)) {
                throw new RuntimeException("CheckpointIDCounter#getAndIncrement fails by design");
            }
            return current;
        }

        public long get() {
            return current;
        }

        /** Ignored: the counter cannot be repositioned. */
        public void setCount(long newId) {
        }
    }

    /**
     * Master hook whose checkpoint result is a future supplied by the test, and which fires a
     * latch each time its {@code triggerCheckpoint} is invoked.
     */
    private static class TestingMasterHook
    implements MasterTriggerRestoreHook<String> {
        private final SimpleVersionedSerializer<String> stateSerializer = new CheckpointCoordinatorTestingUtils.StringSerializer();
        /** Returned as-is from every {@code triggerCheckpoint} call. */
        private final CompletableFuture<String> pendingResult;
        /** Triggered on every {@code triggerCheckpoint} call. */
        private final OneShotLatch invocationLatch;

        /** Creates a hook with a private latch nobody else observes. */
        private TestingMasterHook(CompletableFuture<String> checkpointFuture) {
            this(checkpointFuture, new OneShotLatch());
        }

        private TestingMasterHook(CompletableFuture<String> checkpointFuture, OneShotLatch triggerCheckpointLatch) {
            this.pendingResult = checkpointFuture;
            this.invocationLatch = triggerCheckpointLatch;
        }

        public String getIdentifier() {
            return "testing master hook";
        }

        /** Signals the latch, then hands back the test-controlled future. */
        @Nullable
        public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
            invocationLatch.trigger();
            return pendingResult;
        }

        /** No-op: restored data is ignored. */
        public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) {
        }

        @Nullable
        public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
            return stateSerializer;
        }
    }
}

