package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.taskmanager.TaskTest;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.TriFunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.class */
public class CheckpointCoordinatorTest extends TestLogger {

    // Class-level JUnit rule holding the executor resource returned by
    // TestingUtils.defaultExecutorResource(); tests pass its executor when building execution graphs.
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    // Location string passed alongside acknowledge/decline messages sent to the coordinator.
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    // Manually driven timer handed to the coordinators under test; replaced with a fresh instance in setUp().
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;

    // Per-test temporary folder rule. NOTE(review): no use of it is visible in this part of the file.
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest$9, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$9.class */
    /**
     * Decompiler-emitted lookup table for a {@code switch} over {@link OperatorStateHandle.Mode}.
     *
     * <p>The array is indexed by {@code Mode.ordinal()} and stores the case index assigned in the
     * static initializer (UNION = 1, BROADCAST = 2, SPLIT_DISTRIBUTE = 3). Slots that are never
     * assigned keep the array default of 0. NOTE(review): the switch statement that reads this
     * table is not visible in this part of the file.
     */
    public static /* synthetic */ class AnonymousClass9 {
        // One slot per Mode constant, sized from Mode.values().
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode = new int[OperatorStateHandle.Mode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode[OperatorStateHandle.Mode.UNION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Swallowed by the generated code; presumably tolerates a constant that is
                // missing at runtime, in which case the slot simply stays 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode[OperatorStateHandle.Mode.BROADCAST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Same deliberate swallow as above.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode[OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Same deliberate swallow as above.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$CheckpointIDCounterWithOwner.class */
    /**
     * Standalone checkpoint ID counter that additionally keeps a reference to the
     * {@link CheckpointCoordinator} it belongs to, so subclasses can act on the coordinator
     * from within counter callbacks.
     */
    public static class CheckpointIDCounterWithOwner extends StandaloneCheckpointIDCounter {
        /** Coordinator using this counter; {@code null} until {@link #setOwner} is called. */
        protected CheckpointCoordinator owner;

        private CheckpointIDCounterWithOwner() {
        }

        /**
         * Registers the owning coordinator.
         *
         * @param checkpointCoordinator the coordinator; rejected by the precondition check if null
         */
        void setOwner(CheckpointCoordinator checkpointCoordinator) {
            // Validate first, then store: the field is only written for a non-null argument.
            Preconditions.checkNotNull(checkpointCoordinator);
            owner = checkpointCoordinator;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$IOExceptionCheckpointIDCounter.class */
    /**
     * Counter whose {@code getAndIncrement()} always fails with an {@link IOException}, after
     * verifying that an owner has been set.
     */
    private static class IOExceptionCheckpointIDCounter extends CheckpointIDCounterWithOwner {
        private IOExceptionCheckpointIDCounter() {
        }

        /**
         * Never returns normally.
         *
         * @throws IOException always, with the message {@code "disk is error!"}
         */
        public long getAndIncrement() throws Exception {
            // The owner must have been registered before the counter is used.
            Preconditions.checkNotNull(owner);
            throw new IOException("disk is error!");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$IOExceptionCheckpointStorage.class */
    /**
     * Job-manager checkpoint storage whose storage access fails every attempt to initialize a
     * checkpoint location with an {@link IOException}.
     */
    private static class IOExceptionCheckpointStorage extends JobManagerCheckpointStorage {
        private IOExceptionCheckpointStorage() {
        }

        /**
         * Creates a memory-backed storage access for the given job whose
         * {@code initializeLocationForCheckpoint} always throws.
         *
         * @param jobID job the storage access is created for
         * @return the failing storage access
         */
        public CheckpointStorageAccess createCheckpointStorage(JobID jobID) throws IOException {
            return new MemoryBackendCheckpointStorageAccess(jobID, null, null, 100) {
                public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
                    throw new IOException("disk is error!");
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$StoppingCheckpointIDCounter.class */
    /**
     * Counter that stops the owning coordinator's checkpoint scheduler every time an ID is
     * requested, and then hands out the ID from the parent counter.
     */
    private static class StoppingCheckpointIDCounter extends CheckpointIDCounterWithOwner {
        private StoppingCheckpointIDCounter() {
        }

        /**
         * Stops the owner's checkpoint scheduler, then delegates to the parent implementation.
         *
         * @return the value produced by the parent counter
         */
        public long getAndIncrement() throws Exception {
            // The owner must have been registered before the counter is used.
            Preconditions.checkNotNull(owner);
            owner.stopCheckpointScheduler();
            long nextId = super.getAndIncrement();
            return nextId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$TestFailJobCallback.class */
    /**
     * Fail-job callback that does nothing except count how often either callback method was
     * invoked.
     */
    public static class TestFailJobCallback implements CheckpointFailureManager.FailJobCallback {
        /** Number of callback invocations seen so far. */
        private int invokeCounter = 0;

        private TestFailJobCallback() {
        }

        /** Records one invocation; the cause is ignored. */
        public void failJob(Throwable th) {
            invokeCounter += 1;
        }

        /** Records one invocation; the cause and the execution attempt are ignored. */
        public void failJobDueToTaskFailure(Throwable th, ExecutionAttemptID executionAttemptID) {
            invokeCounter += 1;
        }

        /**
         * @return total number of calls to {@link #failJob} and {@link #failJobDueToTaskFailure}
         */
        public int getInvokeCounter() {
            return invokeCounter;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest$TestResetHook.class */
    /**
     * Master hook that only supports {@link #getIdentifier()} and {@link #reset()}; every other
     * hook operation throws {@link UnsupportedOperationException}.
     */
    private static class TestResetHook implements MasterTriggerRestoreHook<String> {
        private final String id;

        /** Flipped to {@code true} by {@link #reset()}. */
        boolean resetCalled;

        /**
         * @param identifier value returned by {@link #getIdentifier()}
         */
        TestResetHook(String identifier) {
            id = identifier;
        }

        public String getIdentifier() {
            return id;
        }

        /** Records that the hook has been reset. */
        public void reset() throws Exception {
            resetCalled = true;
        }

        /** Not supported by this hook. */
        public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this hook. */
        public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) throws Exception {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this hook. */
        public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Checkpoint 1 is acknowledged by the first vertex with three state handles and then declined
     * by the second vertex: the first two handles must be disposed while the third must survive.
     * Once checkpoint 2 has been acknowledged by both vertices and its future has completed, the
     * third handle must be disposed as well.
     */
    @Test
    public void testSharedStateNotDiscaredOnAbort() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);
        coordinator.startCheckpointScheduler();

        // Checkpoint 1: acknowledged by the first vertex, declined by the second.
        CompletableFuture<?> abortedCheckpoint = coordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        abortedCheckpoint.getNow(null);

        TestingStreamStateHandle stateA = handle();
        TestingStreamStateHandle stateB = handle();
        TestingStreamStateHandle stateKeptOnAbort = handle();
        ackCheckpoint(1L, coordinator, firstVertexId, graph, stateA, stateB, stateKeptOnAbort);
        declineCheckpoint(1L, coordinator, secondVertexId, graph);

        Assert.assertTrue(stateB.isDisposed());
        Assert.assertTrue(stateA.isDisposed());
        Assert.assertFalse(stateKeptOnAbort.isDisposed());

        // Checkpoint 2: acknowledged by both vertices.
        CompletableFuture<?> completedCheckpoint = coordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        completedCheckpoint.getNow(null);

        ackCheckpoint(2L, coordinator, firstVertexId, graph, handle(), handle(), handle());
        ackCheckpoint(2L, coordinator, secondVertexId, graph, handle(), handle(), handle());

        completedCheckpoint.get();
        Assert.assertTrue(stateKeptOnAbort.isDisposed());
    }

    /**
     * Runs the shared stats-after-failure scenario for checkpoint 1, delivering the late metrics
     * through {@code CheckpointCoordinator#reportStats}.
     */
    @Test
    public void testAbortedCheckpointStatsUpdatedAfterFailure() throws Exception {
        testReportStatsAfterFailure(
                1L,
                (coordinator, attempt, metrics) -> {
                    coordinator.reportStats(1L, attempt.getAttemptId(), metrics);
                    // The scenario ignores the callback result.
                    return null;
                });
    }

    /**
     * Runs the shared stats-after-failure scenario for checkpoint 1, delivering the late metrics
     * inside an {@link AcknowledgeCheckpoint} message.
     */
    @Test
    public void testCheckpointStatsUpdatedAfterFailure() throws Exception {
        testReportStatsAfterFailure(
                1L,
                (coordinator, attempt, metrics) -> {
                    AcknowledgeCheckpoint lateAck =
                            new AcknowledgeCheckpoint(
                                    attempt.getVertex().getJobId(),
                                    attempt.getAttemptId(),
                                    1L,
                                    metrics,
                                    new TaskStateSnapshot());
                    boolean accepted =
                            coordinator.receiveAcknowledgeMessage(lateAck, TASK_MANAGER_LOCATION_INFO);
                    return Boolean.valueOf(accepted);
                });
    }

    /**
     * Shared scenario: triggers a checkpoint over two vertices, has the first vertex decline it,
     * then lets the supplied callback report metrics for the second vertex and verifies that the
     * stats tracker's history reflects those metrics for the given checkpoint.
     *
     * @param j id of the checkpoint that is declined and whose stats are verified
     * @param triFunctionWithException callback that delivers the metrics to the coordinator for
     *     the second vertex's current execution attempt; its result is ignored
     */
    private void testReportStatsAfterFailure(long j, TriFunctionWithException<CheckpointCoordinator, Execution, CheckpointMetrics, ?, CheckpointException> triFunctionWithException) throws Exception {
        JobVertexID decliningVertexId = new JobVertexID();
        JobVertexID reportingVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(decliningVertexId)
                        .addJobVertex(reportingVertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex decliningTask = graph.getJobVertex(decliningVertexId).getTaskVertices()[0];
        ExecutionVertex reportingTask = graph.getJobVertex(reportingVertexId).getTaskVertices()[0];

        CheckpointStatsTracker statsTracker =
                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(statsTracker)
                        .build(graph);

        CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Preconditions.checkState(
                coordinator.getNumberOfPendingCheckpoints() == 1,
                "wrong number of pending checkpoints: %s",
                coordinator.getNumberOfPendingCheckpoints());
        // Surface a trigger failure immediately if the future has already finished.
        if (triggerResult.isDone()) {
            triggerResult.get();
        }

        DeclineCheckpoint decline =
                new DeclineCheckpoint(
                        graph.getJobID(),
                        decliningTask.getCurrentExecutionAttempt().getAttemptId(),
                        j,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
        coordinator.receiveDeclineMessage(decline, "test");

        CheckpointMetrics lateMetrics =
                new CheckpointMetricsBuilder()
                        .setTotalBytesPersisted(18L)
                        .setBytesPersistedOfThisCheckpoint(18L)
                        .setBytesProcessedDuringAlignment(19L)
                        .setAsyncDurationMillis(20L)
                        .setAlignmentDurationNanos(123000000L)
                        .setCheckpointStartDelayNanos(567000000L)
                        .build();
        triFunctionWithException.apply(coordinator, reportingTask.getCurrentExecutionAttempt(), lateMetrics);

        assertStatsEqual(
                j,
                reportingTask.getJobvertexId(),
                0,
                lateMetrics,
                statsTracker.createSnapshot().getHistory().getCheckpointById(j));
    }

    /**
     * @param operatorState operator state to inspect
     * @return {@code true} if the operator state reports zero collected states
     */
    private boolean hasNoSubState(OperatorState operatorState) {
        int collectedStates = operatorState.getNumberCollectedStates();
        return collectedStates == 0;
    }

    /**
     * Asserts that the given stats describe a FAILED checkpoint with the expected id and no
     * acknowledged subtasks, and that the metrics of one subtask match the expected ones.
     *
     * @param j expected checkpoint id
     * @param jobVertexID vertex whose subtask stats are compared
     * @param i index of the subtask within that vertex
     * @param checkpointMetrics expected metrics
     * @param abstractCheckpointStats stats under inspection
     */
    private void assertStatsEqual(long j, JobVertexID jobVertexID, int i, CheckpointMetrics checkpointMetrics, AbstractCheckpointStats abstractCheckpointStats) {
        final long expectedAcknowledged = 0L;
        Assert.assertEquals(j, abstractCheckpointStats.getCheckpointId());
        Assert.assertEquals(CheckpointStatsStatus.FAILED, abstractCheckpointStats.getStatus());
        Assert.assertEquals(expectedAcknowledged, abstractCheckpointStats.getNumberOfAcknowledgedSubtasks());
        // Per-subtask metric comparison is shared with other callers.
        assertStatsMetrics(jobVertexID, i, checkpointMetrics, abstractCheckpointStats);
    }

    /**
     * Asserts that the stats recorded for one subtask match the reported checkpoint metrics.
     *
     * <p>Compares the total persisted bytes against the checkpoint's state size, then the
     * alignment duration and checkpoint start delay (both converted from nanoseconds to
     * milliseconds), the unaligned flag and the async duration against the subtask's stats.
     *
     * @param jobVertexID vertex whose task stats are looked up in {@code abstractCheckpointStats}
     * @param i index into that vertex's subtask stats array
     * @param checkpointMetrics expected metrics
     * @param abstractCheckpointStats stats under inspection
     * @throws AssertionError if the vertex or subtask has no stats, or any value differs
     */
    public static void assertStatsMetrics(JobVertexID jobVertexID, int i, CheckpointMetrics checkpointMetrics, AbstractCheckpointStats abstractCheckpointStats) {
        Assert.assertEquals(checkpointMetrics.getTotalBytesPersisted(), abstractCheckpointStats.getStateSize());

        // Fail with a descriptive assertion instead of a bare NoSuchElementException when the
        // vertex is missing from the stats.
        TaskStateStats vertexStats =
                abstractCheckpointStats.getAllTaskStateStats().stream()
                        .filter(candidate -> candidate.getJobVertexId().equals(jobVertexID))
                        .findAny()
                        .orElseThrow(() -> new AssertionError("No task state stats recorded for job vertex " + jobVertexID));
        SubtaskStateStats subtaskStats = vertexStats.getSubtaskStats()[i];
        // An empty slot would otherwise surface as a NullPointerException on the next line.
        Assert.assertNotNull("No subtask stats recorded for subtask " + i + " of job vertex " + jobVertexID, subtaskStats);

        Assert.assertEquals(checkpointMetrics.getAlignmentDurationNanos() / 1_000_000, subtaskStats.getAlignmentDuration());
        Assert.assertEquals(Boolean.valueOf(checkpointMetrics.getUnalignedCheckpoint()), Boolean.valueOf(subtaskStats.getUnalignedCheckpoint()));
        Assert.assertEquals(checkpointMetrics.getAsyncDurationMillis(), subtaskStats.getAsyncCheckpointDuration());
        Assert.assertEquals(checkpointMetrics.getCheckpointStartDelayNanos() / 1_000_000, subtaskStats.getCheckpointStartDelay());
    }

    /** Gives every test a fresh manually triggered timer. */
    @Before
    public void setUp() throws Exception {
        ManuallyTriggeredScheduledExecutor freshTimer = new ManuallyTriggeredScheduledExecutor();
        manuallyTriggeredScheduledExecutor = freshTimer;
    }

    /**
     * Calls {@code scheduleTriggerRequest()} on a coordinator after both the coordinator and its
     * backing timer executor have been shut down. The test has no assertions; it passes as long
     * as the call does not throw.
     */
    @Test
    public void testScheduleTriggerRequestDuringShutdown() throws Exception {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            CheckpointCoordinator coordinator = getCheckpointCoordinator((ScheduledExecutor) new ScheduledExecutorServiceAdapter(timerService));
            coordinator.shutdown();
            timerService.shutdownNow();
            coordinator.scheduleTriggerRequest();
        } finally {
            // Release the executor thread even if coordinator creation or shutdown failed before
            // the explicit shutdownNow() above; a repeated call on the success path is harmless.
            timerService.shutdownNow();
        }
    }

    /**
     * Exercises the minimum pause between checkpoints on a coordinator driven by a real
     * single-threaded timer (interval and minimum pause both 1000, one concurrent checkpoint).
     *
     * <p>Two checkpoints are requested back to back. After checkpoint 1 has a storage location it
     * is acknowledged; half a pause later no checkpoint may be pending, and the test then waits
     * until a pending checkpoint shows up again.
     */
    @Test
    public void testMinCheckpointPause() throws Exception {
        final int pauseMillis = 1000;
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        CheckpointCoordinator coordinator = null;
        try {
            JobVertexID vertexId = new JobVertexID();
            ExecutionGraph graph =
                    new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                            .addJobVertex(vertexId)
                            .setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(new DirectScheduledExecutorService()))
                            .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
            ExecutionAttemptID attemptId = graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
            coordinator =
                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                            .setTimer(new ScheduledExecutorServiceAdapter(timerService))
                            .setCheckpointCoordinatorConfiguration(
                                    CheckpointCoordinatorConfiguration.builder()
                                            .setCheckpointInterval(pauseMillis)
                                            .setCheckpointTimeout(Long.MAX_VALUE)
                                            .setMaxConcurrentCheckpoints(1)
                                            .setMinPauseBetweenCheckpoints(pauseMillis)
                                            .build())
                            .build(graph);
            coordinator.startCheckpointScheduler();

            // Two requests in a row; only one checkpoint may be in flight at a time.
            coordinator.triggerCheckpoint(true);
            coordinator.triggerCheckpoint(true);

            // Wait until a pending checkpoint has been given its storage location.
            while (coordinator.getPendingCheckpoints().values().stream()
                    .noneMatch(pending -> pending.getCheckpointStorageLocation() != null)) {
                Thread.sleep(10L);
            }

            coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptId, 1L), TASK_MANAGER_LOCATION_INFO);
            Thread.sleep(pauseMillis / 2);
            Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());

            // Block until a checkpoint is pending again.
            while (coordinator.getNumberOfPendingCheckpoints() == 0) {
                Thread.sleep(1L);
            }
        } finally {
            // Single cleanup path: the coordinator is shut down at most once, and the timer
            // executor is stopped even if the coordinator shutdown throws.
            try {
                if (coordinator != null) {
                    coordinator.shutdown();
                }
            } finally {
                timerService.shutdownNow();
            }
        }
    }

    /**
     * Builds a two-vertex graph with {@code setTransitToRunning(false)} and verifies that a
     * triggered checkpoint completes exceptionally, leaving no pending and no retained
     * checkpoints behind.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() throws Exception {
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .addJobVertex(new JobVertexID(), false)
                        .setTransitToRunning(false)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinator coordinator = getCheckpointCoordinator(graph);

        // Nothing is in flight or retained before the trigger.
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertTrue(triggerResult.isCompletedExceptionally());

        // The failed trigger left nothing behind.
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        coordinator.shutdown();
    }

    /**
     * Marks every task of the first vertex as finished and verifies that a triggered checkpoint
     * completes exceptionally, leaving no pending and no retained checkpoints behind.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinished() throws Exception {
        JobVertexID finishedVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(finishedVertexId)
                        .addJobVertex(new JobVertexID(), false)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinator coordinator = getCheckpointCoordinator(graph);

        for (ExecutionVertex task : graph.getJobVertex(finishedVertexId).getTaskVertices()) {
            task.getCurrentExecutionAttempt().markFinished();
        }

        // Nothing is in flight or retained before the trigger.
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertTrue(triggerResult.isCompletedExceptionally());

        // The failed trigger left nothing behind.
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        coordinator.shutdown();
    }

    /**
     * With {@code setAllowCheckpointsAfterTasksFinished(true)} and three of six tasks already
     * finished, a triggered checkpoint must stay pending, and its stats must already count three
     * acknowledged subtasks with a stats entry present for each finished task.
     */
    @Test
    public void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId, 3, 256)
                        .addJobVertex(secondVertexId, 3, 256)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex firstVertex = graph.getJobVertex(firstVertexId);
        ExecutionJobVertex secondVertex = graph.getJobVertex(secondVertexId);

        // Finish two subtasks of the first vertex and one of the second.
        firstVertex.getTaskVertices()[0].getCurrentExecutionAttempt().markFinished();
        firstVertex.getTaskVertices()[1].getCurrentExecutionAttempt().markFinished();
        secondVertex.getTaskVertices()[1].getCurrentExecutionAttempt().markFinished();

        CheckpointStatsTracker statsTracker =
                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setAllowCheckpointsAfterTasksFinished(true)
                        .setCheckpointStatsTracker(statsTracker)
                        .build(graph);
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(triggerResult.isDone());
        Assert.assertFalse(triggerResult.isCompletedExceptionally());
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());

        PendingCheckpoint pending = coordinator.getPendingCheckpoints().values().iterator().next();
        AbstractCheckpointStats pendingStats =
                statsTracker.createSnapshot().getHistory().getCheckpointById(pending.getCheckpointID());
        Assert.assertEquals(3L, pendingStats.getNumberOfAcknowledgedSubtasks());

        List<ExecutionVertex> finishedTasks =
                Arrays.asList(
                        firstVertex.getTaskVertices()[0],
                        firstVertex.getTaskVertices()[1],
                        secondVertex.getTaskVertices()[1]);
        for (ExecutionVertex finishedTask : finishedTasks) {
            Assert.assertNotNull(
                    pendingStats
                            .getTaskStateStats(finishedTask.getJobvertexId())
                            .getSubtaskStats()[finishedTask.getParallelSubtaskIndex()]);
        }
    }

    /**
     * The first task's gateway marks that task finished and fails the trigger RPC while the
     * checkpoint is being triggered. The checkpoint future must then complete exceptionally and
     * the second task's gateway must have received an abort notification.
     */
    @Test
    public void testTasksFinishDuringTriggering() throws Exception {
        JobVertexID finishingVertexId = new JobVertexID();
        JobVertexID observingVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .setTransitToRunning(false)
                        .addJobVertex(finishingVertexId, 1, 256)
                        .addJobVertex(observingVertexId, 1, 256)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final ExecutionVertex finishingTask = graph.getJobVertex(finishingVertexId).getTaskVertices()[0];
        ExecutionVertex observingTask = graph.getJobVertex(observingVertexId).getTaskVertices()[0];
        final AtomicBoolean abortNotified = new AtomicBoolean(false);

        // Gateway of the first task: finish the task and fail the trigger call.
        SimpleAckingTaskManagerGateway finishingGateway = new SimpleAckingTaskManagerGateway() {
            @Override // org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway
            public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobID, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
                finishingTask.getCurrentExecutionAttempt().markFinished();
                return FutureUtils.completedExceptionally(new RpcException(""));
            }
        };
        TestingLogicalSlot finishingSlot =
                new TestingLogicalSlotBuilder().setTaskManagerGateway(finishingGateway).createTestingLogicalSlot();

        // Gateway of the second task: remember that an abort notification arrived.
        SimpleAckingTaskManagerGateway observingGateway = new SimpleAckingTaskManagerGateway() {
            @Override // org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway
            public void notifyCheckpointAborted(ExecutionAttemptID executionAttemptID, JobID jobID, long checkpointId, long latestCompletedCheckpointId, long timestamp) {
                abortNotified.set(true);
            }
        };
        TestingLogicalSlot observingSlot =
                new TestingLogicalSlotBuilder().setTaskManagerGateway(observingGateway).createTestingLogicalSlot();

        ExecutionGraphTestUtils.setVertexResource(finishingTask, finishingSlot);
        finishingTask.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
        ExecutionGraphTestUtils.setVertexResource(observingTask, observingSlot);
        observingTask.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setAllowCheckpointsAfterTasksFinished(true)
                        .build(graph);
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertTrue(triggerResult.isCompletedExceptionally());
        Assert.assertTrue(abortNotified.get());
    }

    /**
     * Triggers a checkpoint over two vertices, acknowledges it from the second vertex and then
     * declines it from the first. The coordinator is built with the failure manager returned by
     * {@code getCheckpointFailureManager(message)}, and the decline is expected to surface an
     * exception carrying that message.
     *
     * <p>The coordinator is shut down exactly once on every exit path.
     */
    @Test
    public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() throws Exception {
        final String expectedMessage = "Exceeded checkpoint failure tolerance number!";
        JobVertexID decliningVertexId = new JobVertexID();
        JobVertexID ackingVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(decliningVertexId)
                        .addJobVertex(ackingVertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex decliningTask = graph.getJobVertex(decliningVertexId).getTaskVertices()[0];
        ExecutionVertex ackingTask = graph.getJobVertex(ackingVertexId).getTaskVertices()[0];
        ExecutionAttemptID decliningAttemptId = decliningTask.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID ackingAttemptId = ackingTask.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator coordinator = getCheckpointCoordinator(graph, getCheckpointFailureManager(expectedMessage));
        try {
            CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally(triggerResult);

            long checkpointId = coordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
            PendingCheckpoint pending = coordinator.getPendingCheckpoints().get(checkpointId);

            coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), ackingAttemptId, checkpointId), TASK_MANAGER_LOCATION_INFO);
            // One of two tasks has acknowledged: still alive, not yet complete.
            Assert.assertFalse(pending.isDisposed());
            Assert.assertFalse(pending.areTasksFullyAcknowledged());

            coordinator.receiveDeclineMessage(
                    new DeclineCheckpoint(graph.getJobID(), decliningAttemptId, checkpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                    TASK_MANAGER_LOCATION_INFO);
            // Reaching this line means the decline did not throw. Assert.fail raises an
            // AssertionError, which the catch (Exception) below does not intercept.
            Assert.fail("Test failed.");
        } catch (Exception e) {
            ExceptionUtils.assertThrowableWithMessage(e, expectedMessage);
        } finally {
            coordinator.shutdown();
        }
    }

    /**
     * Triggers a checkpoint and then aborts all pending checkpoints with an
     * {@code IO_EXCEPTION} reason. The coordinator is built with the failure manager returned by
     * {@code getCheckpointFailureManager(message)}, and the abort is expected to surface an
     * exception carrying that message.
     *
     * <p>The coordinator is shut down exactly once on every exit path.
     */
    @Test
    public void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
        final String expectedMessage = "Expected Error Message";
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .addJobVertex(new JobVertexID())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinator coordinator = getCheckpointCoordinator(graph, getCheckpointFailureManager(expectedMessage));
        try {
            coordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            coordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.IO_EXCEPTION));
            // Reaching this line means the abort did not throw. Assert.fail raises an
            // AssertionError, which the catch (Exception) below does not intercept.
            Assert.fail("Test failed.");
        } catch (Exception e) {
            ExceptionUtils.assertThrowableWithMessage(e, expectedMessage);
        } finally {
            coordinator.shutdown();
        }
    }

    /**
     * Triggers a periodic checkpoint on a coordinator created by
     * {@code setupCheckpointCoordinatorWithInactiveTasks} with an
     * {@code IOExceptionCheckpointStorage} and expects the trigger future to fail with a
     * {@link CheckpointException} whose reason is {@link CheckpointFailureReason#IO_EXCEPTION}.
     *
     * <p>Any other exception is rethrown unchanged so that it fails the test.
     */
    @Test
    public void testIOExceptionForPeriodicSchedulingWithInactiveTasks() throws Exception {
        CompletableFuture<?> periodicTrigger =
                setupCheckpointCoordinatorWithInactiveTasks(new IOExceptionCheckpointStorage())
                        .triggerCheckpoint(
                                CheckpointProperties.forCheckpoint(
                                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                                (String) null,
                                true);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            periodicTrigger.get();
            Assert.fail("should not trigger periodic checkpoint after IOException occurred.");
        } catch (Exception e) {
            Optional<?> checkpointFailure =
                    ExceptionUtils.findThrowable(e, CheckpointException.class);
            boolean failedWithIoException =
                    checkpointFailure.isPresent()
                            && ((CheckpointException) checkpointFailure.get())
                                            .getCheckpointFailureReason()
                                    == CheckpointFailureReason.IO_EXCEPTION;
            if (!failedWithIoException) {
                throw e;
            }
        }
    }

    /**
     * Runs the shared {@code testTriggerCheckpoint} scenario against a coordinator backed by an
     * {@code IOExceptionCheckpointStorage}, expecting
     * {@link CheckpointFailureReason#IO_EXCEPTION}.
     *
     * <p>Afterwards the fail-job callback must have been invoked exactly once and the stats
     * tracker must still hold pending-checkpoint stats for checkpoint id 1.
     */
    @Test
    public void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
        TestFailJobCallback failJobCallback = new TestFailJobCallback();
        CheckpointStatsTracker statsTracker =
                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());

        testTriggerCheckpoint(
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointStatsTracker(statsTracker)
                        .setFailureManager(new CheckpointFailureManager(0, failJobCallback))
                        .setCheckpointStorage(new IOExceptionCheckpointStorage())
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()),
                CheckpointFailureReason.IO_EXCEPTION);

