/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.nio.file.Path;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.OptionalAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

class CheckpointCoordinatorTriggeringTest {
    /** Executor extension; its executor is handed to the execution graph builders used by the tests. */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    /** Location string passed along with the acknowledge messages delivered to a coordinator. */
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    /** Timer given to the coordinators under test; replaced with a new instance in {@link #setUp()}. */
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
    /** Temporary directory under which the savepoint target folders are created. */
    @TempDir
    private Path temporaryFolder;

    /** Package-private no-argument constructor with an empty body. */
    CheckpointCoordinatorTriggeringTest() {
    }

    /** Installs a new manually triggered executor before each test. */
    @BeforeEach
    void setUp() {
        manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /**
     * Starts the checkpoint scheduler, fires the scheduled triggers five times through the
     * manually triggered executor and checks that five checkpoints were recorded by the gateway.
     * After stopping the scheduler one more trigger round must not add a checkpoint. The same
     * sequence is then repeated after restarting the scheduler.
     *
     * <p>Exceptions are propagated to the test framework instead of being converted into a
     * message-only failure, so the original cause and stack trace stay attached to the failure.
     *
     * @throws Exception if building the graph or coordinator, or shutting the coordinator down, fails
     */
    @Test
    void testPeriodicTriggering() throws Exception {
        final int expectedTriggers = 5;
        final long start = System.currentTimeMillis();

        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionAttemptID attemptID = vertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(10L)
                        .setCheckpointTimeout(200000L)
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build();
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(checkpointCoordinatorConfiguration)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);

        // First round: scheduler running, every trigger round yields one checkpoint.
        checkpointCoordinator.startCheckpointScheduler();
        for (int i = 0; i < expectedTriggers; ++i) {
            manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            manuallyTriggeredScheduledExecutor.triggerAll();
        }
        checkRecordedTriggeredCheckpoints(expectedTriggers, start, gateway.getTriggeredCheckpoints(attemptID));

        // Scheduler stopped: another trigger round must leave the recorded count unchanged.
        checkpointCoordinator.stopCheckpointScheduler();
        manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(gateway.getTriggeredCheckpoints(attemptID)).hasSize(expectedTriggers);

        // Second round: reset the recorder and repeat after restarting the scheduler.
        gateway.resetCount();
        checkpointCoordinator.startCheckpointScheduler();
        for (int i = 0; i < expectedTriggers; ++i) {
            manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            manuallyTriggeredScheduledExecutor.triggerAll();
        }
        checkRecordedTriggeredCheckpoints(expectedTriggers, start, gateway.getTriggeredCheckpoints(attemptID));

        checkpointCoordinator.stopCheckpointScheduler();
        manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(gateway.getTriggeredCheckpoints(attemptID)).hasSize(expectedTriggers);

        checkpointCoordinator.shutdown();
    }

    /**
     * Verifies the checkpoints recorded by the gateway.
     *
     * @param numTrigger expected number of recorded checkpoints
     * @param start lower bound for every recorded timestamp
     * @param checkpoints recorded checkpoints; ids must be strictly increasing and timestamps
     *     non-decreasing in list order
     */
    private void checkRecordedTriggeredCheckpoints(int numTrigger, long start, List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> checkpoints) {
        Assertions.assertThat(checkpoints).hasSize(numTrigger);

        long previousId = -1L;
        long previousTimestamp = -1L;
        for (CheckpointCoordinatorTestingUtils.TriggeredCheckpoint recorded : checkpoints) {
            Assertions.assertThat(recorded.checkpointId)
                    .as("Trigger checkpoint id should be in increase order")
                    .isGreaterThan(previousId);
            Assertions.assertThat(recorded.timestamp)
                    .as("Trigger checkpoint timestamp should be in increase order")
                    .isGreaterThanOrEqualTo(previousTimestamp);
            Assertions.assertThat(recorded.timestamp)
                    .as("Trigger checkpoint timestamp should be larger than the start time")
                    .isGreaterThanOrEqualTo(start);

            previousId = recorded.checkpointId;
            previousTimestamp = recorded.timestamp;
        }
    }

    /**
     * Restores a savepoint in NO_CLAIM mode, shuts the coordinator down, and builds a second
     * coordinator on the same checkpoint store and id counter. After that coordinator restores the
     * latest checkpointed state, the checkpoint started with {@code triggerCheckpoint(true)} must
     * reach the task as a {@code FULL_CHECKPOINT}.
     */
    @Test
    void testTriggeringFullSnapshotAfterJobmasterFailover() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptID =
                graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();