        Assert.assertEquals(1L, failJobCallback.getInvokeCounter());
        Assert.assertNotNull(statsTracker.getPendingCheckpointStats(1L));
    }

    /**
     * Marks every task of the first job vertex as finished, then triggers a checkpoint on a
     * coordinator backed by an {@code IOExceptionCheckpointStorage}.
     *
     * <p>The trigger future must complete exceptionally, and no pending or retained checkpoint
     * may remain.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinishedAndIOException() throws Exception {
        JobVertexID finishedVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(finishedVertexId)
                        .addJobVertex(new JobVertexID(), false)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointStorage(new IOExceptionCheckpointStorage())
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        // Finish all subtasks of the first vertex before any checkpoint is triggered.
        for (ExecutionVertex vertex : graph.getJobVertex(finishedVertexId).getTaskVertices()) {
            vertex.getCurrentExecutionAttempt().markFinished();
        }

        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        coordinator.startCheckpointScheduler();
        CompletableFuture<?> trigger = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertTrue(trigger.isCompletedExceptionally());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        coordinator.shutdown();
    }

    /**
     * Aborts a pending checkpoint with {@link CheckpointFailureReason#CHECKPOINT_EXPIRED} and
     * expects the resulting exception to carry the message handed to
     * {@code getCheckpointFailureManager}.
     *
     * <p>The coordinator is always shut down in a single {@code finally} block, whether the
     * expected exception was seen or the test failed.
     *
     * @throws Exception if setting up the graph or coordinator fails
     */
    @Test
    public void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
        final String expectedMessage = "Exceeded checkpoint failure tolerance number!";
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .addJobVertex(new JobVertexID())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinator checkpointCoordinator =
                getCheckpointCoordinator(graph, getCheckpointFailureManager(expectedMessage));
        try {
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            checkpointCoordinator.abortPendingCheckpoints(
                    new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
            // Reaching this line means the abort did not throw. Assert.fail raises an
            // AssertionError, which the catch (Exception) clause below does not intercept.
            Assert.fail("Test failed.");
        } catch (Exception e) {
            ExceptionUtils.assertThrowableWithMessage(e, expectedMessage);
        } finally {
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * Runs the simple trigger-and-decline scenario with the decline reason
     * {@link CheckpointFailureReason#CHECKPOINT_DECLINED}.
     */
    @Test
    public void testTriggerAndDeclineSyncCheckpointFailureSimple() throws Exception {
        final CheckpointFailureReason declineReason = CheckpointFailureReason.CHECKPOINT_DECLINED;
        testTriggerAndDeclineCheckpointSimple(declineReason);
    }

    /**
     * Runs the simple trigger-and-decline scenario with the decline reason
     * {@link CheckpointFailureReason#CHECKPOINT_ASYNC_EXCEPTION}.
     */
    @Test
    public void testTriggerAndDeclineAsyncCheckpointFailureSimple() throws Exception {
        final CheckpointFailureReason declineReason =
                CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION;
        testTriggerAndDeclineCheckpointSimple(declineReason);
    }

    /**
     * Shared scenario for the simple "trigger, then decline" tests.
     *
     * <p>Triggers one checkpoint on a two-vertex graph, acknowledges it twice from the second
     * task, then declines it from the first task with the given reason. It checks that the
     * pending checkpoint is disposed, that further decline messages for the same checkpoint
     * leave it disposed, and that the fail-job callback registered with the
     * {@link CheckpointFailureManager} was invoked exactly once.
     *
     * @param checkpointFailureReason the reason carried by every decline message sent here
     * @throws Exception if graph/coordinator setup or the checkpoint trigger fails
     */
    private void testTriggerAndDeclineCheckpointSimple(CheckpointFailureReason checkpointFailureReason) throws Exception {
        CheckpointException checkpointException = new CheckpointException(checkpointFailureReason);
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .addJobVertex(jobVertexID2)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex executionVertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionVertex executionVertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = executionVertex2.getCurrentExecutionAttempt().getAttemptId();

        // Keep a handle on the callback so its invocation count can be asserted at the end.
        TestFailJobCallback failJobCallback = new TestFailJobCallback();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointFailureManager(new CheckpointFailureManager(0, failJobCallback))
                        .build(graph);

        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Trigger the checkpoint; exactly one pending checkpoint and one scheduled task result.
        CompletableFuture<?> triggerCheckpoint = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerCheckpoint);
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(1L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        long checkpointId = ((Long) ((Map.Entry) coordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) coordinator.getPendingCheckpoints().get(Long.valueOf(checkpointId));
        Assert.assertNotNull(pendingCheckpoint);
        Assert.assertEquals(checkpointId, pendingCheckpoint.getCheckpointId());
        Assert.assertEquals(graph.getJobID(), pendingCheckpoint.getJobId());
        Assert.assertEquals(2L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, pendingCheckpoint.getOperatorStates().size());
        Assert.assertFalse(pendingCheckpoint.isDisposed());
        Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());

        // Both tasks must have received exactly this checkpoint trigger.
        for (ExecutionVertex vertex : Arrays.asList(executionVertex, executionVertex2)) {
            CheckpointCoordinatorTestingUtils.TriggeredCheckpoint triggered =
                    gateway.getOnlyTriggeredCheckpoint(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assert.assertEquals(checkpointId, triggered.checkpointId);
            Assert.assertEquals(pendingCheckpoint.getCheckpointTimestamp(), triggered.timestamp);
            Assert.assertEquals(CheckpointOptions.forCheckpointWithDefaultLocation(), triggered.checkpointOptions);
        }

        // First acknowledgement from the second task.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptId2, checkpointId), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(1L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse(pendingCheckpoint.isDisposed());
        Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());

        // Duplicate acknowledgement from the same task must not complete or dispose anything.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptId2, checkpointId), TASK_MANAGER_LOCATION_INFO);
        Assert.assertFalse(pendingCheckpoint.isDisposed());
        Assert.assertFalse(pendingCheckpoint.areTasksFullyAcknowledged());

        // Decline from the first task disposes the pending checkpoint and clears scheduled tasks.
        coordinator.receiveDeclineMessage(new DeclineCheckpoint(graph.getJobID(), attemptId, checkpointId, checkpointException), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(pendingCheckpoint.isDisposed());
        Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Decline messages arriving after disposal must leave the checkpoint disposed.
        coordinator.receiveDeclineMessage(new DeclineCheckpoint(graph.getJobID(), attemptId, checkpointId, checkpointException), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveDeclineMessage(new DeclineCheckpoint(graph.getJobID(), attemptId2, checkpointId, checkpointException), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(pendingCheckpoint.isDisposed());

        // The callback must have fired once in total, despite three decline messages.
        Assert.assertEquals(1L, failJobCallback.getInvokeCounter());
        coordinator.shutdown();
    }

    /**
     * Triggers two concurrent checkpoints on a two-vertex graph and declines only the first.
     *
     * <p>The first pending checkpoint must be disposed and reported as aborted to both tasks,
     * while the second stays pending and untouched. Repeated decline messages for the already
     * declined checkpoint must not produce additional abort notifications.
     */
    @Test
    public void testTriggerAndDeclineCheckpointComplex() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionAttemptID attemptId1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator coordinator = getCheckpointCoordinator(graph);

        // Initial state: nothing pending, nothing retained, nothing scheduled.
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        // Trigger two checkpoints back to back.
        CompletableFuture<?> firstTrigger = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(firstTrigger);
        CompletableFuture<?> secondTrigger = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(secondTrigger);

        Assert.assertEquals(2L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(2L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        Iterator<?> pendingIds = coordinator.getPendingCheckpoints().keySet().iterator();
        long checkpoint1Id = (Long) pendingIds.next();
        long checkpoint2Id = (Long) pendingIds.next();
        PendingCheckpoint checkpoint1 = (PendingCheckpoint) coordinator.getPendingCheckpoints().get(checkpoint1Id);
        PendingCheckpoint checkpoint2 = (PendingCheckpoint) coordinator.getPendingCheckpoints().get(checkpoint2Id);

        Assert.assertNotNull(checkpoint1);
        Assert.assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
        Assert.assertEquals(graph.getJobID(), checkpoint1.getJobId());
        Assert.assertEquals(2L, checkpoint1.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, checkpoint1.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, checkpoint1.getOperatorStates().size());
        Assert.assertFalse(checkpoint1.isDisposed());
        Assert.assertFalse(checkpoint1.areTasksFullyAcknowledged());

        Assert.assertNotNull(checkpoint2);
        Assert.assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
        Assert.assertEquals(graph.getJobID(), checkpoint2.getJobId());
        Assert.assertEquals(2L, checkpoint2.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, checkpoint2.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, checkpoint2.getOperatorStates().size());
        Assert.assertFalse(checkpoint2.isDisposed());
        Assert.assertFalse(checkpoint2.areTasksFullyAcknowledged());

        // Each task saw both triggers, in trigger order.
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> triggered =
                    gateway.getTriggeredCheckpoints(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assert.assertEquals(2L, triggered.size());
            Assert.assertEquals(checkpoint1Id, triggered.get(0).checkpointId);
            Assert.assertEquals(checkpoint2Id, triggered.get(1).checkpointId);
        }

        // Decline the first checkpoint from the first task.
        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        graph.getJobID(),
                        attemptId1,
                        checkpoint1Id,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                TASK_MANAGER_LOCATION_INFO);
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assert.assertEquals(
                    checkpoint1Id,
                    gateway.getOnlyNotifiedAbortedCheckpoint(vertex.getCurrentExecutionAttempt().getAttemptId()).checkpointId);
        }
        Assert.assertTrue(checkpoint1.isDisposed());

        // Only the second checkpoint is left, and it is unaffected.
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(1L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        long remainingId = (Long) coordinator.getPendingCheckpoints().keySet().iterator().next();
        PendingCheckpoint remaining = (PendingCheckpoint) coordinator.getPendingCheckpoints().get(remainingId);
        Assert.assertEquals(checkpoint2Id, remainingId);
        Assert.assertNotNull(remaining);
        Assert.assertEquals(remainingId, remaining.getCheckpointId());
        Assert.assertEquals(graph.getJobID(), remaining.getJobId());
        Assert.assertEquals(2L, remaining.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, remaining.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, remaining.getOperatorStates().size());
        Assert.assertFalse(remaining.isDisposed());
        Assert.assertFalse(remaining.areTasksFullyAcknowledged());
        Assert.assertNotEquals(checkpoint1.getCheckpointId(), remaining.getCheckpointId());

        // Further declines for the already declined checkpoint must not add abort notifications.
        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        graph.getJobID(),
                        attemptId1,
                        checkpoint1Id,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        graph.getJobID(),
                        attemptId2,
                        checkpoint1Id,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(checkpoint1.isDisposed());
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assert.assertEquals(
                    1L,
                    gateway.getNotifiedAbortedCheckpoints(vertex.getCurrentExecutionAttempt().getAttemptId()).size());
        }

        coordinator.shutdown();
    }

    /**
     * Walks one checkpoint through trigger, partial acknowledgement, a duplicate
     * acknowledgement, and the final acknowledgement that completes it.
     *
     * <p>It then triggers a second checkpoint that both tasks acknowledge without task state
     * and checks that it replaces the first as the single retained checkpoint. Along the way
     * the test verifies how often {@code registerSharedStates} is invoked on the mocked subtask
     * states and which trigger/completion notifications the gateway recorded.
     */
    @Test
    public void testTriggerAndConfirmSimpleCheckpoint() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionAttemptID attemptId1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator coordinator = getCheckpointCoordinator(graph);

        // Initial state: nothing pending, nothing retained, nothing scheduled.
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        // Trigger the first checkpoint.
        CompletableFuture<?> firstTrigger = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(firstTrigger);
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(1L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        long checkpointId = (Long) coordinator.getPendingCheckpoints().keySet().iterator().next();
        PendingCheckpoint pending = (PendingCheckpoint) coordinator.getPendingCheckpoints().get(checkpointId);
        Assert.assertNotNull(pending);
        Assert.assertEquals(checkpointId, pending.getCheckpointId());
        Assert.assertEquals(graph.getJobID(), pending.getJobId());
        Assert.assertEquals(2L, pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, pending.getOperatorStates().size());
        Assert.assertFalse(pending.isDisposed());
        Assert.assertFalse(pending.areTasksFullyAcknowledged());

        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assert.assertEquals(
                    checkpointId,
                    gateway.getOnlyTriggeredCheckpoint(vertex.getCurrentExecutionAttempt().getAttemptId()).checkpointId);
        }

        // Prepare mocked subtask states, one per vertex, keyed by the generated operator id.
        OperatorID operatorId1 = ((OperatorIDPair) vertex1.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        OperatorID operatorId2 = ((OperatorIDPair) vertex2.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        OperatorSubtaskState subtaskState1 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState subtaskState2 = Mockito.mock(OperatorSubtaskState.class);
        TaskStateSnapshot snapshot1 = new TaskStateSnapshot(Collections.singletonMap(operatorId1, subtaskState1));
        AcknowledgeCheckpoint ackFromTask2 =
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptId2,
                        checkpointId,
                        new CheckpointMetrics(),
                        new TaskStateSnapshot(Collections.singletonMap(operatorId2, subtaskState2)));

        // First acknowledgement from the second task.
        coordinator.receiveAcknowledgeMessage(ackFromTask2, TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(1L, pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse(pending.isDisposed());
        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        Mockito.verify(subtaskState2, Mockito.times(1))
                .registerSharedStates(Matchers.any(SharedStateRegistry.class), Mockito.eq(checkpointId));

        // The same acknowledgement again: still pending, shared states registered a second time.
        coordinator.receiveAcknowledgeMessage(ackFromTask2, TASK_MANAGER_LOCATION_INFO);
        Assert.assertFalse(pending.isDisposed());
        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        Mockito.verify(subtaskState2, Mockito.times(2))
                .registerSharedStates(Matchers.any(SharedStateRegistry.class), Mockito.eq(checkpointId));

        // Acknowledgement from the first task completes the checkpoint.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId1, checkpointId, new CheckpointMetrics(), snapshot1),
                TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(pending.isDisposed());
        Assert.assertEquals(1L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
        Mockito.verify(subtaskState1, Mockito.times(1))
                .registerSharedStates(Matchers.any(SharedStateRegistry.class), Mockito.eq(checkpointId));
        Mockito.verify(subtaskState2, Mockito.times(2))
                .registerSharedStates(Matchers.any(SharedStateRegistry.class), Mockito.eq(checkpointId));

        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assert.assertEquals(
                    checkpointId,
                    gateway.getOnlyNotifiedCompletedCheckpoint(vertex.getCurrentExecutionAttempt().getAttemptId()).checkpointId);
        }

        CompletedCheckpoint firstCompleted = (CompletedCheckpoint) coordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(graph.getJobID(), firstCompleted.getJobId());
        Assert.assertEquals(pending.getCheckpointId(), firstCompleted.getCheckpointID());
        Assert.assertEquals(2L, firstCompleted.getOperatorStates().size());

        // Second checkpoint: both tasks acknowledge without any state.
        gateway.resetCount();
        coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long secondCheckpointId = (Long) coordinator.getPendingCheckpoints().keySet().iterator().next();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId1, secondCheckpointId), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId2, secondCheckpointId), TASK_MANAGER_LOCATION_INFO);

        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

        CompletedCheckpoint secondCompleted = (CompletedCheckpoint) coordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(graph.getJobID(), secondCompleted.getJobId());
        Assert.assertEquals(secondCheckpointId, secondCompleted.getCheckpointID());
        Assert.assertEquals(2L, secondCompleted.getOperatorStates().size());
        Assert.assertTrue(secondCompleted.getOperatorStates().values().stream().allMatch(this::hasNoSubState));

        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            ExecutionAttemptID currentAttemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(secondCheckpointId, gateway.getOnlyTriggeredCheckpoint(currentAttemptId).checkpointId);
            Assert.assertEquals(secondCheckpointId, gateway.getOnlyNotifiedCompletedCheckpoint(currentAttemptId).checkpointId);
        }

        coordinator.shutdown();
    }

    /**
     * Runs two overlapping checkpoints on a three-vertex graph with interleaved
     * acknowledgements.
     *
     * <p>The test checks that each checkpoint completes only once all three tasks have
     * acknowledged it, that completion notifications reach all three tasks, and that both
     * completed checkpoints are retained in trigger order. The third vertex is added with the
     * flag {@code false}; the trigger assertions here cover only the first two vertices.
     */
    @Test
    public void testMultipleConcurrentCheckpoints() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        JobVertexID thirdVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .addJobVertex(thirdVertexId, false)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionVertex vertex3 = graph.getJobVertex(thirdVertexId).getTaskVertices()[0];
        ExecutionAttemptID attemptId1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId3 = vertex3.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Trigger the first checkpoint.
        CompletableFuture<?> firstTrigger = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(firstTrigger);
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        PendingCheckpoint firstPending = (PendingCheckpoint) coordinator.getPendingCheckpoints().values().iterator().next();
        long firstCheckpointId = firstPending.getCheckpointId();
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assert.assertEquals(
                    firstCheckpointId,
                    gateway.getOnlyTriggeredCheckpoint(vertex.getCurrentExecutionAttempt().getAttemptId()).checkpointId);
        }

        // One acknowledgement for the first checkpoint, then start the second one.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId2, firstCheckpointId), TASK_MANAGER_LOCATION_INFO);
        gateway.resetCount();

        CompletableFuture<?> secondTrigger = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(secondTrigger);
        Assert.assertEquals(2L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Pick whichever pending checkpoint is not the first one.
        Iterator<?> pendingIterator = coordinator.getPendingCheckpoints().values().iterator();
        PendingCheckpoint secondPending = (PendingCheckpoint) pendingIterator.next();
        if (secondPending == firstPending) {
            secondPending = (PendingCheckpoint) pendingIterator.next();
        }
        long secondCheckpointId = secondPending.getCheckpointId();
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assert.assertEquals(
                    secondCheckpointId,
                    gateway.getOnlyTriggeredCheckpoint(vertex.getCurrentExecutionAttempt().getAttemptId()).checkpointId);
        }

        // Interleaved acknowledgements; after these the first checkpoint is fully acknowledged.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId3, firstCheckpointId), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId1, secondCheckpointId), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId1, firstCheckpointId), TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId2, secondCheckpointId), TASK_MANAGER_LOCATION_INFO);

        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertTrue(firstPending.isDisposed());
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
            Assert.assertEquals(
                    firstCheckpointId,
                    gateway.getOnlyNotifiedCompletedCheckpoint(vertex.getCurrentExecutionAttempt().getAttemptId()).checkpointId);
        }

        // The last missing acknowledgement completes the second checkpoint.
        gateway.resetCount();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId3, secondCheckpointId), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(2L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertTrue(secondPending.isDisposed());
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
            Assert.assertEquals(
                    secondCheckpointId,
                    gateway.getOnlyNotifiedCompletedCheckpoint(vertex.getCurrentExecutionAttempt().getAttemptId()).checkpointId);
        }

        // Both checkpoints are retained, in the order they were triggered.
        List<?> retained = coordinator.getSuccessfulCheckpoints();

        CompletedCheckpoint firstCompleted = (CompletedCheckpoint) retained.get(0);
        Assert.assertEquals(firstCheckpointId, firstCompleted.getCheckpointID());
        Assert.assertEquals(graph.getJobID(), firstCompleted.getJobId());
        Assert.assertEquals(3L, firstCompleted.getOperatorStates().size());
        Assert.assertTrue(firstCompleted.getOperatorStates().values().stream().allMatch(this::hasNoSubState));

        CompletedCheckpoint secondCompleted = (CompletedCheckpoint) retained.get(1);
        Assert.assertEquals(secondCheckpointId, secondCompleted.getCheckpointID());
        Assert.assertEquals(graph.getJobID(), secondCompleted.getJobId());
        Assert.assertEquals(3L, secondCompleted.getOperatorStates().size());
        Assert.assertTrue(secondCompleted.getOperatorStates().values().stream().allMatch(this::hasNoSubState));

        coordinator.shutdown();
    }

    /**
     * Triggers two checkpoints on a three-vertex graph, fully acknowledges only the second one and
     * checks that the first one ends up disposed together with the state it had already received,
     * while the state of the completed checkpoint is only discarded once the completed checkpoint
     * store is shut down.
     */
    @Test
    public void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        JobVertexID thirdVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .addJobVertex(thirdVertexId, false)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionVertex thirdVertex = graph.getJobVertex(thirdVertexId).getTaskVertices()[0];
        ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID thirdAttempt = thirdVertex.getCurrentExecutionAttempt().getAttemptId();

        CompletedCheckpointStore completedStore = new StandaloneCompletedCheckpointStore(10);
        CheckpointCoordinatorConfiguration coordinatorConfig =
                CheckpointCoordinatorConfiguration.builder()
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(coordinatorConfig)
                        .setCompletedCheckpointStore(completedStore)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Trigger the older checkpoint.
        CompletableFuture olderTrigger = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(olderTrigger);
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        PendingCheckpoint olderPending =
                (PendingCheckpoint) coordinator.getPendingCheckpoints().values().iterator().next();
        long olderId = olderPending.getCheckpointId();
        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(olderId, gateway.getOnlyTriggeredCheckpoint(attempt).checkpointId);
        }

        OperatorID firstOperatorId =
                ((OperatorIDPair) firstVertex.getJobVertex().getOperatorIDs().get(0))
                        .getGeneratedOperatorID();
        OperatorID secondOperatorId =
                ((OperatorIDPair) secondVertex.getJobVertex().getOperatorIDs().get(0))
                        .getGeneratedOperatorID();
        OperatorID thirdOperatorId =
                ((OperatorIDPair) thirdVertex.getJobVertex().getOperatorIDs().get(0))
                        .getGeneratedOperatorID();

        // State that will be reported for the older checkpoint.
        TaskStateSnapshot olderSnapshot1 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot olderSnapshot2 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot olderSnapshot3 = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState olderState1 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState olderState2 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState olderState3 = Mockito.mock(OperatorSubtaskState.class);
        olderSnapshot1.putSubtaskStateByOperatorID(firstOperatorId, olderState1);
        olderSnapshot2.putSubtaskStateByOperatorID(secondOperatorId, olderState2);
        olderSnapshot3.putSubtaskStateByOperatorID(thirdOperatorId, olderState3);

        // Only the second task acknowledges the older checkpoint for now.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        secondAttempt,
                        olderId,
                        new CheckpointMetrics(),
                        olderSnapshot2),
                TASK_MANAGER_LOCATION_INFO);
        gateway.resetCount();

        // Trigger the newer checkpoint while the older one is still pending.
        CompletableFuture newerTrigger = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(newerTrigger);
        Assert.assertEquals(2L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Pick whichever pending checkpoint is not the older one.
        Iterator pendingIterator = coordinator.getPendingCheckpoints().values().iterator();
        PendingCheckpoint candidate = (PendingCheckpoint) pendingIterator.next();
        PendingCheckpoint newerPending;
        if (olderPending == candidate) {
            newerPending = (PendingCheckpoint) pendingIterator.next();
        } else {
            newerPending = candidate;
        }
        long newerId = newerPending.getCheckpointId();

        // State that will be reported for the newer checkpoint.
        TaskStateSnapshot newerSnapshot1 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot newerSnapshot2 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot newerSnapshot3 = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState newerState1 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState newerState2 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState newerState3 = Mockito.mock(OperatorSubtaskState.class);
        newerSnapshot1.putSubtaskStateByOperatorID(firstOperatorId, newerState1);
        newerSnapshot2.putSubtaskStateByOperatorID(secondOperatorId, newerState2);
        newerSnapshot3.putSubtaskStateByOperatorID(thirdOperatorId, newerState3);

        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(newerId, gateway.getOnlyTriggeredCheckpoint(attempt).checkpointId);
        }

        // Interleave acknowledgements: the newer checkpoint gets all three, the older one only
        // receives one more (from the first task) and therefore stays incomplete.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        thirdAttempt,
                        newerId,
                        new CheckpointMetrics(),
                        newerSnapshot3),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        firstAttempt,
                        newerId,
                        new CheckpointMetrics(),
                        newerSnapshot1),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        firstAttempt,
                        olderId,
                        new CheckpointMetrics(),
                        olderSnapshot1),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        secondAttempt,
                        newerId,
                        new CheckpointMetrics(),
                        newerSnapshot2),
                TASK_MANAGER_LOCATION_INFO);

        // Both pending checkpoints are gone; exactly one completed checkpoint is retained.
        Assert.assertTrue(olderPending.isDisposed());
        Assert.assertTrue(newerPending.isDisposed());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // State received for the older checkpoint was discarded, state of the newer one was not.
        Mockito.verify(olderState1, Mockito.times(1)).discardState();
        Mockito.verify(olderState2, Mockito.times(1)).discardState();
        Mockito.verify(newerState1, Mockito.never()).discardState();
        Mockito.verify(newerState2, Mockito.never()).discardState();
        Mockito.verify(newerState3, Mockito.never()).discardState();

        CompletedCheckpoint retained =
                (CompletedCheckpoint) coordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(newerId, retained.getCheckpointID());
        Assert.assertEquals(graph.getJobID(), retained.getJobId());
        Assert.assertEquals(3L, retained.getOperatorStates().size());
        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex, thirdVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(
                    newerId, gateway.getOnlyNotifiedCompletedCheckpoint(attempt).checkpointId);
        }

        // A late acknowledgement for the older checkpoint has its state discarded.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        thirdAttempt,
                        olderId,
                        new CheckpointMetrics(),
                        olderSnapshot3),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(olderState3, Mockito.times(1)).discardState();

        // After shutting down coordinator and store, the retained state is discarded as well.
        coordinator.shutdown();
        completedStore.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());
        Mockito.verify(newerState1, Mockito.times(1)).discardState();
        Mockito.verify(newerState2, Mockito.times(1)).discardState();
        Mockito.verify(newerState3, Mockito.times(1)).discardState();
    }

    /**
     * Triggers a checkpoint on a two-vertex graph, acknowledges it from the first vertex only and
     * then runs the scheduled tasks of the manually triggered timer. Afterwards the pending
     * checkpoint must be disposed (the assertion message attributes this to the timeout), nothing
     * may be pending or retained, the acknowledged state must have been discarded exactly once,
     * and no completed-checkpoint notification may have been recorded for either vertex.
     *
     * @throws Exception if triggering the checkpoint or handling a message fails
     */
    @Test
    public void testCheckpointTimeoutIsolated() throws Exception {
        JobVertexID ackVertexId = new JobVertexID();
        JobVertexID notAckVertexId = new JobVertexID();

        // Keep a handle on the recording gateway so the notifications it captured can be
        // inspected at the end of the test.
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(ackVertexId)
                        .addJobVertex(notAckVertexId, false)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        ExecutionVertex ackVertex = graph.getJobVertex(ackVertexId).getTaskVertices()[0];
        ExecutionVertex notAckVertex = graph.getJobVertex(notAckVertexId).getTaskVertices()[0];
        ExecutionAttemptID ackAttemptId = ackVertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        CompletableFuture triggerFuture = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerFuture);
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());

        PendingCheckpoint pending =
                (PendingCheckpoint) coordinator.getPendingCheckpoints().values().iterator().next();
        Assert.assertFalse(pending.isDisposed());

        // Acknowledge from the first vertex only, attaching state whose disposal can be verified.
        OperatorID operatorId =
                ((OperatorIDPair) ackVertex.getJobVertex().getOperatorIDs().get(0))
                        .getGeneratedOperatorID();
        TaskStateSnapshot ackedSnapshot = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState ackedState = Mockito.mock(OperatorSubtaskState.class);
        ackedSnapshot.putSubtaskStateByOperatorID(operatorId, ackedState);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ackAttemptId,
                        pending.getCheckpointId(),
                        new CheckpointMetrics(),
                        ackedSnapshot),
                TASK_MANAGER_LOCATION_INFO);

        // Run whatever the coordinator scheduled on the timer; the checkpoint is expected to be
        // canceled by this step.
        this.manuallyTriggeredScheduledExecutor.triggerScheduledTasks();

        Assert.assertTrue("Checkpoint was not canceled by the timeout", pending.isDisposed());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Mockito.verify(ackedState, Mockito.times(1)).discardState();

        // No completion notification must have been recorded for any of the tasks.
        for (ExecutionVertex vertex : Arrays.asList(ackVertex, notAckVertex)) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(0L, gateway.getNotifiedCompletedCheckpoints(attemptId).size());
        }

        coordinator.shutdown();
    }

    /**
     * Sends three acknowledge messages that do not match the single pending checkpoint (a fresh
     * job id, the literal checkpoint id {@code 1L}, and a freshly created execution attempt id).
     * The test contains no assertions; it passes as long as none of the calls throws.
     */
    @Test
    public void testHandleMessagesForNonExistingCheckpoints() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId, false)
                        .setTaskManagerGateway(
                                new CheckpointCoordinatorTestingUtils
                                        .CheckpointRecorderTaskManagerGateway())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionAttemptID knownAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);

        CompletableFuture trigger = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(trigger);

        Long pendingKey = (Long) coordinator.getPendingCheckpoints().keySet().iterator().next();
        long pendingId = pendingKey.longValue();

        // Job id that differs from the graph's job id.
        AcknowledgeCheckpoint otherJobAck =
                new AcknowledgeCheckpoint(new JobID(), knownAttempt, pendingId);
        coordinator.receiveAcknowledgeMessage(otherJobAck, TASK_MANAGER_LOCATION_INFO);

        // Hard-coded checkpoint id instead of the pending one.
        AcknowledgeCheckpoint fixedIdAck =
                new AcknowledgeCheckpoint(graph.getJobID(), knownAttempt, 1L);
        coordinator.receiveAcknowledgeMessage(fixedIdAck, TASK_MANAGER_LOCATION_INFO);

        // Execution attempt id that was created on the spot.
        AcknowledgeCheckpoint otherAttemptAck =
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        pendingId);
        coordinator.receiveAcknowledgeMessage(otherAttemptAck, TASK_MANAGER_LOCATION_INFO);

        coordinator.shutdown();
    }

    /**
     * Checks, via Mockito verification of {@code discardState()}, which acknowledge messages have
     * their attached state discarded: messages sent while the checkpoint is pending and messages
     * sent after it was declined, coming from the acknowledging task, from the second task, from
     * freshly created attempt ids, and from a different job id.
     */
    @Test
    public void testStateCleanupForLateOrUnknownMessages() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId, false)
                        .setTaskManagerGateway(
                                new CheckpointCoordinatorTestingUtils
                                        .CheckpointRecorderTaskManagerGateway())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointCoordinatorConfiguration coordinatorConfig =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setMaxConcurrentCheckpoints(1)
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(coordinatorConfig)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);

        CompletableFuture trigger = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(trigger);
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());

        PendingCheckpoint pending =
                (PendingCheckpoint) coordinator.getPendingCheckpoints().values().iterator().next();
        long pendingId = pending.getCheckpointId();

        // Regular acknowledgement from the first task: its state is kept.
        OperatorID operatorId =
                ((OperatorIDPair) firstVertex.getJobVertex().getOperatorIDs().get(0))
                        .getGeneratedOperatorID();
        TaskStateSnapshot ackedSnapshot = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState ackedState = Mockito.mock(OperatorSubtaskState.class);
        ackedSnapshot.putSubtaskStateByOperatorID(operatorId, ackedState);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        firstAttempt,
                        pendingId,
                        new CheckpointMetrics(),
                        ackedSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(ackedState, Mockito.never()).discardState();

        // Same job, freshly created attempt id: state is discarded once.
        TaskStateSnapshot unknownAttemptSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        pendingId,
                        new CheckpointMetrics(),
                        unknownAttemptSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(unknownAttemptSnapshot, Mockito.times(1)).discardState();

        // Different job id: state is left untouched.
        TaskStateSnapshot otherJobSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        new JobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        pendingId,
                        new CheckpointMetrics(),
                        otherJobSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(otherJobSnapshot, Mockito.never()).discardState();

        // Second acknowledgement from the first task: state is not discarded.
        TaskStateSnapshot repeatedAckSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        firstAttempt,
                        pendingId,
                        new CheckpointMetrics(),
                        repeatedAckSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(repeatedAckSnapshot, Mockito.never()).discardState();

        // Decline the checkpoint: it is disposed and the state acknowledged earlier is discarded.
        Mockito.reset(ackedState);
        CheckpointException declineCause =
                new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED);
        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(graph.getJobID(), firstAttempt, pendingId, declineCause),
                TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(pending.isDisposed());
        Mockito.verify(ackedState, Mockito.times(1)).discardState();

        // Acknowledgement from the second task after the decline: state is discarded once.
        TaskStateSnapshot lateAckSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        secondAttempt,
                        pendingId,
                        new CheckpointMetrics(),
                        lateAckSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(lateAckSnapshot, Mockito.times(1)).discardState();

        // Different job id again, after the decline: still untouched.
        Mockito.reset(otherJobSnapshot);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        new JobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        pendingId,
                        new CheckpointMetrics(),
                        otherJobSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(otherJobSnapshot, Mockito.never()).discardState();

        // Same job, freshly created attempt id, after the decline: state is discarded once.
        TaskStateSnapshot lateUnknownAttemptSnapshot = Mockito.mock(TaskStateSnapshot.class);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        pendingId,
                        new CheckpointMetrics(),
                        lateUnknownAttemptSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(lateUnknownAttemptSnapshot, Mockito.times(1)).discardState();
    }

    /** Delegates to {@code testMaxConcurrentAttempts} with an argument of 1. */
    @Test
    public void testMaxConcurrentAttempts1() {
        final int maxConcurrentAttempts = 1;
        testMaxConcurrentAttempts(maxConcurrentAttempts);
    }

    /** Delegates to {@code testMaxConcurrentAttempts} with an argument of 2. */
    @Test
    public void testMaxConcurrentAttempts2() {
        final int maxConcurrentAttempts = 2;
        testMaxConcurrentAttempts(maxConcurrentAttempts);
    }

    /** Delegates to {@code testMaxConcurrentAttempts} with an argument of 5. */
    @Test
    public void testMaxConcurrentAttempts5() {
        final int maxConcurrentAttempts = 5;
        testMaxConcurrentAttempts(maxConcurrentAttempts);
    }

    /**
     * Triggers a savepoint on a two-vertex graph, acknowledges it from both tasks (including one
     * repeated acknowledgement) and checks the pending-checkpoint bookkeeping, the completion of
     * the savepoint future, the recorded gateway interactions and the checkpoint statistics.
     */
    @Test
    public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();

        CheckpointStatsTracker statsTracker =
                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
        CheckpointCoordinatorConfiguration coordinatorConfig =
                CheckpointCoordinatorConfiguration.builder()
                        .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(coordinatorConfig)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(statsTracker)
                        .build(graph);
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

        // Trigger the savepoint; its future stays incomplete until all tasks acknowledged.
        String savepointDirectory = this.tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture savepointFuture =
                coordinator.triggerSavepoint(savepointDirectory, SavepointFormatType.CANONICAL);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(savepointFuture.isDone());
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());

        Map.Entry firstEntry =
                (Map.Entry) coordinator.getPendingCheckpoints().entrySet().iterator().next();
        long savepointId = ((Long) firstEntry.getKey()).longValue();
        PendingCheckpoint pending =
                (PendingCheckpoint)
                        coordinator.getPendingCheckpoints().get(Long.valueOf(savepointId));
        Assert.assertNotNull(pending);
        Assert.assertEquals(savepointId, pending.getCheckpointId());
        Assert.assertEquals(graph.getJobID(), pending.getJobId());
        Assert.assertEquals(2L, pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(0L, pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(0L, pending.getOperatorStates().size());
        Assert.assertFalse(pending.isDisposed());
        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        Assert.assertFalse(pending.canBeSubsumed());

        OperatorID firstOperatorId = OperatorID.fromJobVertexID(firstVertex.getJobvertexId());
        OperatorID secondOperatorId = OperatorID.fromJobVertexID(secondVertex.getJobvertexId());
        OperatorSubtaskState firstState = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState secondState = Mockito.mock(OperatorSubtaskState.class);
        TaskStateSnapshot firstSnapshot =
                new TaskStateSnapshot(Collections.singletonMap(firstOperatorId, firstState));
        TaskStateSnapshot secondSnapshot =
                new TaskStateSnapshot(Collections.singletonMap(secondOperatorId, secondState));
        AcknowledgeCheckpoint secondTaskAck =
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        secondAttempt,
                        savepointId,
                        new CheckpointMetrics(),
                        secondSnapshot);

        // First acknowledgement: one task down, one to go.
        coordinator.receiveAcknowledgeMessage(secondTaskAck, TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals(1L, pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(1L, pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse(pending.isDisposed());
        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        Assert.assertFalse(savepointFuture.isDone());

        // Sending the very same acknowledgement again must not complete the savepoint.
        coordinator.receiveAcknowledgeMessage(secondTaskAck, TASK_MANAGER_LOCATION_INFO);
        Assert.assertFalse(pending.isDisposed());
        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        Assert.assertFalse(savepointFuture.isDone());

        // Acknowledgement from the remaining task completes the savepoint.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        firstAttempt,
                        savepointId,
                        new CheckpointMetrics(),
                        firstSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue(pending.isDisposed());
        Assert.assertNotNull(savepointFuture.get());
        Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());

        // Each task saw exactly the savepoint trigger and no completion notification.
        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals(
                    savepointId, gateway.getOnlyTriggeredCheckpoint(attempt).checkpointId);
            Assertions.assertThat(gateway.getNotifiedCompletedCheckpoints(attempt)).isEmpty();
        }

        CompletedCheckpoint completed = (CompletedCheckpoint) savepointFuture.get();
        Assert.assertEquals(graph.getJobID(), completed.getJobId());
        Assert.assertEquals(pending.getCheckpointId(), completed.getCheckpointID());
        Assert.assertEquals(2L, completed.getOperatorStates().size());

        AbstractCheckpointStats recordedStats =
                statsTracker.createSnapshot().getHistory().getCheckpointById(savepointId);
        Assert.assertEquals(savepointId, recordedStats.getCheckpointId());
        Assert.assertEquals(CheckpointStatsStatus.COMPLETED, recordedStats.getStatus());

        coordinator.shutdown();
    }

    @Test
    public void testSavepointsAreNotSubsumed() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).addJobVertex(jobVertexID2).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex executionVertex = build.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionVertex executionVertex2 = build.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = executionVertex2.getCurrentExecutionAttempt().getAttemptId();
        CheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator checkpointCoordinator = (CheckpointCoordinator) Mockito.spy(new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setCheckpointIDCounter(standaloneCheckpointIDCounter).setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(1)).setTimer(this.manuallyTriggeredScheduledExecutor).build(build));
        String absolutePath = this.tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture triggerSavepoint = checkpointCoordinator.triggerSavepoint(absolutePath, SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long last = standaloneCheckpointIDCounter.getLast();
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        CompletableFuture triggerCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(2L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        FutureUtils.throwIfCompletedExceptionally(triggerCheckpoint);
        CompletableFuture triggerCheckpoint2 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerCheckpoint2);
        long last2 = standaloneCheckpointIDCounter.getLast();
        Assert.assertEquals(3L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, last2), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId2, last2), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator) Mockito.verify(checkpointCoordinator, Mockito.times(1))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq(last2), ArgumentMatchers.anyLong(), Mockito.eq(-1L));
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse(((PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(last))).isDisposed());
        Assert.assertFalse(triggerSavepoint.isDone());
        CompletableFuture triggerCheckpoint3 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerCheckpoint3);
        Assert.assertEquals(2L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        CompletableFuture triggerSavepoint2 = checkpointCoordinator.triggerSavepoint(absolutePath, SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long last3 = standaloneCheckpointIDCounter.getLast();
        FutureUtils.throwIfCompletedExceptionally(triggerSavepoint2);
        Assert.assertEquals(3L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, last3), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId2, last3), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator) Mockito.verify(checkpointCoordinator, Mockito.times(0))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq(last3), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Assert.assertEquals(2L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse(((PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(last))).isDisposed());
        Assert.assertFalse(triggerSavepoint.isDone());
        Assert.assertNotNull(triggerSavepoint2.get());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, last), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId2, last), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator) Mockito.verify(checkpointCoordinator, Mockito.times(0))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq(last), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertNotNull(triggerSavepoint.get());
        CompletableFuture triggerCheckpoint4 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerCheckpoint4);
        long last4 = standaloneCheckpointIDCounter.getLast();
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId, last4), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), attemptId2, last4), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator) Mockito.verify(checkpointCoordinator, Mockito.times(1))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq(last4), ArgumentMatchers.anyLong(), Mockito.eq(last2));
    }

    /**
     * Drives the periodic checkpoint scheduler by hand and checks that the number of triggered
     * checkpoints is capped by the configured maximum of concurrent checkpoints.
     *
     * <p>The recording task-manager gateway is kept in a local variable so that the triggered and
     * notified checkpoints can be read back from it.
     *
     * @param i the value passed to {@code setMaxConcurrentCheckpoints}; also the number of
     *     scheduler rounds fired before the first acknowledgement
     */
    private void testMaxConcurrentAttempts(int i) {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                    new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
            ExecutionGraph graph =
                    new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                            .addJobVertex(jobVertexID)
                            .setTaskManagerGateway(gateway)
                            .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
            ExecutionAttemptID attemptId =
                    graph.getJobVertex(jobVertexID)
                            .getTaskVertices()[0]
                            .getCurrentExecutionAttempt()
                            .getAttemptId();
            CheckpointCoordinator coordinator =
                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                            .setCheckpointCoordinatorConfiguration(
                                    new CheckpointCoordinatorConfiguration
                                                    .CheckpointCoordinatorConfigurationBuilder()
                                            .setCheckpointInterval(10L)
                                            .setCheckpointTimeout(200000L)
                                            .setMinPauseBetweenCheckpoints(0L)
                                            .setMaxConcurrentCheckpoints(i)
                                            .build())
                            .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                            .setTimer(this.manuallyTriggeredScheduledExecutor)
                            .build(graph);
            coordinator.startCheckpointScheduler();

            // Fire the periodic trigger once per allowed concurrent checkpoint.
            for (int round = 0; round < i; round++) {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            }
            Assert.assertEquals(i, gateway.getTriggeredCheckpoints(attemptId).size());
            Assert.assertEquals(0L, gateway.getNotifiedCompletedCheckpoints(attemptId).size());

            // Acknowledge checkpoint 1, then fire the scheduler again: one more trigger is expected.
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(graph.getJobID(), attemptId, 1L),
                    TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals(
                    1L,
                    this.manuallyTriggeredScheduledExecutor.getActivePeriodicScheduledTask().size());
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(i + 1, gateway.getTriggeredCheckpoints(attemptId).size());

            // A further round without an acknowledgement must not trigger anything new.
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals(i + 1, gateway.getTriggeredCheckpoints(attemptId).size());
            coordinator.shutdown();
        } catch (Exception e) {
            // Fail the test like Assert.fail(message) would, but keep the original exception as cause.
            throw new AssertionError(e.getMessage(), e);
        }
    }

    /**
     * With at most two concurrent checkpoints, fires the periodic scheduler until two checkpoints
     * (ids 1 and 2) are pending, acknowledges checkpoint 2, and then fires the scheduler again
     * until two checkpoints are pending once more, which are expected to be ids 3 and 4.
     */
    @Test
    public void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
        final int maxConcurrentAttempts = 2;
        final JobVertexID vertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final ExecutionAttemptID attemptId =
                graph.getJobVertex(vertexId)
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt()
                        .getAttemptId();
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                new CheckpointCoordinatorConfiguration
                                                .CheckpointCoordinatorConfigurationBuilder()
                                        .setCheckpointInterval(10L)
                                        .setCheckpointTimeout(200000L)
                                        .setMinPauseBetweenCheckpoints(0L)
                                        .setMaxConcurrentCheckpoints(maxConcurrentAttempts)
                                        .build())
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);
        coordinator.startCheckpointScheduler();

        // Keep firing the scheduler until the concurrency limit is reached.
        do {
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
        } while (coordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
        Assert.assertEquals(maxConcurrentAttempts, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertNotNull(coordinator.getPendingCheckpoints().get(1L));
        Assert.assertNotNull(coordinator.getPendingCheckpoints().get(2L));

        // Acknowledge the later checkpoint (id 2) only.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptId, 2L),
                TASK_MANAGER_LOCATION_INFO);

        // Fire the scheduler again until the limit is reached a second time.
        do {
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
        } while (coordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
        Assert.assertEquals(maxConcurrentAttempts, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertNotNull(coordinator.getPendingCheckpoints().get(3L));
        Assert.assertNotNull(coordinator.getPendingCheckpoints().get(4L));

        coordinator.shutdown();
    }

    /**
     * After the shared setup has moved the task to RUNNING, one more scheduler round must leave at
     * least one pending checkpoint on the coordinator.
     */
    @Test
    public void testPeriodicSchedulingWithInactiveTasks() throws Exception {
        final CheckpointCoordinator coordinator =
                setupCheckpointCoordinatorWithInactiveTasks(new MemoryStateBackend());

        this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
        this.manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertTrue(0 < coordinator.getNumberOfPendingCheckpoints());
    }

    /**
     * Builds a coordinator over a single-vertex graph whose task is initially not running, starts
     * the periodic scheduler and checks that no checkpoint becomes pending while the task is
     * inactive. The task is then switched to {@link ExecutionState#RUNNING} and the scheduler is
     * fired once more before the coordinator is handed back.
     *
     * @param checkpointStorage the checkpoint storage installed on the coordinator
     * @return the started coordinator, after one scheduler round with the task running
     * @throws Exception if building the graph or the coordinator fails
     */
    private CheckpointCoordinator setupCheckpointCoordinatorWithInactiveTasks(CheckpointStorage checkpointStorage) throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .setTransitToRunning(false)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        CheckpointCoordinatorConfiguration configuration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(10L)
                        .setCheckpointTimeout(200000L)
                        .setMinPauseBetweenCheckpoints(0L)
                        .setMaxConcurrentCheckpoints(2)
                        .build();
        // Declared with the concrete type because setOwner(...) is called on it below.
        CheckpointIDCounterWithOwner idCounter = new CheckpointIDCounterWithOwner();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(configuration)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setCheckpointStorage(checkpointStorage)
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointIDCounter(idCounter)
                        .build(graph);
        idCounter.setOwner(coordinator);
        coordinator.startCheckpointScheduler();

        // While the task is not running, a scheduler round must not produce a pending checkpoint.
        this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());

        // Activate the task and give the scheduler another round.
        vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
        this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        return coordinator;
    }

    /**
     * Requests several savepoints back to back on a coordinator limited to one concurrent
     * checkpoint, checks that none of the returned futures is done right after the request, then
     * acknowledges the savepoints by counting down from the id counter's last value and expects
     * every future to yield a non-null result.
     */
    @Test
    public void testConcurrentSavepoints() throws Exception {
        final int numSavepoints = 5;
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId =
                graph.getJobVertex(jobVertexID)
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt()
                        .getAttemptId();
        // Declared with the concrete type because getLast() is read from it below.
        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                new CheckpointCoordinatorConfiguration
                                                .CheckpointCoordinatorConfigurationBuilder()
                                        .setMaxConcurrentCheckpoints(1)
                                        .build())
                        .setCheckpointIDCounter(idCounter)
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        List<CompletableFuture<?>> savepointFutures = new ArrayList<>(numSavepoints);
        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        for (int n = 0; n < numSavepoints; n++) {
            savepointFutures.add(
                    coordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL));
        }

        // Nothing has been acknowledged yet, so no future may be complete.
        for (CompletableFuture<?> savepointFuture : savepointFutures) {
            Assert.assertFalse(savepointFuture.isDone());
        }

        this.manuallyTriggeredScheduledExecutor.triggerAll();

        // Acknowledge one id per requested savepoint, walking down from the newest id.
        long checkpointId = idCounter.getLast();
        for (int n = 0; n < numSavepoints; n++, checkpointId--) {
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(graph.getJobID(), attemptId, checkpointId),
                    TASK_MANAGER_LOCATION_INFO);
        }

        for (CompletableFuture<?> savepointFuture : savepointFutures) {
            Assert.assertNotNull(savepointFuture.get());
        }
    }

    /**
     * With a very large minimum pause between checkpoints configured, two savepoint requests issued
     * back to back must both return futures that are not already done.
     */
    @Test
    public void testMinDelayBetweenSavepoints() throws Exception {
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                new CheckpointCoordinatorConfiguration
                                                .CheckpointCoordinatorConfigurationBuilder()
                                        .setMinPauseBetweenCheckpoints(100000000L)
                                        .setMaxConcurrentCheckpoints(1)
                                        .build())
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();

        final boolean firstDone =
                coordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL).isDone();
        Assert.assertFalse("Did not trigger savepoint", firstDone);

        final boolean secondDone =
                coordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL).isDone();
        Assert.assertFalse("Did not trigger savepoint", secondDone);
    }

    /**
     * Triggers a checkpoint on a coordinator configured with
     * {@link CheckpointRetentionPolicy#RETAIN_ON_FAILURE} and checks that every pending checkpoint
     * carries the checkpoint properties derived from that retention policy.
     */
    @Test
    public void testExternalizedCheckpoints() throws Exception {
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                new CheckpointCoordinatorConfiguration
                                                .CheckpointCoordinatorConfigurationBuilder()
                                        .setCheckpointRetentionPolicy(
                                                CheckpointRetentionPolicy.RETAIN_ON_FAILURE)
                                        .build())
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        final CompletableFuture triggerFuture = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerFuture);

        for (PendingCheckpoint pending : coordinator.getPendingCheckpoints().values()) {
            Assert.assertEquals(
                    CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
                    pending.getProps());
        }

        coordinator.shutdown();
    }

    /**
     * Runs the key-group partition check for a handful of fixed boundary combinations and then for
     * 1000 pseudo-random combinations drawn from a fixed seed.
     */
    @Test
    public void testCreateKeyGroupPartitions() {
        final int upperBound = 32767;

        // Fixed combinations, including the smallest and largest values.
        testCreateKeyGroupPartitions(1, 1);
        testCreateKeyGroupPartitions(13, 1);
        testCreateKeyGroupPartitions(13, 2);
        testCreateKeyGroupPartitions(upperBound, 1);
        testCreateKeyGroupPartitions(upperBound, 13);
        testCreateKeyGroupPartitions(upperBound, upperBound);

        // Seeded random combinations; the second value never exceeds the first.
        final Random rng = new Random(1234L);
        for (int round = 1000; round > 0; round--) {
            final int first = 1 + rng.nextInt(upperBound - 1);
            final int second = 1 + rng.nextInt(first);
            testCreateKeyGroupPartitions(first, second);
        }
    }

    /**
     * Checks that every key-group in {@code [0, i)} is contained in the range found at the index
     * that {@code KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup} computes for it.
     *
     * @param i first argument to {@code createKeyGroupPartitions}; also the number of key-groups
     *     checked
     * @param i2 second argument to {@code createKeyGroupPartitions}
     */
    private void testCreateKeyGroupPartitions(int i, int i2) {
        final List<KeyGroupRange> partitions = StateAssignmentOperation.createKeyGroupPartitions(i, i2);
        int keyGroup = 0;
        while (keyGroup < i) {
            final int operatorIndex =
                    KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(i, i2, keyGroup);
            final KeyGroupRange range = partitions.get(operatorIndex);
            if (!range.contains(keyGroup)) {
                Assert.fail("Could not find expected key-group " + keyGroup + " in range " + range);
            }
            keyGroup++;
        }
    }

    /**
     * Runs the repartitioning check 10000 times with four values, each drawn from 1 to 9 inclusive
     * out of a fixed-seed random generator that is also handed to the check itself.
     */
    @Test
    public void testPartitionableStateRepartitioning() {
        final Random rng = new Random(42L);
        int run = 0;
        while (run < 10000) {
            // The four draws must happen in exactly this order to keep the sequence reproducible.
            final int first = 1 + rng.nextInt(9);
            final int second = 1 + rng.nextInt(9);
            final int third = 1 + rng.nextInt(9);
            final int fourth = 1 + rng.nextInt(9);
            doTestPartitionableStateRepartitioning(rng, first, second, third, fourth);
            run++;
        }
    }

    /**
     * Builds {@code i} fake operator state handles with randomly sized named states, derives the
     * expected multiset of offsets per handle and state name, runs
     * {@code RoundRobinOperatorStateRepartitioner} and compares its output against that expectation.
     *
     * @param random source of randomness for state sizes and distribution modes
     * @param i number of state handles created; passed to {@code repartitionState} as its second
     *     argument
     * @param i2 passed to {@code repartitionState} as its third argument; also the number of result
     *     entries that are inspected
     * @param i3 each handle gets {@code i3 - 1} states with a random mode and, when {@code i3} is
     *     even, one additional BROADCAST state
     * @param i4 exclusive bound for the random part of a state's offset count; each random state
     *     has {@code 1 + random.nextInt(i4)} offsets
     */
    private void doTestPartitionableStateRepartitioning(Random random, int i, int i2, int i3, int i4) {
        int i5;
        // Phase 1: create one single-element handle list per index in [0, i).
        ArrayList<List> arrayList = new ArrayList(i);
        for (int i6 = 0; i6 < i; i6++) {
            Path path = new Path("/fake-" + i6);
            HashMap hashMap = new HashMap();
            // i7 is a running offset value, so offsets within one handle are strictly ascending.
            int i7 = 0;
            for (int i8 = 0; i8 < i3 - 1; i8++) {
                long[] jArr = new long[1 + random.nextInt(i4)];
                for (int i9 = 0; i9 < jArr.length; i9++) {
                    jArr[i9] = i7;
                    i7++;
                }
                // Roughly one state in ten is UNION; the others are SPLIT_DISTRIBUTE.
                hashMap.put("State-" + i8, new OperatorStateHandle.StateMetaInfo(jArr, random.nextInt(10) == 0 ? OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
            }
            if (i3 % 2 == 0) {
                hashMap.put("State-" + (i3 - 1), new OperatorStateHandle.StateMetaInfo(new long[]{i7 + 1, i7 + 2, i7 + 3, i7 + 4}, OperatorStateHandle.Mode.BROADCAST));
            }
            arrayList.add(Collections.singletonList(new OperatorStreamStateHandle(hashMap, new FileStateHandle(path, -1L))));
        }
        // Phase 2: compute the expectation.
        // hashMap2: delegate state handle -> (state name -> expected offsets, each repeated i5 times).
        HashMap hashMap2 = new HashMap();
        // i10: index of the handle currently processed; i11: expected total number of offsets.
        int i10 = 0;
        int i11 = 0;
        for (List<OperatorStateHandle> list : arrayList) {
            Assert.assertEquals(1L, list.size());
            for (OperatorStateHandle operatorStateHandle : list) {
                Map stateNameToPartitionOffsets = operatorStateHandle.getStateNameToPartitionOffsets();
                HashMap hashMap3 = new HashMap(stateNameToPartitionOffsets.size());
                for (Map.Entry entry : stateNameToPartitionOffsets.entrySet()) {
                    long[] offsets = ((OperatorStateHandle.StateMetaInfo) entry.getValue()).getOffsets();
                    // NOTE(review): decompiler artifact. This is a switch over the distribution mode
                    // routed through a synthetic switch-map table; the case labels are unrelated
                    // constants that merely have the values 1, 2 and 3. Which mode each index stands
                    // for is defined by the table, which is not visible here -- confirm against the
                    // original source before rewriting. i5 is the number of times each offset of
                    // this state is expected to appear in the repartitioned result.
                    switch (AnonymousClass9.$SwitchMap$org$apache$flink$runtime$state$OperatorStateHandle$Mode[((OperatorStateHandle.StateMetaInfo) entry.getValue()).getDistributionMode().ordinal()]) {
                        case PendingCheckpointTest.PARALLELISM /* 1 */:
                            // One copy per result entry.
                            i5 = i2;
                            break;
                        case TaskTest.InvokableDeclingingCheckpoints.REJECTED_EXECUTION_CHECKPOINT_ID /* 2 */:
                            // i2 / i copies, plus one for the first (i2 % i) handles.
                            i5 = (i2 / i) + (i10 < i2 % i ? 1 : 0);
                            break;
                        case TaskTest.InvokableDeclingingCheckpoints.THROWING_CHECKPOINT_ID /* 3 */:
                            // Exactly one copy.
                            i5 = 1;
                            break;
                        default:
                            throw new RuntimeException("Unknown distribution mode " + ((OperatorStateHandle.StateMetaInfo) entry.getValue()).getDistributionMode());
                    }
                    // States that are expected zero times are left out of the expectation entirely.
                    if (i5 > 0) {
                        i11 += i5 * offsets.length;
                        ArrayList arrayList2 = new ArrayList(offsets.length);
                        for (long j : offsets) {
                            for (int i12 = 0; i12 < i5; i12++) {
                                arrayList2.add(Long.valueOf(j));
                            }
                        }
                        hashMap3.put(entry.getKey(), arrayList2);
                    }
                }
                if (!hashMap3.isEmpty()) {
                    hashMap2.put(operatorStateHandle.getDelegateStateHandle(), hashMap3);
                }
                i10++;
            }
        }
        // Phase 3: run the repartitioner under test.
        List repartitionState = RoundRobinOperatorStateRepartitioner.INSTANCE.repartitionState(arrayList, i, i2);
        // hashMap4 mirrors hashMap2 but is filled from the actual result.
        HashMap hashMap4 = new HashMap();
        // i13 / i14: smallest / largest number of offsets seen in a single result entry;
        // i15: total number of offsets across all result entries.
        int i13 = Integer.MAX_VALUE;
        int i14 = 0;
        int i15 = 0;
        for (int i16 = 0; i16 < i2; i16++) {
            // i17: number of offsets in result entry i16.
            int i17 = 0;
            for (OperatorStateHandle operatorStateHandle2 : (Collection) repartitionState.get(i16)) {
                for (Map.Entry entry2 : operatorStateHandle2.getStateNameToPartitionOffsets().entrySet()) {
                    Map map = (Map) hashMap4.get(operatorStateHandle2.getDelegateStateHandle());
                    if (map == null) {
                        map = new HashMap();
                        hashMap4.put(operatorStateHandle2.getDelegateStateHandle(), map);
                    }
                    List list2 = (List) map.get(entry2.getKey());
                    if (list2 == null) {
                        list2 = new ArrayList();
                        map.put(entry2.getKey(), list2);
                    }
                    for (long j2 : ((OperatorStateHandle.StateMetaInfo) entry2.getValue()).getOffsets()) {
                        list2.add(Long.valueOf(j2));
                    }
                    i17 += ((OperatorStateHandle.StateMetaInfo) entry2.getValue()).getOffsets().length;
                }
            }
            i13 = Math.min(i13, i17);
            i14 = Math.max(i14, i17);
            i15 += i17;
        }
        // The expected lists were built in ascending offset order, so sort the actual lists before
        // comparing them with equals().
        Iterator it = hashMap4.values().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((Map) it.next()).values().iterator();
            while (it2.hasNext()) {
                Collections.sort((List) it2.next());
            }
        }
        // The balance check is skipped when i == i2.
        if (i != i2) {
            int i18 = i14 - i13;
            Assert.assertTrue("Difference in partition load is > 1 : " + i18, i18 <= 1);
        }
        Assert.assertEquals(i11, i15);
        Assert.assertEquals(hashMap2, hashMap4);
    }

    /**
     * Triggers one checkpoint on a coordinator wired to a mocked stats tracker and verifies that
     * the tracker was asked exactly once to report a pending checkpoint with id 1 and the
     * never-retain checkpoint properties.
     */
    @Test
    public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
        final CheckpointStatsTracker statsTracker = Mockito.mock(CheckpointStatsTracker.class);
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(statsTracker)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        // Matchers are registered in argument order; keep them inline and in this sequence.
        Mockito.when(
                        statsTracker.reportPendingCheckpoint(
                                ArgumentMatchers.anyLong(),
                                ArgumentMatchers.anyLong(),
                                (CheckpointProperties) Matchers.any(CheckpointProperties.class),
                                (Map) Matchers.any(Map.class)))
                .thenReturn(Mockito.mock(PendingCheckpointStats.class));

        final CompletableFuture triggerFuture = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerFuture);

        final CheckpointProperties expectedProps =
                CheckpointProperties.forCheckpoint(
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        Mockito.verify(statsTracker, Mockito.times(1))
                .reportPendingCheckpoint(
                        Mockito.eq(1L),
                        ((Long) Matchers.any(Long.class)).longValue(),
                        (CheckpointProperties) Mockito.eq(expectedProps),
                        (Map) Matchers.any());
    }

    /**
     * Puts one completed checkpoint into the store, restores the latest checkpointed state and
     * verifies that the mocked stats tracker received exactly one restored-checkpoint report.
     */
    @Test
    public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
        final CompletedCheckpointStore checkpointStore = new StandaloneCompletedCheckpointStore(1);
        final CheckpointStatsTracker statsTracker = Mockito.mock(CheckpointStatsTracker.class);
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(checkpointStore)
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(statsTracker)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        // A checkpoint with no operator state, stored without any post-cleanup action.
        final CompletedCheckpoint storedCheckpoint =
                new CompletedCheckpoint(
                        new JobID(),
                        0L,
                        0L,
                        0L,
                        Collections.emptyMap(),
                        Collections.emptyList(),
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new TestCompletedCheckpointStorageLocation(),
                        (CompletedCheckpointStats) null);
        checkpointStore.addCheckpointAndSubsumeOldestOne(
                storedCheckpoint, new CheckpointsCleaner(), () -> {});

        final boolean restored =
                coordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true);
        Assert.assertTrue(restored);

        Mockito.verify(statsTracker, Mockito.times(1))
                .reportRestoredCheckpoint(
                        (RestoredCheckpointStats) Matchers.any(RestoredCheckpointStats.class));
    }

    /**
     * For every {@link RestoreMode}: performs three incremental checkpoints, verifies that each
     * keyed state handle was registered exactly once with the first shared state registry and that
     * nothing was discarded, then simulates a restore with a fresh registry and store and verifies
     * re-registration and discard behaviour after old checkpoints are removed.
     *
     * <p>The first store and the keyed state handles are declared with the types whose methods are
     * actually called on them ({@code removeOldestCheckpoint}, {@code getShutdownStatus},
     * {@code getSharedState}, ...).
     */
    @Test
    public void testSharedStateRegistrationOnRestore() throws Exception {
        for (RestoreMode restoreMode : RestoreMode.values()) {
            JobVertexID jobVertexID = new JobVertexID();
            ExecutionGraph graph =
                    new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                            .addJobVertex(jobVertexID, 2, 4)
                            .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
            ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
            List<CompletedCheckpoint> noCheckpoints = Collections.emptyList();
            SharedStateRegistry firstRegistry =
                    SharedStateRegistry.DEFAULT_FACTORY.create(
                            org.apache.flink.util.concurrent.Executors.directExecutor(),
                            noCheckpoints,
                            restoreMode);
            EmbeddedCompletedCheckpointStore firstStore =
                    new EmbeddedCompletedCheckpointStore(10, noCheckpoints, firstRegistry);
            // The same builder is reused further down to create the post-restore coordinator.
            CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder coordinatorBuilder =
                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                            .setTimer(this.manuallyTriggeredScheduledExecutor);
            CheckpointCoordinator coordinator =
                    coordinatorBuilder.setCompletedCheckpointStore(firstStore).build(graph);
            List<KeyGroupRange> keyGroupPartitions =
                    StateAssignmentOperation.createKeyGroupPartitions(4, 2);
            for (int cpSequence = 0; cpSequence < 3; cpSequence++) {
                performIncrementalCheckpoint(
                        graph.getJobID(), coordinator, jobVertex, keyGroupPartitions, cpSequence);
            }
            List<CompletedCheckpoint> completedCheckpoints = coordinator.getSuccessfulCheckpoints();
            Assert.assertEquals(3L, completedCheckpoints.size());

            // Collect the shared handles of each checkpoint; kept as a raw list because it is
            // handed to verifyDiscard(...), whose parameter type is declared elsewhere.
            int sharedHandleCount = 0;
            ArrayList sharedHandlesByCheckpoint = new ArrayList(3);
            for (int n = 0; n < 3; n++) {
                sharedHandlesByCheckpoint.add(new HashMap(2));
            }
            int cp = 0;
            for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
                for (OperatorState operatorState : completedCheckpoint.getOperatorStates().values()) {
                    for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                        for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                            // Registered exactly once with the first registry.
                            Mockito.verify(keyedStateHandle, Mockito.times(1))
                                    .registerSharedStates(
                                            firstRegistry, completedCheckpoint.getCheckpointID());
                            IncrementalRemoteKeyedStateHandle incrementalHandle =
                                    (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
                            ((Map) sharedHandlesByCheckpoint.get(cp))
                                    .putAll(incrementalHandle.getSharedState());
                            for (StreamStateHandle sharedHandle :
                                    incrementalHandle.getSharedState().values()) {
                                Assert.assertTrue(
                                        !(sharedHandle instanceof PlaceholderStreamStateHandle));
                                Mockito.verify(sharedHandle, Mockito.never()).discardState();
                                sharedHandleCount++;
                            }
                            for (StreamStateHandle privateHandle :
                                    incrementalHandle.getPrivateState().values()) {
                                Mockito.verify(privateHandle, Mockito.never()).discardState();
                            }
                            Mockito.verify(incrementalHandle.getMetaStateHandle(), Mockito.never())
                                    .discardState();
                        }
                        Mockito.verify(subtaskState, Mockito.never()).discardState();
                    }
                }
                cp++;
            }
            Assert.assertEquals(10L, sharedHandleCount);

            // Dropping the oldest checkpoint from the first store must not discard shared state.
            firstStore.removeOldestCheckpoint();
            for (Object sharedHandles : sharedHandlesByCheckpoint) {
                for (Object sharedHandle : ((Map) sharedHandles).values()) {
                    Mockito.verify((StreamStateHandle) sharedHandle, Mockito.never()).discardState();
                }
            }

            // Suspend the first store and restore from its remaining checkpoints.
            firstStore.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
            HashSet<ExecutionJobVertex> tasks = new HashSet<>();
            tasks.add(jobVertex);
            Assert.assertEquals(JobStatus.SUSPENDED, firstStore.getShutdownStatus().orElse(null));
            SharedStateRegistry secondRegistry =
                    SharedStateRegistry.DEFAULT_FACTORY.create(
                            org.apache.flink.util.concurrent.Executors.directExecutor(),
                            firstStore.getAllCheckpoints(),
                            restoreMode);
            EmbeddedCompletedCheckpointStore secondStore =
                    new EmbeddedCompletedCheckpointStore(
                            10, firstStore.getAllCheckpoints(), secondRegistry);
            CheckpointCoordinator restoredCoordinator =
                    coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(graph);
            Assert.assertTrue(restoredCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));

            // The first checkpoint was removed before the restore and must not be registered with
            // the second registry; the later ones must be registered exactly once.
            int cpIndex = 0;
            for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
                for (OperatorState operatorState : completedCheckpoint.getOperatorStates().values()) {
                    for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                        for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                            Mockito.verify(
                                            keyedStateHandle,
                                            cpIndex > 0 ? Mockito.times(1) : Mockito.never())
                                    .registerSharedStates(
                                            secondRegistry, completedCheckpoint.getCheckpointID());
                        }
                    }
                }
                cpIndex++;
            }

            // verifyDiscard is defined elsewhere in this class; presumably it applies the returned
            // verification mode to the handles collected for each checkpoint index -- confirm there.
            secondStore.removeOldestCheckpoint();
            verifyDiscard(sharedHandlesByCheckpoint, num -> {
                return (restoreMode == RestoreMode.CLAIM && num.intValue() == 0) ? Mockito.times(1) : Mockito.never();
            });
            secondStore.removeOldestCheckpoint();
            verifyDiscard(sharedHandlesByCheckpoint, num2 -> {
                return num2.intValue() == 1 ? Mockito.never() : Mockito.atLeast(0);
            });
        }
    }

    /**
     * Triggers a synchronous savepoint, declines it with a custom {@link IOException} and checks
     * that the pending savepoint is disposed, that the returned future fails with a
     * {@code CheckpointException} wrapping the custom exception, and that the fail-job callback ran
     * exactly once with a matching throwable.
     */
    @Test
    public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
        // f0 counts failJob invocations, f1 keeps the last throwable passed to it.
        final Tuple2<Integer, Throwable> failJobRecord = Tuple2.of(0, null);
        final IOException declineCause = new IOException("Custom-Exception");

        final JobVertexID firstVertexId = new JobVertexID();
        final JobVertexID secondVertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        final ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        final ExecutionAttemptID firstAttemptId =
                firstVertex.getCurrentExecutionAttempt().getAttemptId();
        // The second attempt id is looked up but its value is not used by this test.
        secondVertex.getCurrentExecutionAttempt().getAttemptId();

        final CheckpointFailureManager.FailJobCallback failJobCallback =
                new CheckpointFailureManager.FailJobCallback() {
                    public void failJob(Throwable th) {
                        failJobRecord.f0 = failJobRecord.f0 + 1;
                        failJobRecord.f1 = th;
                    }

                    public void failJobDueToTaskFailure(
                            Throwable th, ExecutionAttemptID executionAttemptID) {
                        throw new AssertionError("This method should not be called for the test.");
                    }
                };
        final CheckpointCoordinator coordinator =
                getCheckpointCoordinator(graph, new CheckpointFailureManager(0, failJobCallback));

        final CompletableFuture savepointFuture =
                coordinator.triggerSynchronousSavepoint(
                        false, "test-dir", SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertTrue(
                declineSynchronousSavepoint(
                                graph.getJobID(), coordinator, firstAttemptId, declineCause)
                        .isDisposed());

        final String expectedRootMessage =
                String.format(
                        "%s: %s", declineCause.getClass().getName(), declineCause.getMessage());
        try {
            savepointFuture.get();
            Assert.fail("Expected Exception not found.");
        } catch (ExecutionException e) {
            final Throwable failure = ExceptionUtils.stripExecutionException(e);
            Assert.assertTrue(failure instanceof CheckpointException);
            Assert.assertEquals(
                    expectedRootMessage, failure.getCause().getCause().getMessage());
        }

        Assert.assertEquals(1L, failJobRecord.f0.intValue());
        Assert.assertTrue(
                (failJobRecord.f1 instanceof CheckpointException)
                        && failJobRecord
                                .f1
                                .getCause()
                                .getCause()
                                .getMessage()
                                .equals(expectedRootMessage));

        coordinator.shutdown();
    }

    /**
     * Runs the shared trigger-checkpoint check with a {@code StoppingCheckpointIDCounter} and
     * expects the failure reason {@link CheckpointFailureReason#PERIODIC_SCHEDULER_SHUTDOWN}.
     */
    @Test
    public void testTriggerCheckpointAfterStopping() throws Exception {
        // Declared with the concrete type because setOwner(...) is called on it below.
        StoppingCheckpointIDCounter idCounter = new StoppingCheckpointIDCounter();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointIDCounter(idCounter)
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        idCounter.setOwner(coordinator);
        testTriggerCheckpoint(coordinator, CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
    }

    /**
     * Triggers a checkpoint on a coordinator whose ID counter is an {@code IOExceptionCheckpointIDCounter}
     * and expects the trigger to fail with {@link CheckpointFailureReason#IO_EXCEPTION}. Afterwards the
     * fail-job callback must have been invoked once and the stats tracker must report exactly one
     * checkpoint in total, counted as failed, with no pending stats left for checkpoint 1.
     */
    @Test
    public void testTriggerCheckpointWithCounterIOException() throws Exception {
        // Declared with its concrete type: setOwner(...) is not part of CheckpointIDCounter.
        IOExceptionCheckpointIDCounter idCounter = new IOExceptionCheckpointIDCounter();
        TestFailJobCallback failJobCallback = new TestFailJobCallback();
        CheckpointStatsTracker statsTracker =
                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointIDCounter(idCounter)
                        .setFailureManager(new CheckpointFailureManager(0, failJobCallback))
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(statsTracker)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        idCounter.setOwner(coordinator);

        testTriggerCheckpoint(coordinator, CheckpointFailureReason.IO_EXCEPTION);

        Assert.assertEquals(1L, failJobCallback.getInvokeCounter());
        CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts();
        Assert.assertEquals(0L, counts.getNumberOfRestoredCheckpoints());
        Assert.assertEquals(1L, counts.getTotalNumberOfCheckpoints());
        Assert.assertEquals(0L, counts.getNumberOfInProgressCheckpoints());
        Assert.assertEquals(0L, counts.getNumberOfCompletedCheckpoints());
        Assert.assertEquals(1L, counts.getNumberOfFailedCheckpoints());
        Assert.assertNull(statsTracker.getPendingCheckpointStats(1L));
    }

    /**
     * Starts the scheduler, triggers one checkpoint and expects the returned future to fail with a
     * {@link CheckpointException} carrying the given failure reason. The coordinator is always shut
     * down before returning.
     *
     * @param checkpointCoordinator coordinator under test
     * @param checkpointFailureReason reason the failed trigger is expected to report
     * @throws Exception the original {@link ExecutionException} if the failure does not carry the
     *     expected reason, or anything thrown by the coordinator
     */
    private void testTriggerCheckpoint(CheckpointCoordinator checkpointCoordinator, CheckpointFailureReason checkpointFailureReason) throws Exception {
        try {
            checkpointCoordinator.startCheckpointScheduler();
            CheckpointProperties properties =
                    CheckpointProperties.forCheckpoint(
                            CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
            CompletableFuture<?> triggerFuture =
                    checkpointCoordinator.triggerCheckpoint(properties, (String) null, true);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            try {
                triggerFuture.get();
                Assert.fail("should not trigger periodic checkpoint");
            } catch (ExecutionException failure) {
                Optional<CheckpointException> checkpointException =
                        ExceptionUtils.findThrowable(failure, CheckpointException.class);
                boolean hasExpectedReason =
                        checkpointException.isPresent()
                                && checkpointException.get().getCheckpointFailureReason()
                                        == checkpointFailureReason;
                if (!hasExpectedReason) {
                    // Unexpected failure: surface the original exception to the test runner.
                    throw failure;
                }
            }
        } finally {
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * With unaligned checkpoints enabled and at most one concurrent checkpoint, issues ten periodic
     * checkpoint requests followed by a savepoint request, then declines checkpoint 1. Checks the
     * number of queued requests after each step and that the single remaining pending checkpoint is
     * the savepoint, which must not be a forced checkpoint.
     */
    @Test
    public void testSavepointScheduledInUnalignedMode() throws Exception {
        final int periodicRequests = 10;
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setUnalignedCheckpointsEnabled(true)
                                        .setMaxConcurrentCheckpoints(1)
                                        .build())
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .build(graph);
        try {
            // Typed list so that the isDone() filter below resolves against CompletableFuture.
            List<CompletableFuture<?>> periodicFutures = new ArrayList<>(periodicRequests);
            coordinator.startCheckpointScheduler();
            for (int n = 0; n < periodicRequests; n++) {
                periodicFutures.add(coordinator.triggerCheckpoint(true));
            }
            // All but one of the periodic requests are expected to sit in the queue.
            Assert.assertEquals(periodicRequests - 1, coordinator.getNumQueuedRequests());

            CompletableFuture<?> savepointFuture =
                    coordinator.triggerSavepoint("/tmp", SavepointFormatType.CANONICAL);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            // The savepoint request adds one more queued entry.
            Assert.assertEquals(periodicRequests, coordinator.getNumQueuedRequests());

            coordinator.receiveDeclineMessage(
                    new DeclineCheckpoint(
                            graph.getJobID(),
                            ExecutionGraphTestUtils.createExecutionAttemptId(),
                            1L,
                            new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                    "none");
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            // Declining checkpoint 1 lets exactly one queued request leave the queue.
            Assert.assertEquals(periodicRequests - 1, coordinator.getNumQueuedRequests());
            Assert.assertEquals(
                    1L, periodicFutures.stream().filter(CompletableFuture::isDone).count());
            Assert.assertFalse(savepointFuture.isDone());
            Assert.assertEquals(1, coordinator.getNumberOfPendingCheckpoints());

            CheckpointProperties props =
                    coordinator.getPendingCheckpoints().values().iterator().next().getProps();
            Assert.assertTrue(props.isSavepoint());
            Assert.assertFalse(props.forceCheckpoint());
        } finally {
            // Single cleanup path; shutdown() is no longer invoked twice when it throws.
            coordinator.shutdown();
        }
    }

    /**
     * Triggers a checkpoint on a two-vertex graph that has one operator coordinator and one master
     * hook. The master hook asserts that the operator coordinator was already checkpointed, then
     * acknowledges the checkpoint for both tasks from inside the hook. The test expects the
     * checkpoint to complete, both tasks to have been triggered with the hook's checkpoint id, and
     * the completed checkpoint to contain two operator states.
     */
    @Test
    public void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        final ExecutionAttemptID firstAttemptId =
                firstVertex.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID secondAttemptId =
                secondVertex.getCurrentExecutionAttempt().getAttemptId();
        OperatorID firstOperatorId =
                firstVertex.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
        OperatorID secondOperatorId =
                secondVertex.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();

        final TaskStateSnapshot firstSnapshot = new TaskStateSnapshot();
        final TaskStateSnapshot secondSnapshot = new TaskStateSnapshot();
        firstSnapshot.putSubtaskStateByOperatorID(
                firstOperatorId, OperatorSubtaskState.builder().build());
        // The second operator's state belongs in the second task's snapshot, matching
        // testCompleteCheckpointFailureWithExternallyInducedSource.
        secondSnapshot.putSubtaskStateByOperatorID(
                secondOperatorId, OperatorSubtaskState.builder().build());

        final AtomicBoolean coordinatorCheckpointed = new AtomicBoolean(false);
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCoordinatorsToCheckpoint(
                                Collections.singleton(
                                        new CheckpointCoordinatorTestingUtils
                                                        .MockOperatorCheckpointCoordinatorContextBuilder()
                                                .setOnCallingCheckpointCoordinator(
                                                        (checkpointId, result) -> {
                                                            coordinatorCheckpointed.set(true);
                                                            result.complete(new byte[0]);
                                                        })
                                                .setOperatorID(firstOperatorId)
                                                .build()))
                        .build(graph);
        try {
            final AtomicReference<Long> hookCheckpointId = new AtomicReference<>();
            coordinator.addMasterHook(
                    new MasterTriggerRestoreHook<Integer>() {
                        @Override
                        public String getIdentifier() {
                            return "anything";
                        }

                        @Nullable
                        @Override
                        public CompletableFuture<Integer> triggerCheckpoint(
                                long checkpointId, long timestamp, Executor executor)
                                throws Exception {
                            Assert.assertTrue(
                                    "The coordinator checkpoint should have finished.",
                                    coordinatorCheckpointed.get());
                            hookCheckpointId.set(checkpointId);
                            // Acknowledge both tasks from within the hook.
                            AcknowledgeCheckpoint firstAck =
                                    new AcknowledgeCheckpoint(
                                            graph.getJobID(),
                                            firstAttemptId,
                                            checkpointId,
                                            new CheckpointMetrics(),
                                            firstSnapshot);
                            AcknowledgeCheckpoint secondAck =
                                    new AcknowledgeCheckpoint(
                                            graph.getJobID(),
                                            secondAttemptId,
                                            checkpointId,
                                            new CheckpointMetrics(),
                                            secondSnapshot);
                            coordinator.receiveAcknowledgeMessage(
                                    firstAck, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                            coordinator.receiveAcknowledgeMessage(
                                    secondAck, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                            return null;
                        }

                        @Override
                        public void restoreCheckpoint(long checkpointId, Integer checkpointData)
                                throws Exception {}

                        @Override
                        public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
                            return new SimpleVersionedSerializer<Integer>() {
                                @Override
                                public int getVersion() {
                                    return 0;
                                }

                                @Override
                                public byte[] serialize(Integer value) throws IOException {
                                    return new byte[0];
                                }

                                // Must be named deserialize to implement the interface method.
                                @Override
                                public Integer deserialize(int version, byte[] serialized)
                                        throws IOException {
                                    return 1;
                                }
                            };
                        }
                    });

            // Nothing pending, retained or scheduled before the trigger.
            Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(
                    0L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

            CompletableFuture<?> checkpointFuture = coordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally(checkpointFuture);

            Assert.assertEquals(1L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(
                    0L, this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());

            long checkpointId = hookCheckpointId.get();
            for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex)) {
                Assert.assertEquals(
                        checkpointId,
                        gateway.getOnlyTriggeredCheckpoint(
                                        vertex.getCurrentExecutionAttempt().getAttemptId())
                                .checkpointId);
            }

            CompletedCheckpoint completed = coordinator.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(graph.getJobID(), completed.getJobId());
            Assert.assertEquals(2L, completed.getOperatorStates().size());
        } finally {
            // Shut down even when an assertion above fails.
            coordinator.shutdown();
        }
    }

    /**
     * Same setup as the externally-induced-source test, but the checkpoint storage hands out a
     * location whose {@code createMetadataOutputStream()} always throws an {@link IOException}.
     * After both tasks acknowledge from inside the master hook, the trigger future must be completed
     * exceptionally and no successful checkpoint may be recorded.
     */
    @Test
    public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        final ExecutionAttemptID firstAttemptId =
                firstVertex.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID secondAttemptId =
                secondVertex.getCurrentExecutionAttempt().getAttemptId();
        OperatorID firstOperatorId =
                firstVertex.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
        OperatorID secondOperatorId =
                secondVertex.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();

        final TaskStateSnapshot firstSnapshot = new TaskStateSnapshot();
        final TaskStateSnapshot secondSnapshot = new TaskStateSnapshot();
        firstSnapshot.putSubtaskStateByOperatorID(
                firstOperatorId, OperatorSubtaskState.builder().build());
        secondSnapshot.putSubtaskStateByOperatorID(
                secondOperatorId, OperatorSubtaskState.builder().build());

        final AtomicBoolean coordinatorCheckpointed = new AtomicBoolean(false);
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCoordinatorsToCheckpoint(
                                Collections.singleton(
                                        new CheckpointCoordinatorTestingUtils
                                                        .MockOperatorCheckpointCoordinatorContextBuilder()
                                                .setOnCallingCheckpointCoordinator(
                                                        (checkpointId, result) -> {
                                                            coordinatorCheckpointed.set(true);
                                                            result.complete(new byte[0]);
                                                        })
                                                .setOperatorID(firstOperatorId)
                                                .build()))
                        .setCheckpointStorage(
                                new JobManagerCheckpointStorage() {
                                    private static final long serialVersionUID =
                                            8134582566514272546L;

                                    @Override
                                    public CheckpointStorageAccess createCheckpointStorage(
                                            JobID jobID) throws IOException {
                                        return new MemoryBackendCheckpointStorageAccess(
                                                jobID, null, null, 100) {
                                            @Override
                                            public CheckpointStorageLocation
                                                    initializeLocationForCheckpoint(
                                                            long checkpointId) throws IOException {
                                                return new NonPersistentMetadataCheckpointStorageLocation(
                                                        1000) {
                                                    // Writing the metadata always fails.
                                                    @Override
                                                    public CheckpointMetadataOutputStream
                                                            createMetadataOutputStream()
                                                                    throws IOException {
                                                        throw new IOException(
                                                                "Artificial Exception");
                                                    }
                                                };
                                            }
                                        };
                                    }
                                })
                        .build(graph);
        try {
            final AtomicReference<Long> hookCheckpointId = new AtomicReference<>();
            coordinator.addMasterHook(
                    new MasterTriggerRestoreHook<Integer>() {
                        @Override
                        public String getIdentifier() {
                            return "anything";
                        }

                        @Nullable
                        @Override
                        public CompletableFuture<Integer> triggerCheckpoint(
                                long checkpointId, long timestamp, Executor executor)
                                throws Exception {
                            Assert.assertTrue(
                                    "The coordinator checkpoint should have finished.",
                                    coordinatorCheckpointed.get());
                            hookCheckpointId.set(checkpointId);
                            // Acknowledge both tasks from within the hook.
                            AcknowledgeCheckpoint firstAck =
                                    new AcknowledgeCheckpoint(
                                            graph.getJobID(),
                                            firstAttemptId,
                                            checkpointId,
                                            new CheckpointMetrics(),
                                            firstSnapshot);
                            AcknowledgeCheckpoint secondAck =
                                    new AcknowledgeCheckpoint(
                                            graph.getJobID(),
                                            secondAttemptId,
                                            checkpointId,
                                            new CheckpointMetrics(),
                                            secondSnapshot);
                            coordinator.receiveAcknowledgeMessage(
                                    firstAck, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                            coordinator.receiveAcknowledgeMessage(
                                    secondAck, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                            return null;
                        }

                        @Override
                        public void restoreCheckpoint(long checkpointId, Integer checkpointData)
                                throws Exception {}

                        @Override
                        public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
                            return new SimpleVersionedSerializer<Integer>() {
                                @Override
                                public int getVersion() {
                                    return 0;
                                }

                                @Override
                                public byte[] serialize(Integer value) throws IOException {
                                    return new byte[0];
                                }

                                // Must be named deserialize to implement the interface method.
                                @Override
                                public Integer deserialize(int version, byte[] serialized)
                                        throws IOException {
                                    return 1;
                                }
                            };
                        }
                    });

            CompletableFuture<?> checkpointFuture = coordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue(checkpointFuture.isCompletedExceptionally());
            Assert.assertTrue(coordinator.getSuccessfulCheckpoints().isEmpty());
        } finally {
            // The original never shut the coordinator down.
            coordinator.shutdown();
        }
    }

    /**
     * Registers a {@code TestResetHook} and checks that its {@code resetCalled} flag flips from
     * false to true once the latest checkpointed state is restored to an empty set of subtasks.
     */
    @Test
    public void testResetCalledInRegionRecovery() throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder builder =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(this.manuallyTriggeredScheduledExecutor);
        CheckpointCoordinator coordinator =
                builder.build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        TestResetHook hook = new TestResetHook("id");
        coordinator.addMasterHook(hook);
        Assert.assertFalse(hook.resetCalled);

        coordinator.restoreLatestCheckpointedStateToSubtasks(Collections.emptySet());
        Assert.assertTrue(hook.resetCalled);
    }

    /**
     * Triggers two checkpoints back to back and acknowledges only the one with the highest pending
     * id. The operator coordinator context must then report checkpoint 1 as aborted and checkpoint 2
     * as completed.
     */
    @Test
    public void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId =
                graph.getJobVertex(vertexId)
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt()
                        .getAttemptId();
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context =
                new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                        .setOperatorID(new OperatorID())
                        .setOnCallingCheckpointCoordinator(
                                (checkpointId, result) -> result.complete(new byte[0]))
                        .build();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCoordinatorsToCheckpoint(Collections.singleton(context))
                        .build(graph);
        try {
            coordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            // Result unused; kept because it throws NoSuchElementException when the first
            // trigger left no pending checkpoint behind.
            Collections.max(coordinator.getPendingCheckpoints().keySet());

            coordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            long latestPendingId = Collections.max(coordinator.getPendingCheckpoints().keySet());

            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            graph.getJobID(),
                            attemptId,
                            latestPendingId,
                            new CheckpointMetrics(),
                            (TaskStateSnapshot) null),
                    "");

            Assert.assertEquals(Collections.singletonList(1L), context.getAbortedCheckpoints());
            Assert.assertEquals(Collections.singletonList(2L), context.getCompletedCheckpoints());
        } finally {
            // Single cleanup path; shutdown() is no longer invoked twice when it throws.
            coordinator.shutdown();
        }
    }

    /**
     * Uses an operator coordinator callback that never completes its future and a checkpoint
     * timeout of 10. After the trigger the coordinator must still report {@code isTriggering()};
     * once the non-periodic scheduled tasks have been run it must no longer do so.
     */
    @Test
    public void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws Exception {
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context =
                new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                        .setOperatorID(new OperatorID())
                        // Deliberately leaves the result future incomplete.
                        .setOnCallingCheckpointCoordinator((checkpointId, result) -> {})
                        .build();
        // The single-thread executor the original created here was never used and is dropped.
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setCheckpointTimeout(10L)
                                        .build())
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCoordinatorsToCheckpoint(Collections.singleton(context))
                        .build(graph);
        try {
            coordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue(coordinator.isTriggering());

            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertFalse(coordinator.isTriggering());
        } finally {
            coordinator.shutdown();
        }
    }

    /**
     * Records, in call order, every operator-coordinator checkpoint call ("Trigger" + id) and every
     * abort-current-triggering call ("Abort"). Checkpoint 1 is declined after only two executor
     * steps have run; a second checkpoint is then triggered. The test passes when "Trigger1" was
     * either never recorded or was recorded before "Abort".
     */
    @Test
    public void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final String triggerTag = "Trigger";
        final String abortTag = "Abort";
        final List<String> notifications = new ArrayList<>();
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setCheckpointTimeout(10L)
                                        .build())
                        .setIoExecutor(this.manuallyTriggeredScheduledExecutor)
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCoordinatorsToCheckpoint(
                                Collections.singleton(
                                        new CheckpointCoordinatorTestingUtils
                                                        .MockOperatorCheckpointCoordinatorContextBuilder()
                                                .setOperatorID(new OperatorID())
                                                .setOnCallingCheckpointCoordinator(
                                                        (checkpointId, result) -> {
                                                            notifications.add(
                                                                    triggerTag + checkpointId);
                                                            result.complete(new byte[0]);
                                                        })
                                                .setOnCallingAbortCurrentTriggering(
                                                        () -> {
                                                            notifications.add(abortTag);
                                                        })
                                                .build()))
                        .build(graph);
        try {
            coordinator.triggerCheckpoint(false);
            // Run exactly two queued steps before declining checkpoint 1.
            this.manuallyTriggeredScheduledExecutor.trigger();
            this.manuallyTriggeredScheduledExecutor.trigger();
            declineCheckpoint(1L, coordinator, vertexId, graph);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Preconditions.checkState(!coordinator.isTriggering());

            coordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();

            String firstTrigger = triggerTag + "1";
            Assert.assertTrue(
                    !notifications.contains(firstTrigger)
                            || notifications.indexOf(firstTrigger)
                                    < notifications.indexOf(abortTag));
        } finally {
            // Single cleanup path; shutdown() is no longer invoked twice when it throws.
            coordinator.shutdown();
        }
    }

    /**
     * Completes one checkpoint, then declines a second one, and checks that the value captured from
     * the second {@code long} argument of the gateway's {@code notifyCheckpointAborted} call equals
     * the id of the checkpoint that completed.
     */
    @Test
    public void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexId)
                        .setTransitToRunning(false)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex task = graph.getJobVertex(vertexId).getTaskVertices()[0];

        // Holds the second long argument of the most recent abort notification; -1 until then.
        final AtomicLong reportedOnAbort = new AtomicLong(-1L);
        SimpleAckingTaskManagerGateway recordingGateway =
                new SimpleAckingTaskManagerGateway() {
                    @Override // org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway
                    public void notifyCheckpointAborted(
                            ExecutionAttemptID attempt,
                            JobID jobId,
                            long unusedFirst,
                            long reported,
                            long unusedThird) {
                        reportedOnAbort.set(reported);
                    }
                };
        ExecutionGraphTestUtils.setVertexResource(
                task,
                new TestingLogicalSlotBuilder()
                        .setTaskManagerGateway(recordingGateway)
                        .createTestingLogicalSlot());
        task.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setAllowCheckpointsAfterTasksFinished(true)
                        .build(graph);

        // First checkpoint: acknowledge it and expect a normal completion.
        CompletableFuture<?> completedFuture = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long completedId = coordinator.getPendingCheckpoints().keySet().iterator().next();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        task.getCurrentExecutionAttempt().getAttemptId(),
                        completedId,
                        new CheckpointMetrics(),
                        new TaskStateSnapshot()),
                "localhost");
        Assert.assertTrue(completedFuture.isDone());
        Assert.assertFalse(completedFuture.isCompletedExceptionally());

        // Second checkpoint: decline it and expect an exceptional completion.
        CompletableFuture<?> declinedFuture = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long declinedId = coordinator.getPendingCheckpoints().keySet().iterator().next();
        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        graph.getJobID(),
                        task.getCurrentExecutionAttempt().getAttemptId(),
                        declinedId,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED)),
                "localhost");
        Assert.assertTrue(declinedFuture.isCompletedExceptionally());
        Assert.assertEquals(completedId, reportedOnAbort.get());
    }

    /**
     * Builds a coordinator backed by an {@code FsStateBackend} rooted at a fresh temporary folder and
     * checks that no sub-path named after the job id exists under that folder afterwards.
     */
    @Test
    public void testBaseLocationsNotInitialized() throws Exception {
        File checkpointRoot = this.tmpFolder.newFolder();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .setTransitToRunning(false)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        // Only construction matters here; the coordinator instance itself is not used.
        new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(
                        CheckpointCoordinatorConfiguration.builder()
                                .setCheckpointInterval(Long.MAX_VALUE)
                                .build())
                .setCheckpointStorage(new FsStateBackend(checkpointRoot.toURI()))
                .build(graph);

        FileSystem fileSystem = FileSystem.get(checkpointRoot.toURI());
        Path jobDirectory = new Path(checkpointRoot.getAbsolutePath(), graph.getJobID().toString());
        Assert.assertFalse(fileSystem.exists(jobDirectory));
    }

    /**
     * Builds a coordinator for the given graph, driven by the manually triggered executor, with the
     * aligned checkpoint timeout set to {@code Long.MAX_VALUE} and the concurrent checkpoint limit
     * set to {@code Integer.MAX_VALUE}.
     *
     * @param executionGraph graph the coordinator is built for
     * @return the newly built coordinator
     */
    private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph executionGraph) throws Exception {
        CheckpointCoordinatorConfiguration configuration =
                CheckpointCoordinatorConfiguration.builder()
                        .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build();
        CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder builder =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(configuration)
                        .setTimer(this.manuallyTriggeredScheduledExecutor);
        return builder.build(executionGraph);
    }

    /**
     * Builds a coordinator for the given graph, driven by the manually triggered executor and using
     * the supplied failure manager.
     *
     * @param executionGraph graph the coordinator is built for
     * @param checkpointFailureManager failure manager to install on the coordinator
     * @return the newly built coordinator
     */
    private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph executionGraph, CheckpointFailureManager checkpointFailureManager) throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder builder =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setFailureManager(checkpointFailureManager);
        return builder.build(executionGraph);
    }

    /**
     * Builds a coordinator that uses the given timer, on a freshly built execution graph with two
     * job vertices.
     *
     * @param scheduledExecutor timer to install on the coordinator
     * @return the newly built coordinator
     */
    private CheckpointCoordinator getCheckpointCoordinator(ScheduledExecutor scheduledExecutor) throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder builder =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(scheduledExecutor);
        ExecutionGraph twoVertexGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .addJobVertex(new JobVertexID())
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        return builder.build(twoVertexGraph);
    }

    /**
     * Creates a failure manager (constructed with {@code 0} as its first argument) whose callback
     * throws a {@link RuntimeException} carrying the given message from both callback methods.
     *
     * @param str message of the exception thrown by the callback
     * @return the failure manager wrapping that callback
     */
    private CheckpointFailureManager getCheckpointFailureManager(final String str) {
        CheckpointFailureManager.FailJobCallback throwingCallback =
                new CheckpointFailureManager.FailJobCallback() {
                    @Override
                    public void failJob(Throwable cause) {
                        throw new RuntimeException(str);
                    }

                    @Override
                    public void failJobDueToTaskFailure(
                            Throwable cause, ExecutionAttemptID failingTask) {
                        throw new RuntimeException(str);
                    }
                };
        return new CheckpointFailureManager(0, throwingCallback);
    }

    /**
     * Takes the first pending checkpoint the coordinator reports, sends a decline message for it
     * with reason {@code CHECKPOINT_DECLINED} wrapping the given cause, and returns the pending
     * checkpoint object that was looked up before the decline was sent.
     *
     * @param jobID job the decline message is addressed to
     * @param checkpointCoordinator coordinator receiving the decline
     * @param executionAttemptID task attempt named in the decline message
     * @param th cause attached to the decline's {@link CheckpointException}
     * @return the pending checkpoint that was declined
     */
    private PendingCheckpoint declineSynchronousSavepoint(JobID jobID, CheckpointCoordinator checkpointCoordinator, ExecutionAttemptID executionAttemptID, Throwable th) {
        final long checkpointId =
                checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
        final PendingCheckpoint declined =
                checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
        CheckpointException declineCause =
                new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED, th);
        DeclineCheckpoint message =
                new DeclineCheckpoint(jobID, executionAttemptID, checkpointId, declineCause);
        checkpointCoordinator.receiveDeclineMessage(message, TASK_MANAGER_LOCATION_INFO);
        return declined;
    }

    /**
     * Triggers one checkpoint and acknowledges it for every subtask of the given job vertex with an
     * incremental keyed state handle built from Mockito spies.
     *
     * <p>Each subtask reports one private handle ("private-1"), one shared handle named
     * {@code "shared-" + i}, and, when {@code i > 0}, a placeholder handle under
     * {@code "shared-" + (i - 1)}.
     *
     * @param jobID job the acknowledgements are sent for
     * @param checkpointCoordinator coordinator under test
     * @param executionJobVertex vertex whose subtasks acknowledge
     * @param list key group range per subtask, indexed by subtask position
     * @param i number used in the shared state handle names
     */
    private void performIncrementalCheckpoint(JobID jobID, CheckpointCoordinator checkpointCoordinator, ExecutionJobVertex executionJobVertex, List<KeyGroupRange> list, int i) throws Exception {
        checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals(1L, checkpointCoordinator.getPendingCheckpoints().size());
        final long checkpointId =
                Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());

        for (int subtask = 0; subtask < executionJobVertex.getParallelism(); subtask++) {
            KeyGroupRange keyGroups = list.get(subtask);

            Map<StateHandleID, StreamStateHandle> privateHandles = new HashMap<>();
            privateHandles.put(
                    new StateHandleID("private-1"),
                    Mockito.spy(new ByteStreamStateHandle("private-1", new byte[] {'p'})));

            Map<StateHandleID, StreamStateHandle> sharedHandles = new HashMap<>();
            if (i > 0) {
                sharedHandles.put(
                        new StateHandleID("shared-" + (i - 1)),
                        Mockito.spy(new PlaceholderStreamStateHandle(1L)));
            }
            sharedHandles.put(
                    new StateHandleID("shared-" + i),
                    Mockito.spy(
                            new ByteStreamStateHandle(
                                    "shared-" + i + "-" + keyGroups, new byte[] {'s'})));

            StreamStateHandle metaHandle =
                    Mockito.spy(new ByteStreamStateHandle("meta", new byte[] {'m'}));
            IncrementalRemoteKeyedStateHandle keyedHandle =
                    Mockito.spy(
                            new IncrementalRemoteKeyedStateHandle(
                                    new UUID(42L, 42L),
                                    keyGroups,
                                    checkpointId,
                                    sharedHandles,
                                    privateHandles,
                                    metaHandle));
            OperatorSubtaskState subtaskState =
                    Mockito.spy(
                            OperatorSubtaskState.builder()
                                    .setManagedKeyedState(keyedHandle)
                                    .build());

            Map<OperatorID, OperatorSubtaskState> statesByOperator = new HashMap<>();
            statesByOperator.put(
                    executionJobVertex.getOperatorIDs().get(0).getGeneratedOperatorID(),
                    subtaskState);

            AcknowledgeCheckpoint ack =
                    new AcknowledgeCheckpoint(
                            jobID,
                            executionJobVertex
                                    .getTaskVertices()[subtask]
                                    .getCurrentExecutionAttempt()
                                    .getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            new TaskStateSnapshot(statesByOperator));
            checkpointCoordinator.receiveAcknowledgeMessage(ack, TASK_MANAGER_LOCATION_INFO);
        }
    }

    /**
     * For every state handle in every map, verifies {@code discardState()} with a Mockito
     * verification mode chosen from the last character of the handle's key string, parsed as an
     * integer.
     *
     * @param list maps of state handles to verify
     * @param function maps the parsed trailing digit of a key to the verification mode to apply
     * @throws Exception declared by {@code discardState()}; a key not ending in a digit raises
     *     {@link NumberFormatException}
     */
    private static void verifyDiscard(List<Map<StateHandleID, StreamStateHandle>> list, Function<Integer, VerificationMode> function) throws Exception {
        for (Map<StateHandleID, StreamStateHandle> handlesByKey : list) {
            for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry : handlesByKey.entrySet()) {
                String key = handleEntry.getKey().getKeyString();
                int trailingDigit = Integer.parseInt(key.substring(key.length() - 1));
                VerificationMode expectedMode = function.apply(trailingDigit);
                Mockito.verify(handleEntry.getValue(), expectedMode).discardState();
            }
        }
    }

    /**
     * Creates a new {@link TestingStreamStateHandle} via its no-argument constructor.
     *
     * @return a freshly constructed handle; every call returns a distinct instance
     */
    private TestingStreamStateHandle handle() {
        return new TestingStreamStateHandle();
    }

    /**
     * Delivers a decline message for the given checkpoint to the coordinator.
     *
     * <p>The message is built for the current execution attempt of the task vertex at index 0 of
     * the given job vertex, carries a {@link CheckpointException} with reason
     * {@code CHECKPOINT_DECLINED}, and is handed over together with the string {@code "test"} as
     * second argument of {@code receiveDeclineMessage}.
     *
     * @param j ID of the checkpoint to decline
     * @param checkpointCoordinator coordinator that receives the decline message
     * @param jobVertexID job vertex whose first task vertex acts as the declining task
     * @param executionGraph graph used to look up the job ID and the job vertex
     */
    private void declineCheckpoint(long j, CheckpointCoordinator checkpointCoordinator, JobVertexID jobVertexID, ExecutionGraph executionGraph) {
        final DeclineCheckpoint declineMessage =
                new DeclineCheckpoint(
                        executionGraph.getJobID(),
                        executionGraph
                                .getJobVertex(jobVertexID)
                                .getTaskVertices()[0]
                                .getCurrentExecutionAttempt()
                                .getAttemptId(),
                        j,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
        checkpointCoordinator.receiveDeclineMessage(declineMessage, "test");
    }

    /**
     * Delivers an acknowledge message for the given checkpoint to the coordinator.
     *
     * <p>The acknowledgement is sent for the current execution attempt of the task vertex at index
     * 0 of the given job vertex. It reports, for the generated operator ID of the vertex's first
     * operator, a subtask state whose managed keyed state is an
     * {@link IncrementalRemoteKeyedStateHandle} created with a random backend UUID, the key-group
     * range {@code KeyGroupRange.of(0, 9)}, the checkpoint ID, a one-entry map keyed
     * {@code "shared-state-key"}, a one-entry map keyed {@code "private-state-key"} and the given
     * handle as final constructor argument. The string {@code "test"} is passed as second argument
     * of {@code receiveAcknowledgeMessage}.
     *
     * @param j ID of the checkpoint to acknowledge
     * @param checkpointCoordinator coordinator that receives the acknowledgement
     * @param jobVertexID job vertex whose first task vertex acts as the acknowledging task
     * @param executionGraph graph used to look up the job ID and the job vertex
     * @param testingStreamStateHandle handle passed as last constructor argument of the keyed state
     *     handle (presumably the metadata handle; confirm against the constructor)
     * @param testingStreamStateHandle2 handle stored under {@code "private-state-key"}
     * @param testingStreamStateHandle3 handle stored under {@code "shared-state-key"}
     * @throws CheckpointException declared for the call to {@code receiveAcknowledgeMessage}
     */
    private void ackCheckpoint(long j, CheckpointCoordinator checkpointCoordinator, JobVertexID jobVertexID, ExecutionGraph executionGraph, TestingStreamStateHandle testingStreamStateHandle, TestingStreamStateHandle testingStreamStateHandle2, TestingStreamStateHandle testingStreamStateHandle3) throws CheckpointException {
        // NOTE(review): the maps stay raw; parameterizing them needs the value type the handle
        // constructor expects, which is not visible here. Confirm and tighten.
        final HashMap sharedState =
                new HashMap(
                        Collections.singletonMap(
                                new StateHandleID("shared-state-key"), testingStreamStateHandle3));
        final HashMap privateState =
                new HashMap(
                        Collections.singletonMap(
                                new StateHandleID("private-state-key"), testingStreamStateHandle2));
        final ExecutionJobVertex ackingVertex = executionGraph.getJobVertex(jobVertexID);

        // Build the reported state from the inside out: keyed handle -> subtask state -> snapshot.
        final IncrementalRemoteKeyedStateHandle keyedStateHandle =
                new IncrementalRemoteKeyedStateHandle(
                        UUID.randomUUID(),
                        KeyGroupRange.of(0, 9),
                        j,
                        sharedState,
                        privateState,
                        testingStreamStateHandle);
        final OperatorSubtaskState subtaskState =
                OperatorSubtaskState.builder().setManagedKeyedState(keyedStateHandle).build();
        final TaskStateSnapshot snapshot =
                new TaskStateSnapshot(
                        Collections.singletonMap(
                                ((OperatorIDPair) ackingVertex.getOperatorIDs().get(0))
                                        .getGeneratedOperatorID(),
                                subtaskState));

        final AcknowledgeCheckpoint ackMessage =
                new AcknowledgeCheckpoint(
                        executionGraph.getJobID(),
                        ackingVertex.getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(),
                        j,
                        new CheckpointMetrics(),
                        snapshot);
        checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, "test");
    }
}