        CompletedCheckpoint savepoint = takeSavepoint(graph, attemptID);
        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();

        // First coordinator: only restores the savepoint, then goes away.
        CheckpointCoordinator firstCoordinator = createCheckpointCoordinator(graph, store, idCounter);
        firstCoordinator.restoreSavepoint(
                SavepointRestoreSettings.forPath(savepoint.getExternalPointer(), true, RecoveryClaimMode.NO_CLAIM),
                graph.getAllVertices(),
                getClass().getClassLoader());
        firstCoordinator.shutdown();
        gateway.resetCount();

        // Second coordinator: shares the store and the id counter with the first one.
        CheckpointCoordinator secondCoordinator = createCheckpointCoordinator(graph, store, idCounter);
        secondCoordinator.restoreLatestCheckpointedStateToAll(new HashSet<>(graph.getAllVertices().values()), true);
        secondCoordinator.startCheckpointScheduler();

        CompletableFuture<?> pending = secondCoordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        secondCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 2L), TASK_MANAGER_LOCATION_INFO);
        pending.get();

        Assertions.assertThat(gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType())
                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
    }

    /**
     * Restores a savepoint in NO_CLAIM mode and takes another savepoint (id 2) with the same
     * coordinator. The checkpoint then started with {@code triggerCheckpoint(true)} must still
     * reach the task as a {@code FULL_CHECKPOINT}.
     */
    @Test
    void testTriggeringFullCheckpoints() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptID =
                graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();

        CompletedCheckpoint savepoint = takeSavepoint(graph, attemptID);
        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator coordinator = createCheckpointCoordinator(graph, store, idCounter);
        coordinator.restoreSavepoint(
                SavepointRestoreSettings.forPath(savepoint.getExternalPointer(), true, RecoveryClaimMode.NO_CLAIM),
                graph.getAllVertices(),
                getClass().getClassLoader());

        // A savepoint taken after the restore.
        takeSavepoint(graph, attemptID, coordinator, 2);
        Assertions.assertThat(coordinator.getRecentExpiredCheckpoints().isEmpty()).isTrue();

        coordinator.startCheckpointScheduler();
        gateway.resetCount();
        CompletableFuture<?> pending = coordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 3L), TASK_MANAGER_LOCATION_INFO);
        pending.get();

        Assertions.assertThat(gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType())
                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
    }

    /** Passing {@code null} to {@code triggerCheckpoint} must raise an {@link IllegalArgumentException}. */
    @Test
    void testTriggeringCheckpointsWithNullCheckpointType() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());

        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator coordinator = createCheckpointCoordinator(graph, store, idCounter);
        coordinator.startCheckpointScheduler();
        gateway.resetCount();

        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
                .isThrownBy(() -> coordinator.triggerCheckpoint(null));
    }

    /**
     * Triggering with the {@code INCREMENTAL} execution checkpoint type on a fresh coordinator
     * must reach the task as a {@code CHECKPOINT}.
     */
    @Test
    void testTriggeringCheckpointsWithIncrementalCheckpointType() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptID =
                graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();

        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator coordinator = createCheckpointCoordinator(graph, store, idCounter);
        coordinator.startCheckpointScheduler();
        gateway.resetCount();

        CompletableFuture<?> pending =
                coordinator.triggerCheckpoint(org.apache.flink.core.execution.CheckpointType.INCREMENTAL);
        manuallyTriggeredScheduledExecutor.triggerAll();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1L), TASK_MANAGER_LOCATION_INFO);
        pending.get();

        Assertions.assertThat(gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType())
                .isEqualTo(CheckpointType.CHECKPOINT);
    }

    /**
     * Triggering with the {@code FULL} execution checkpoint type on a fresh coordinator must
     * reach the task as a {@code FULL_CHECKPOINT}.
     */
    @Test
    void testTriggeringCheckpointsWithFullCheckpointType() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptID =
                graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();

        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator coordinator = createCheckpointCoordinator(graph, store, idCounter);
        coordinator.startCheckpointScheduler();
        gateway.resetCount();

        CompletableFuture<?> pending =
                coordinator.triggerCheckpoint(org.apache.flink.core.execution.CheckpointType.FULL);
        manuallyTriggeredScheduledExecutor.triggerAll();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1L), TASK_MANAGER_LOCATION_INFO);
        pending.get();

        Assertions.assertThat(gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType())
                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
    }

    /**
     * Restores a savepoint in NO_CLAIM mode and takes another savepoint (id 2). A checkpoint then
     * triggered with the {@code INCREMENTAL} execution checkpoint type must reach the task as a
     * {@code FULL_CHECKPOINT}.
     */
    @Test
    void testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptID =
                graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();

        CompletedCheckpoint savepoint = takeSavepoint(graph, attemptID);
        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator coordinator = createCheckpointCoordinator(graph, store, idCounter);
        coordinator.restoreSavepoint(
                SavepointRestoreSettings.forPath(savepoint.getExternalPointer(), true, RecoveryClaimMode.NO_CLAIM),
                graph.getAllVertices(),
                getClass().getClassLoader());
        takeSavepoint(graph, attemptID, coordinator, 2);

        coordinator.startCheckpointScheduler();
        gateway.resetCount();
        CompletableFuture<?> pending =
                coordinator.triggerCheckpoint(org.apache.flink.core.execution.CheckpointType.INCREMENTAL);
        manuallyTriggeredScheduledExecutor.triggerAll();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 3L), TASK_MANAGER_LOCATION_INFO);
        pending.get();

        Assertions.assertThat(gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType())
                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
    }

    /**
     * Takes a savepoint with id 1 using a throw-away coordinator that has its own checkpoint
     * store and id counter.
     *
     * <p>The temporary coordinator is shut down in a {@code finally} block so that it is also
     * released when taking the savepoint fails.
     *
     * @param graph execution graph the temporary coordinator is built for
     * @param attemptID execution attempt that acknowledges the savepoint
     * @return the completed savepoint
     * @throws Exception if creating the coordinator, taking the savepoint or the shutdown fails
     */
    private CompletedCheckpoint takeSavepoint(ExecutionGraph graph, ExecutionAttemptID attemptID) throws Exception {
        CheckpointCoordinator checkpointCoordinator =
                createCheckpointCoordinator(graph, new StandaloneCompletedCheckpointStore(1), new StandaloneCheckpointIDCounter());
        try {
            return takeSavepoint(graph, attemptID, checkpointCoordinator, 1);
        } finally {
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * Triggers a canonical-format savepoint into a new folder below the temporary directory,
     * runs the queued executor tasks, acknowledges the savepoint and waits for it.
     *
     * @param graph execution graph whose job id is used in the acknowledge message
     * @param attemptID execution attempt that acknowledges the savepoint
     * @param checkpointCoordinator coordinator that takes the savepoint
     * @param savepointId checkpoint id used in the acknowledge message
     * @return the completed savepoint
     */
    private CompletedCheckpoint takeSavepoint(ExecutionGraph graph, ExecutionAttemptID attemptID, CheckpointCoordinator checkpointCoordinator, int savepointId) throws Exception {
        String targetDirectory = TempDirUtils.newFolder(temporaryFolder).getPath();
        CompletableFuture<CompletedCheckpoint> pendingSavepoint =
                checkpointCoordinator.triggerSavepoint(targetDirectory, SavepointFormatType.CANONICAL);
        manuallyTriggeredScheduledExecutor.triggerAll();

        AcknowledgeCheckpoint ack = new AcknowledgeCheckpoint(graph.getJobID(), attemptID, savepointId);
        checkpointCoordinator.receiveAcknowledgeMessage(ack, TASK_MANAGER_LOCATION_INFO);
        return pendingSavepoint.get();
    }

    /**
     * Builds a coordinator for {@code graph} with a 10000 ms checkpoint interval, a 200000 ms
     * checkpoint timeout and no practical limit on concurrent checkpoints, using the manually
     * triggered executor as timer.
     *
     * @param graph execution graph the coordinator is built for
     * @param checkpointStore completed checkpoint store to use
     * @param checkpointIDCounter checkpoint id counter to use
     * @return the new coordinator
     */
    private CheckpointCoordinator createCheckpointCoordinator(ExecutionGraph graph, StandaloneCompletedCheckpointStore checkpointStore, CheckpointIDCounter checkpointIDCounter) throws Exception {
        CheckpointCoordinatorConfiguration configuration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(10000L)
                        .setCheckpointTimeout(200000L)
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build();

        CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder builder =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(configuration)
                        .setCompletedCheckpointStore(checkpointStore)
                        .setCheckpointIDCounter(checkpointIDCounter)
                        .setTimer(manuallyTriggeredScheduledExecutor);
        return builder.build(graph);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that the configured minimum pause between checkpoints is respected: after the first
     * checkpoint is acknowledged, the second one must not be triggered earlier than
     * {@code delay} milliseconds later (with 1 ms tolerance).
     *
     * <p>The interval and pause values are held in named locals and used for the configuration,
     * the polling sleep and the final check, so the three always agree.
     */
    @Test
    void testMinTimeBetweenCheckpointsInterval() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionAttemptID attemptID = vertex.getCurrentExecutionAttempt().getAttemptId();

        final long delay = 50L;
        final long checkpointInterval = 12L;
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(checkpointInterval)
                        .setCheckpointTimeout(200000L)
                        .setMinPauseBetweenCheckpoints(delay)
                        .setMaxConcurrentCheckpoints(1)
                        .build();
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(checkpointCoordinatorConfiguration)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);
        try {
            checkpointCoordinator.startCheckpointScheduler();
            manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            manuallyTriggeredScheduledExecutor.triggerAll();

            long firstCallId = gateway.getTriggeredCheckpoints(attemptID).get(0).checkpointId;
            Assertions.assertThat(firstCallId).isOne();

            AcknowledgeCheckpoint ackMsg = new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1L);
            // The pause is measured from just before the acknowledge is delivered.
            final long ackTime = System.nanoTime();
            checkpointCoordinator.receiveAcknowledgeMessage(ackMsg, TASK_MANAGER_LOCATION_INFO);

            gateway.resetCount();
            manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            manuallyTriggeredScheduledExecutor.triggerAll();
            // Poll until the next checkpoint shows up; this loop has no upper time bound.
            while (gateway.getTriggeredCheckpoints(attemptID).isEmpty()) {
                Thread.sleep(checkpointInterval);
                manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
                manuallyTriggeredScheduledExecutor.triggerAll();
            }

            long nextCallId = gateway.getTriggeredCheckpoints(attemptID).get(0).checkpointId;
            final long nextCheckpointTime = System.nanoTime();
            Assertions.assertThat(nextCallId).isEqualTo(2L);

            final long delayMillis = (nextCheckpointTime - ackTime) / 1000000L;
            // 1 ms of slack absorbs the truncation of the nanosecond difference.
            if (delayMillis + 1L < delay) {
                Assertions.fail("checkpoint came too early: delay was " + delayMillis + " but should have been at least " + delay);
            }
        }
        finally {
            checkpointCoordinator.stopCheckpointScheduler();
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * Without a started scheduler, a checkpoint requested through
     * {@code triggerPeriodicCheckpoint} must fail with {@code PERIODIC_SCHEDULER_SHUTDOWN},
     * while one triggered with {@code isPeriodic == false} must not complete exceptionally.
     */
    @Test
    void testStopPeriodicScheduler() throws Exception {
        CheckpointCoordinator coordinator = createCheckpointCoordinator();

        CompletableFuture<CompletedCheckpoint> periodicResult = triggerPeriodicCheckpoint(coordinator);
        manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            periodicResult.get();
            Assertions.fail("The triggerCheckpoint call expected an exception");
        }
        catch (ExecutionException e) {
            Optional<CheckpointException> checkpointFailure = ExceptionUtils.findThrowable(e, CheckpointException.class);
            Assertions.assertThat(checkpointFailure)
                    .isPresent()
                    .map(CheckpointException::getCheckpointFailureReason)
                    .get()
                    .isEqualTo(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
        }

        CheckpointProperties properties = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        CompletableFuture<?> nonPeriodicResult = coordinator.triggerCheckpoint(properties, null, false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(nonPeriodicResult).isNotCompletedExceptionally();
    }

    /**
     * With the no-op stats tracker and one execution marked finished, a non-periodic trigger
     * must still leave exactly one pending checkpoint on the coordinator.
     */
    @Test
    void testWithNoOpCheckpointStatsTracker() throws Exception {
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID(), 2, 2)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        // Mark the first execution vertex as finished before the coordinator is built.
        graph.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().markFinished();

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(NoOpCheckpointStatsTracker.INSTANCE)
                        .setAllowCheckpointsAfterTasksFinished(true)
                        .build(graph);

        triggerNonPeriodicCheckpoint(coordinator);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints())
                .as("The PendingCheckpoint was added. The PendingCheckpointStats should have been initialized.")
                .isEqualTo(1);
    }

    /**
     * A checkpoint requested before the coordinator is shut down, but executed afterwards, must
     * fail with {@code CHECKPOINT_COORDINATOR_SHUTDOWN}.
     */
    @Test
    void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
        CheckpointCoordinator coordinator = createCheckpointCoordinator();
        coordinator.startCheckpointScheduler();

        CompletableFuture<CompletedCheckpoint> result = triggerPeriodicCheckpoint(coordinator);
        // Shut down first, then let the queued trigger work run.
        coordinator.shutdown();
        manuallyTriggeredScheduledExecutor.triggerAll();

        try {
            result.get();
            Assertions.fail("Should not reach here");
        }
        catch (ExecutionException e) {
            Optional<CheckpointException> checkpointFailure = ExceptionUtils.findThrowable(e, CheckpointException.class);
            Assertions.assertThat(checkpointFailure)
                    .isPresent()
                    .map(CheckpointException::getCheckpointFailureReason)
                    .get()
                    .isEqualTo(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
        }
    }

    /**
     * A second trigger request issued while the first one is still triggering must be queued.
     * Once the executor runs, both must go through and two checkpoints must be recorded by the
     * gateway.
     */
    @Test
    void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptID =
                graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinator coordinator = createCheckpointCoordinator(graph);
        coordinator.startCheckpointScheduler();

        // First request: triggering starts, nothing is queued.
        CompletableFuture<CompletedCheckpoint> firstResult = triggerPeriodicCheckpoint(coordinator);
        Assertions.assertThat(coordinator.isTriggering()).isTrue();
        Assertions.assertThat(coordinator.getTriggerRequestQueue()).isEmpty();

        // Second request: still triggering, so the request is queued.
        CompletableFuture<CompletedCheckpoint> secondResult = triggerPeriodicCheckpoint(coordinator);
        Assertions.assertThat(coordinator.isTriggering()).isTrue();
        Assertions.assertThat(coordinator.getTriggerRequestQueue()).hasSize(1);

        manuallyTriggeredScheduledExecutor.triggerAll();

        Assertions.assertThat(firstResult).isNotCompletedExceptionally();
        Assertions.assertThat(secondResult).isNotCompletedExceptionally();
        Assertions.assertThat(coordinator.isTriggering()).isFalse();
        Assertions.assertThat(coordinator.getTriggerRequestQueue()).isEmpty();
        Assertions.assertThat(gateway.getTriggeredCheckpoints(attemptID)).hasSize(2);
    }

    /**
     * Uses an {@code UnstableCheckpointIDCounter} built with the predicate {@code id == 0} and
     * issues three non-periodic trigger requests. The first request must complete exceptionally,
     * the two queued ones must not, and two checkpoints must be recorded by the gateway.
     */
    @Test
    void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptID =
                graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointIDCounter(new UnstableCheckpointIDCounter(id -> id == 0L))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);
        coordinator.startCheckpointScheduler();

        // First request: triggering starts, nothing is queued.
        CompletableFuture<CompletedCheckpoint> firstResult = triggerNonPeriodicCheckpoint(coordinator);
        Assertions.assertThat(coordinator.isTriggering()).isTrue();
        Assertions.assertThat(coordinator.getTriggerRequestQueue()).isEmpty();

        // Two more requests while the first is still triggering: both are queued.
        CompletableFuture<CompletedCheckpoint> secondResult = triggerNonPeriodicCheckpoint(coordinator);
        CompletableFuture<CompletedCheckpoint> thirdResult = triggerNonPeriodicCheckpoint(coordinator);
        Assertions.assertThat(coordinator.isTriggering()).isTrue();
        Assertions.assertThat(coordinator.getTriggerRequestQueue()).hasSize(2);

        manuallyTriggeredScheduledExecutor.triggerAll();

        Assertions.assertThat(firstResult).isCompletedExceptionally();
        Assertions.assertThat(secondResult).isNotCompletedExceptionally();
        Assertions.assertThat(thirdResult).isNotCompletedExceptionally();
        Assertions.assertThat(coordinator.isTriggering()).isFalse();
        Assertions.assertThat(coordinator.getTriggerRequestQueue()).isEmpty();
        Assertions.assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(2);
    }

    /**
     * Runs the scheduled {@code CheckpointCanceller} while the trigger is still waiting for the
     * master hook, and checks the outcome.
     *
     * <p>Expected: the checkpoint future fails with {@link CheckpointFailureReason#CHECKPOINT_EXPIRED},
     * one expired checkpoint is recorded, and once the master hook finally completes the
     * coordinator leaves the triggering state without having sent anything to the task.
     */
    @Test
    void testTriggerCheckpointRequestCancelled() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionAttemptID attemptID = vertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinator checkpointCoordinator = this.createCheckpointCoordinator(graph);
        CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>();
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)new TestingMasterHook(masterHookCheckpointFuture));
        checkpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> onCompletionPromise = this.triggerPeriodicCheckpoint(checkpointCoordinator);

        // The master hook future is still open, so the trigger cannot finish yet.
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isTrue();

        // Run the scheduled canceller; the coordinator is still in the triggering state afterwards.
        this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.CheckpointCanceller.class);
        Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isTrue();

        try {
            onCompletionPromise.get();
            Assertions.fail((String)"Should not reach here");
        }
        catch (ExecutionException e) {
            // Typed Optional so that the method reference in map(...) has a usable target type.
            Optional<CheckpointException> checkpointExceptionOptional =
                    ExceptionUtils.findThrowable((Throwable)e, CheckpointException.class);
            Assertions.assertThat(checkpointExceptionOptional)
                    .isPresent()
                    .map(CheckpointException::getCheckpointFailureReason)
                    .get()
                    .isEqualTo((Object)CheckpointFailureReason.CHECKPOINT_EXPIRED);
        }
        Assertions.assertThat((int)checkpointCoordinator.getRecentExpiredCheckpoints().size()).isEqualTo(1);

        // Let the master hook finish and drain the remaining work.
        masterHookCheckpointFuture.complete("finish master hook");
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isFalse();
        // The expired checkpoint must not have reached the task manager gateway.
        Assertions.assertThat(gateway.getTriggeredCheckpoints(attemptID)).isEmpty();
        Assertions.assertThat((Collection)checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
    }

    /**
     * Checks that a trigger which fails while obtaining its checkpoint ID is reported as
     * {@link CheckpointFailureReason#TRIGGER_CHECKPOINT_FAILURE} and that a later trigger is
     * processed normally.
     *
     * <p>The ID counter is configured to throw for ID 0, the first ID it hands out, so only the
     * first trigger is affected.
     */
    @Test
    void testTriggerCheckpointInitializationFailed() throws Exception {
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointIDCounter(new UnstableCheckpointIDCounter(id -> id == 0L))
                        .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                        .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        checkpointCoordinator.startCheckpointScheduler();

        // First trigger: expected to fail inside the ID counter.
        CompletableFuture<CompletedCheckpoint> onCompletionPromise1 = this.triggerPeriodicCheckpoint(checkpointCoordinator);
        Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isTrue();
        Assertions.assertThat((Collection)checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            onCompletionPromise1.get();
            Assertions.fail((String)"This checkpoint should fail through UnstableCheckpointIDCounter");
        }
        catch (ExecutionException e) {
            // Typed Optional: no raw types or manual casts, same assertion shape as the sibling tests.
            Optional<CheckpointException> checkpointExceptionOptional =
                    ExceptionUtils.findThrowable((Throwable)e, CheckpointException.class);
            Assertions.assertThat(checkpointExceptionOptional)
                    .isPresent()
                    .map(CheckpointException::getCheckpointFailureReason)
                    .get()
                    .isEqualTo((Object)CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
        }
        Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isFalse();
        Assertions.assertThat((Collection)checkpointCoordinator.getTriggerRequestQueue()).isEmpty();

        // Second trigger: the counter no longer fails, so this one must not complete exceptionally.
        CompletableFuture<CompletedCheckpoint> onCompletionPromise2 = this.triggerPeriodicCheckpoint(checkpointCoordinator);
        Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isTrue();
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(onCompletionPromise2).isNotCompletedExceptionally();
        Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isFalse();
        Assertions.assertThat((Collection)checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
    }

    /**
     * Completes the master hook's future exceptionally and checks that the checkpoint fails with
     * {@link CheckpointFailureReason#TRIGGER_CHECKPOINT_FAILURE} without anything being sent to
     * the task.
     *
     * <p>The coordinator is built on the graph that carries the recording gateway; otherwise the
     * final gateway assertion would inspect a gateway the coordinator never talks to.
     */
    @Test
    void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionAttemptID attemptID = vertex.getCurrentExecutionAttempt().getAttemptId();

        // Use the graph built above so that `gateway` actually observes this coordinator.
        CheckpointCoordinator checkpointCoordinator = this.createCheckpointCoordinator(graph);
        CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>();
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)new TestingMasterHook(masterHookCheckpointFuture));
        checkpointCoordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> onCompletionPromise = this.triggerPeriodicCheckpoint(checkpointCoordinator);

        // The master hook future is still open, so the trigger cannot finish yet.
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isTrue();

        // Fail the master hook and let the coordinator react to it.
        masterHookCheckpointFuture.completeExceptionally(new Exception("by design"));
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isFalse();
        try {
            onCompletionPromise.get();
            Assertions.fail((String)"Should not reach here");
        }
        catch (ExecutionException e) {
            // Typed Optional so that the method reference in map(...) has a usable target type.
            Optional<CheckpointException> checkpointExceptionOptional =
                    ExceptionUtils.findThrowable((Throwable)e, CheckpointException.class);
            Assertions.assertThat(checkpointExceptionOptional)
                    .isPresent()
                    .map(CheckpointException::getCheckpointFailureReason)
                    .get()
                    .isEqualTo((Object)CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
        }
        // The failed checkpoint must not have reached the task manager gateway.
        Assertions.assertThat(gateway.getTriggeredCheckpoints(attemptID)).isEmpty();
        Assertions.assertThat((Collection)checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
    }

    /**
     * Triggers two checkpoints on a coordinator driven by a real single-threaded timer, waits
     * until the master hook has been invoked, completes the hook's future and then aborts all
     * pending checkpoints.
     *
     * <p>Expected: the future of the second request completes exceptionally with a
     * {@link CheckpointException}. The coordinator and the timer executor are shut down in a
     * {@code finally} block so they are released whether or not the assertions pass.
     */
    @Test
    void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer((ScheduledExecutor)new ScheduledExecutorServiceAdapter(scheduledExecutorService))
                        .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().build())
                        .build(new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                                .addJobVertex(new JobVertexID())
                                .setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(new DirectScheduledExecutorService()))
                                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()));
        CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>();
        OneShotLatch triggerCheckpointLatch = new OneShotLatch();
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)new TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
        try {
            checkpointCoordinator.triggerCheckpoint(false);
            // Only get() is called on this future, so the element type does not matter here.
            CompletableFuture<?> secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);

            // Block until the master hook has been asked to trigger, then let it finish.
            triggerCheckpointLatch.await();
            masterHookCheckpointFuture.complete("Completed");

            checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
            try {
                secondCheckpoint.get();
                Assertions.fail((String)"Expected the second checkpoint to fail.");
            }
            catch (ExecutionException ee) {
                Assertions.assertThat((Throwable)ExceptionUtils.stripExecutionException((Throwable)ee)).isInstanceOf(CheckpointException.class);
            }
        }
        finally {
            checkpointCoordinator.shutdown();
            ExecutorUtils.gracefulShutdown((long)10L, (TimeUnit)TimeUnit.SECONDS, (ExecutorService[])new ExecutorService[]{scheduledExecutorService});
        }
    }

    /**
     * Builds a coordinator whose timer is the manually triggered executor of this test.
     *
     * <p>No execution graph is supplied; the builder is only given the shared executor resource.
     */
    private CheckpointCoordinator createCheckpointCoordinator() throws Exception {
        ScheduledExecutor timer = (ScheduledExecutor)this.manuallyTriggeredScheduledExecutor;
        ScheduledExecutorService sharedExecutor = (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor();
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer(timer)
                .build(sharedExecutor);
    }

    /**
     * Builds a coordinator for the given execution graph whose timer is the manually triggered
     * executor of this test.
     *
     * @param graph execution graph the coordinator is built for
     */
    private CheckpointCoordinator createCheckpointCoordinator(ExecutionGraph graph) throws Exception {
        ScheduledExecutor timer = (ScheduledExecutor)this.manuallyTriggeredScheduledExecutor;
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer(timer)
                .build(graph);
    }

    /**
     * Triggers a checkpoint with {@code NEVER_RETAIN_AFTER_TERMINATION} properties, no explicit
     * second argument ({@code null}) and the trailing flag set to {@code true}.
     *
     * @param checkpointCoordinator coordinator to trigger the checkpoint on
     * @return the future returned by the coordinator for this checkpoint
     */
    private CompletableFuture<CompletedCheckpoint> triggerPeriodicCheckpoint(CheckpointCoordinator checkpointCoordinator) {
        CheckpointProperties properties =
                CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        return checkpointCoordinator.triggerCheckpoint(properties, null, true);
    }

    /**
     * Triggers a checkpoint with {@code NEVER_RETAIN_AFTER_TERMINATION} properties, no explicit
     * second argument ({@code null}) and the trailing flag set to {@code false}.
     *
     * @param checkpointCoordinator coordinator to trigger the checkpoint on
     * @return the future returned by the coordinator for this checkpoint
     */
    private CompletableFuture<CompletedCheckpoint> triggerNonPeriodicCheckpoint(CheckpointCoordinator checkpointCoordinator) {
        CheckpointProperties properties =
                CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        return checkpointCoordinator.triggerCheckpoint(properties, null, false);
    }

    /**
     * {@link CheckpointIDCounter} test double whose {@link #getAndIncrement()} throws for every
     * ID accepted by the supplied predicate.
     *
     * <p>The counter starts at 0 and is advanced on every call to {@link #getAndIncrement()},
     * including the calls that throw. It uses a plain {@code long} field and performs no
     * synchronization of its own.
     */
    private static class UnstableCheckpointIDCounter implements CheckpointIDCounter {
        /** Decides, per candidate ID, whether {@link #getAndIncrement()} should fail. */
        private final Predicate<Long> checkpointFailurePredicate;
        /** Next ID to hand out. */
        private long id = 0L;

        /**
         * @param checkpointFailurePredicate returns {@code true} for IDs whose retrieval must
         *     fail; must not be {@code null}
         */
        public UnstableCheckpointIDCounter(Predicate<Long> checkpointFailurePredicate) {
            this.checkpointFailurePredicate = (Predicate<Long>)Preconditions.checkNotNull(checkpointFailurePredicate);
        }

        /** No-op: this counter needs no start-up work. */
        public void start() {
        }

        /** No-op: returns an already completed future. */
        public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
            return FutureUtils.completedVoidFuture();
        }

        /**
         * Returns the current ID and advances the counter by one.
         *
         * @return the ID before the increment
         * @throws RuntimeException if the failure predicate accepts the current ID; the counter
         *     is advanced in that case as well
         */
        public long getAndIncrement() {
            // Capture the pre-increment value: it is both the value the predicate is asked
            // about and the value handed back, matching get-and-increment semantics.
            long current = this.id++;
            if (this.checkpointFailurePredicate.test(current)) {
                throw new RuntimeException("CheckpointIDCounter#getAndIncrement fails by design");
            }
            return current;
        }

        /** Returns the next ID without advancing the counter. */
        public long get() {
            return this.id;
        }

        /** No-op: this test double ignores the requested count. */
        public void setCount(long newId) {
        }
    }

    /**
     * {@link MasterTriggerRestoreHook} stub whose checkpoint result is a future owned by the test.
     *
     * <p>Each call to {@link #triggerCheckpoint(long, long, Executor)} fires the latch and hands
     * back the same future, so a test can both wait for the hook to be invoked and decide when
     * and how the hook's checkpoint completes.
     */
    private static class TestingMasterHook implements MasterTriggerRestoreHook<String> {
        /** Future returned from every trigger call; completed by the test. */
        private final CompletableFuture<String> checkpointFuture;
        /** Fired whenever the hook is asked to trigger a checkpoint. */
        private final OneShotLatch triggerCheckpointLatch;
        private final SimpleVersionedSerializer<String> serializer = new CheckpointCoordinatorTestingUtils.StringSerializer();

        /** Creates a hook with a private latch that nobody else observes. */
        private TestingMasterHook(CompletableFuture<String> future) {
            this.checkpointFuture = future;
            this.triggerCheckpointLatch = new OneShotLatch();
        }

        /** Creates a hook that fires the given latch when it is triggered. */
        private TestingMasterHook(CompletableFuture<String> future, OneShotLatch latch) {
            this.checkpointFuture = future;
            this.triggerCheckpointLatch = latch;
        }

        public String getIdentifier() {
            return "testing master hook";
        }

        /** Signals the latch, then returns the test-controlled future; all arguments are ignored. */
        @Nullable
        public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
            this.triggerCheckpointLatch.trigger();
            return this.checkpointFuture;
        }

        /** No-op: nothing is restored by this stub. */
        public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) {
        }

        @Nullable
        public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
            return this.serializer;
        }
    }
}

