/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.FullyFinishedOperatorState;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.VertexFinishedStateChecker;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class CheckpointCoordinatorRestoringTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
    @TempDir
    private Path tmpFolder;

    CheckpointCoordinatorRestoringTest() {
    }

    private static void acknowledgeCheckpoint(CheckpointCoordinator coordinator, ExecutionGraph executionGraph, ExecutionJobVertex jobVertex, long checkpointId) throws Exception {
        List partitions = StateAssignmentOperation.createKeyGroupPartitions((int)jobVertex.getMaxParallelism(), (int)jobVertex.getParallelism());
        for (int partitionIdx = 0; partitionIdx < partitions.size(); ++partitionIdx) {
            TaskStateSnapshot subtaskState = CheckpointCoordinatorTestingUtils.mockSubtaskState(jobVertex.getJobVertexId(), partitionIdx, (KeyGroupRange)partitions.get(partitionIdx));
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(executionGraph.getJobID(), jobVertex.getTaskVertices()[partitionIdx].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskState);
            coordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
    }

    private static ExecutionGraph createExecutionGraph(List<TestingVertex> vertices) throws Exception {
        CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder builder = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder();
        for (TestingVertex vertex : vertices) {
            builder.addJobVertex(vertex.getId(), vertex.getParallelism(), vertex.getMaxParallelism());
        }
        return builder.build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
    }

    @BeforeEach
    void setUp() throws Exception {
        this.manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    void testRestoreLatestCheckpointedState() throws Exception {
        List<TestingVertex> vertices = Arrays.asList(new TestingVertex(new JobVertexID(), 3, 42), new TestingVertex(new JobVertexID(), 2, 13));
        this.testRestoreLatestCheckpointedState(vertices, this.testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore(vertices));
    }

    private Collection<CompletedCheckpoint> testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore(List<TestingVertex> vertices) throws Exception {
        ExecutionGraph executionGraph = CheckpointCoordinatorRestoringTest.createExecutionGraph(vertices);
        EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).setCompletedCheckpointStore((CompletedCheckpointStore)store).build(executionGraph);
        coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((int)coordinator.getPendingCheckpoints().size()).isOne();
        long checkpointId = (Long)Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());
        for (TestingVertex vertex : vertices) {
            ExecutionJobVertex executionVertex = Objects.requireNonNull(executionGraph.getJobVertex(vertex.getId()));
            CheckpointCoordinatorRestoringTest.acknowledgeCheckpoint(coordinator, executionGraph, executionVertex, checkpointId);
        }
        List completedCheckpoints = coordinator.getSuccessfulCheckpoints();
        Assertions.assertThat((int)completedCheckpoints.size()).isOne();
        store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
        return store.getAllCheckpoints();
    }

    private void testRestoreLatestCheckpointedState(List<TestingVertex> vertices, Collection<CompletedCheckpoint> completedCheckpoints) throws Exception {
        ExecutionGraph executionGraph = CheckpointCoordinatorRestoringTest.createExecutionGraph(vertices);
        EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore(completedCheckpoints.size(), completedCheckpoints, RecoveryClaimMode.DEFAULT);
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).setCompletedCheckpointStore((CompletedCheckpointStore)store).build(executionGraph);
        Set executionVertices = vertices.stream().map(TestingVertex::getId).map(arg_0 -> ((ExecutionGraph)executionGraph).getJobVertex(arg_0)).collect(Collectors.toSet());
        Assertions.assertThat((boolean)coordinator.restoreLatestCheckpointedStateToAll(executionVertices, false)).isTrue();
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState : taskState.getStates()) {
                    ((OperatorSubtaskState)Mockito.verify((Object)subtaskState, (VerificationMode)Mockito.times((int)2))).registerSharedStates((SharedStateRegistry)ArgumentMatchers.any(SharedStateRegistry.class), ArgumentMatchers.eq((long)completedCheckpoint.getCheckpointID()));
                }
            }
        }
        for (ExecutionJobVertex executionVertex : executionVertices) {
            CheckpointCoordinatorTestingUtils.verifyStateRestore(executionVertex);
        }
    }

    @Test
    void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
        this.testRestoreLatestCheckpointedStateWithChangingParallelism(false);
    }

    @Test
    void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
        this.testRestoreLatestCheckpointedStateWithChangingParallelism(true);
    }

    private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean scaleOut) throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        int parallelism2 = scaleOut ? 2 : 13;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;
        int newParallelism2 = scaleOut ? 13 : 2;
        EmbeddedCompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1, parallelism1, maxParallelism1).addJobVertex(jobVertexID2, parallelism2, maxParallelism2).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
        ExecutionJobVertex jobVertex2 = graph.getJobVertex(jobVertexID2);
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(graph);
        coord.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((int)coord.getPendingCheckpoints().size()).isOne();
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        for (int index = 0; index < jobVertex1.getParallelism(); ++index) {
            OperatorStateHandle opStateBackend = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID1, index, 2, 8, false);
            KeyGroupsStateHandle keyedStateBackend = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID1, (KeyGroupRange)keyGroupPartitions1.get(index), false);
            KeyGroupsStateHandle keyedStateRaw = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID1, (KeyGroupRange)keyGroupPartitions1.get(index), true);
            OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().setManagedOperatorState(opStateBackend).setManagedKeyedState((KeyedStateHandle)keyedStateBackend).setRawKeyedState((KeyedStateHandle)keyedStateRaw).setInputChannelState(StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewInputChannelStateHandle(3, new Random()))).build();
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID1), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        ArrayList<ChainedStateHandle<OperatorStateHandle>> expectedOpStatesBackend = new ArrayList<ChainedStateHandle<OperatorStateHandle>>(jobVertex2.getParallelism());
        ArrayList<ChainedStateHandle<OperatorStateHandle>> expectedOpStatesRaw = new ArrayList<ChainedStateHandle<OperatorStateHandle>>(jobVertex2.getParallelism());
        for (int index = 0; index < jobVertex2.getParallelism(); ++index) {
            KeyGroupsStateHandle keyedStateBackend = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)keyGroupPartitions2.get(index), false);
            KeyGroupsStateHandle keyedStateRaw = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)keyGroupPartitions2.get(index), true);
            OperatorStateHandle opStateBackend = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID2, index, 2, 8, false);
            OperatorStateHandle opStateRaw = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID2, index, 2, 8, true);
            expectedOpStatesBackend.add(new ChainedStateHandle(Collections.singletonList(opStateBackend)));
            expectedOpStatesRaw.add(new ChainedStateHandle(Collections.singletonList(opStateRaw)));
            OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().setManagedOperatorState(opStateBackend).setRawOperatorState(opStateRaw).setManagedKeyedState((KeyedStateHandle)keyedStateBackend).setRawKeyedState((KeyedStateHandle)keyedStateRaw).build();
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID2), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        List completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assertions.assertThat((int)completedCheckpoints.size()).isOne();
        List newKeyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)newParallelism2);
        ExecutionGraph newGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1, parallelism1, maxParallelism1).addJobVertex(jobVertexID2, newParallelism2, maxParallelism2).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex newJobVertex1 = newGraph.getJobVertex(jobVertexID1);
        ExecutionJobVertex newJobVertex2 = newGraph.getJobVertex(jobVertexID2);
        CheckpointCoordinator newCoord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(newGraph);
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        Assertions.assertThat((boolean)newCoord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
        CheckpointCoordinatorTestingUtils.verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);
        ArrayList<List<Collection<OperatorStateHandle>>> actualOpStatesBackend = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        ArrayList<List<Collection<OperatorStateHandle>>> actualOpStatesRaw = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        for (int i = 0; i < newJobVertex2.getParallelism(); ++i) {
            List operatorIDs = newJobVertex2.getOperatorIDs();
            KeyGroupsStateHandle originalKeyedStateBackend = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)newKeyGroupPartitions2.get(i), false);
            KeyGroupsStateHandle originalKeyedStateRaw = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)newKeyGroupPartitions2.get(i), true);
            JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assertions.assertThat((long)taskRestore.getRestoreCheckpointId()).isOne();
            TaskStateSnapshot taskStateHandles = taskRestore.getTaskStateSnapshot();
            int headOpIndex = operatorIDs.size() - 1;
            ArrayList<StateObjectCollection> allParallelManagedOpStates = new ArrayList<StateObjectCollection>(operatorIDs.size());
            ArrayList<StateObjectCollection> allParallelRawOpStates = new ArrayList<StateObjectCollection>(operatorIDs.size());
            for (int idx = 0; idx < operatorIDs.size(); ++idx) {
                OperatorID operatorID = ((OperatorIDPair)operatorIDs.get(idx)).getGeneratedOperatorID();
                OperatorSubtaskState opState = taskStateHandles.getSubtaskStateByOperatorID(operatorID);
                StateObjectCollection opStateBackend = opState.getManagedOperatorState();
                StateObjectCollection opStateRaw = opState.getRawOperatorState();
                allParallelManagedOpStates.add(opStateBackend);
                allParallelRawOpStates.add(opStateRaw);
                if (idx != headOpIndex) continue;
                StateObjectCollection keyedStateBackend = opState.getManagedKeyedState();
                StateObjectCollection keyGroupStateRaw = opState.getRawKeyedState();
                CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateBackend), (Collection<? extends KeyedStateHandle>)keyedStateBackend);
                CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateRaw), (Collection<? extends KeyedStateHandle>)keyGroupStateRaw);
            }
            actualOpStatesBackend.add(allParallelManagedOpStates);
            actualOpStatesRaw.add(allParallelRawOpStates);
        }
        CheckpointCoordinatorTestingUtils.comparePartitionableState(expectedOpStatesBackend, actualOpStatesBackend);
        CheckpointCoordinatorTestingUtils.comparePartitionableState(expectedOpStatesRaw, actualOpStatesRaw);
    }

    @Test
    void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
        AcknowledgeCheckpoint acknowledgeCheckpoint;
        TaskStateSnapshot taskOperatorSubtaskStates;
        OperatorSubtaskState operatorSubtaskState;
        KeyGroupsStateHandle keyGroupState;
        int index;
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        int parallelism2 = 2;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;
        EmbeddedCompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1, parallelism1, maxParallelism1).addJobVertex(jobVertexID2, parallelism2, maxParallelism2).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
        ExecutionJobVertex jobVertex2 = graph.getJobVertex(jobVertexID2);
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(graph);
        coord.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((int)coord.getPendingCheckpoints().size()).isOne();
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        for (index = 0; index < jobVertex1.getParallelism(); ++index) {
            keyGroupState = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID1, (KeyGroupRange)keyGroupPartitions1.get(index), false);
            operatorSubtaskState = OperatorSubtaskState.builder().setManagedKeyedState((KeyedStateHandle)keyGroupState).build();
            taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID1), operatorSubtaskState);
            acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        for (index = 0; index < jobVertex2.getParallelism(); ++index) {
            keyGroupState = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, (KeyGroupRange)keyGroupPartitions2.get(index), false);
            operatorSubtaskState = OperatorSubtaskState.builder().setManagedKeyedState((KeyedStateHandle)keyGroupState).build();
            taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID2), operatorSubtaskState);
            acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        List completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assertions.assertThat((int)completedCheckpoints.size()).isOne();
        int newMaxParallelism1 = 20;
        int newMaxParallelism2 = 42;
        ExecutionGraph newGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1, parallelism1, newMaxParallelism1).addJobVertex(jobVertexID2, parallelism2, newMaxParallelism2).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex newJobVertex1 = newGraph.getJobVertex(jobVertexID1);
        ExecutionJobVertex newJobVertex2 = newGraph.getJobVertex(jobVertexID2);
        CheckpointCoordinator newCoord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(newGraph);
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> newCoord.restoreLatestCheckpointedStateToAll(tasks, false)).as("The restoration should have failed because the max parallelism changed.", new Object[0])).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testStateRecoveryWhenTopologyChangeOut() throws Exception {
        this.testStateRecoveryWithTopologyChange(TestScaleType.INCREASE_PARALLELISM);
    }

    @Test
    void testStateRecoveryWhenTopologyChangeIn() throws Exception {
        this.testStateRecoveryWithTopologyChange(TestScaleType.DECREASE_PARALLELISM);
    }

    @Test
    void testStateRecoveryWhenTopologyChange() throws Exception {
        this.testStateRecoveryWithTopologyChange(TestScaleType.SAME_PARALLELISM);
    }

    private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
        JobVertexID jobVertexID = new JobVertexID();
        OperatorID operatorID = OperatorID.fromJobVertexID((JobVertexID)jobVertexID);
        return new Tuple2((Object)jobVertexID, (Object)operatorID);
    }

    public void testStateRecoveryWithTopologyChange(TestScaleType scaleType) throws Exception {
        Tuple2<JobVertexID, OperatorID> id1 = CheckpointCoordinatorRestoringTest.generateIDPair();
        Tuple2<JobVertexID, OperatorID> id2 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int parallelism1 = 10;
        int maxParallelism1 = 64;
        Tuple2<JobVertexID, OperatorID> id3 = CheckpointCoordinatorRestoringTest.generateIDPair();
        Tuple2<JobVertexID, OperatorID> id4 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int parallelism2 = 10;
        int maxParallelism2 = 64;
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        HashMap<OperatorID, OperatorState> operatorStates = new HashMap<OperatorID, OperatorState>();
        for (Tuple2 id : Arrays.asList(id1, id2)) {
            OperatorState taskState = new OperatorState(null, null, (OperatorID)id.f1, parallelism1, maxParallelism1);
            operatorStates.put((OperatorID)id.f1, taskState);
            for (int index = 0; index < taskState.getParallelism(); ++index) {
                OperatorSubtaskState subtaskState = OperatorSubtaskState.builder().setManagedOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, false)).setRawOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, true)).build();
                taskState.putState(index, subtaskState);
            }
        }
        ArrayList expectedManagedOperatorStates = new ArrayList();
        ArrayList expectedRawOperatorStates = new ArrayList();
        for (Tuple2 id : Arrays.asList(id3, id4)) {
            OperatorState operatorState = new OperatorState(null, null, (OperatorID)id.f1, parallelism2, maxParallelism2);
            operatorStates.put((OperatorID)id.f1, operatorState);
            ArrayList<ChainedStateHandle> expectedManagedOperatorState = new ArrayList<ChainedStateHandle>();
            ArrayList<ChainedStateHandle> expectedRawOperatorState = new ArrayList<ChainedStateHandle>();
            expectedManagedOperatorStates.add(expectedManagedOperatorState);
            expectedRawOperatorStates.add(expectedRawOperatorState);
            for (int index = 0; index < operatorState.getParallelism(); ++index) {
                OperatorSubtaskState.Builder stateBuilder = OperatorSubtaskState.builder();
                OperatorStateHandle subManagedOperatorState = (OperatorStateHandle)CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, false).get(0);
                OperatorStateHandle subRawOperatorState = (OperatorStateHandle)CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, true).get(0);
                if (((JobVertexID)id.f0).equals(id3.f0)) {
                    stateBuilder.setManagedKeyedState((KeyedStateHandle)CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id.f0, (KeyGroupRange)keyGroupPartitions2.get(index), false));
                }
                if (((JobVertexID)id.f0).equals(id3.f0)) {
                    stateBuilder.setRawKeyedState((KeyedStateHandle)CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id.f0, (KeyGroupRange)keyGroupPartitions2.get(index), true));
                }
                expectedManagedOperatorState.add(ChainedStateHandle.wrapSingleHandle((StateObject)subManagedOperatorState));
                expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle((StateObject)subRawOperatorState));
                OperatorSubtaskState subtaskState = stateBuilder.setManagedOperatorState(subManagedOperatorState).setRawOperatorState(subRawOperatorState).build();
                operatorState.putState(index, subtaskState);
            }
        }
        Tuple2<JobVertexID, OperatorID> id5 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int newParallelism1 = 10;
        Tuple2<JobVertexID, OperatorID> id6 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int newParallelism2 = parallelism2;
        if (scaleType == TestScaleType.INCREASE_PARALLELISM) {
            newParallelism2 = 20;
        } else if (scaleType == TestScaleType.DECREASE_PARALLELISM) {
            newParallelism2 = 8;
        }
        List newKeyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)newParallelism2);
        ExecutionGraph newGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex((JobVertexID)id5.f0, newParallelism1, maxParallelism1, Stream.of((OperatorID)id2.f1, (OperatorID)id1.f1, (OperatorID)id5.f1).map(OperatorIDPair::generatedIDOnly).collect(Collectors.toList()), true).addJobVertex((JobVertexID)id3.f0, newParallelism2, maxParallelism2, Stream.of((OperatorID)id6.f1, (OperatorID)id3.f1).map(OperatorIDPair::generatedIDOnly).collect(Collectors.toList()), true).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex newJobVertex1 = newGraph.getJobVertex((JobVertexID)id5.f0);
        ExecutionJobVertex newJobVertex2 = newGraph.getJobVertex((JobVertexID)id3.f0);
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(newGraph.getJobID(), 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000L, operatorStates, Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation(), null);
        SharedStateRegistry sharedStateRegistry = SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), Collections.emptyList(), RecoveryClaimMode.DEFAULT);
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore(CheckpointCoordinatorRestoringTest.storeFor(sharedStateRegistry, () -> {}, completedCheckpoint)).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(newGraph);
        coord.restoreLatestCheckpointedStateToAll(tasks, true);
        for (int i = 0; i < newJobVertex1.getParallelism(); ++i) {
            List operatorIDs = newJobVertex1.getOperatorIDs();
            JobManagerTaskRestore taskRestore = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assertions.assertThat((long)taskRestore.getRestoreCheckpointId()).isEqualTo(2L);
            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIDs.size() - 1)).getGeneratedOperatorID());
            Assertions.assertThat((Collection)headOpState.getManagedKeyedState()).isEmpty();
            Assertions.assertThat((Collection)headOpState.getRawKeyedState()).isEmpty();
            int operatorIndexInChain = 2;
            OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            Assertions.assertThat((Collection)opState.getManagedOperatorState()).isEmpty();
            Assertions.assertThat((Collection)opState.getRawOperatorState()).isEmpty();
            operatorIndexInChain = 1;
            opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            OperatorStateHandle expectedManagedOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id1.f0, i, 2, 8, false);
            OperatorStateHandle expectedRawOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id1.f0, i, 2, 8, true);
            StateObjectCollection managedOperatorState = opState.getManagedOperatorState();
            Assertions.assertThat((int)managedOperatorState.size()).isOne();
            Assertions.assertThat((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedManagedOpState.openInputStream(), (InputStream)((OperatorStateHandle)managedOperatorState.iterator().next()).openInputStream())).isTrue();
            StateObjectCollection rawOperatorState = opState.getRawOperatorState();
            Assertions.assertThat((int)rawOperatorState.size()).isOne();
            Assertions.assertThat((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedRawOpState.openInputStream(), (InputStream)((OperatorStateHandle)rawOperatorState.iterator().next()).openInputStream())).isTrue();
            operatorIndexInChain = 0;
            opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            expectedManagedOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id2.f0, i, 2, 8, false);
            expectedRawOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id2.f0, i, 2, 8, true);
            managedOperatorState = opState.getManagedOperatorState();
            Assertions.assertThat((int)managedOperatorState.size()).isOne();
            Assertions.assertThat((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedManagedOpState.openInputStream(), (InputStream)((OperatorStateHandle)managedOperatorState.iterator().next()).openInputStream())).isTrue();
            rawOperatorState = opState.getRawOperatorState();
            Assertions.assertThat((int)rawOperatorState.size()).isOne();
            Assertions.assertThat((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedRawOpState.openInputStream(), (InputStream)((OperatorStateHandle)rawOperatorState.iterator().next()).openInputStream())).isTrue();
        }
        ArrayList<List<Collection<OperatorStateHandle>>> actualManagedOperatorStates = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        ArrayList<List<Collection<OperatorStateHandle>>> actualRawOperatorStates = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        for (int i = 0; i < newJobVertex2.getParallelism(); ++i) {
            List operatorIDs = newJobVertex2.getOperatorIDs();
            JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assertions.assertThat((long)taskRestore.getRestoreCheckpointId()).isEqualTo(2L);
            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            int operatorIndexInChain = 1;
            OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            ArrayList<StateObjectCollection> actualSubManagedOperatorState = new ArrayList<StateObjectCollection>(1);
            actualSubManagedOperatorState.add(opState.getManagedOperatorState());
            ArrayList<StateObjectCollection> actualSubRawOperatorState = new ArrayList<StateObjectCollection>(1);
            actualSubRawOperatorState.add(opState.getRawOperatorState());
            actualManagedOperatorStates.add(actualSubManagedOperatorState);
            actualRawOperatorStates.add(actualSubRawOperatorState);
            operatorIndexInChain = 0;
            opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            Assertions.assertThat((Collection)opState.getManagedOperatorState()).isEmpty();
            Assertions.assertThat((Collection)opState.getRawOperatorState()).isEmpty();
            KeyGroupsStateHandle originalKeyedStateBackend = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id3.f0, (KeyGroupRange)newKeyGroupPartitions2.get(i), false);
            KeyGroupsStateHandle originalKeyedStateRaw = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id3.f0, (KeyGroupRange)newKeyGroupPartitions2.get(i), true);
            OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIDs.size() - 1)).getGeneratedOperatorID());
            StateObjectCollection keyedStateBackend = headOpState.getManagedKeyedState();
            StateObjectCollection keyGroupStateRaw = headOpState.getRawKeyedState();
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateBackend), (Collection<? extends KeyedStateHandle>)keyedStateBackend);
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateRaw), (Collection<? extends KeyedStateHandle>)keyGroupStateRaw);
        }
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List)expectedManagedOperatorStates.get(0), actualManagedOperatorStates);
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List)expectedRawOperatorStates.get(0), actualRawOperatorStates);
    }

    static CompletedCheckpointStore storeFor(SharedStateRegistry sharedStateRegistry, Runnable postCleanupAction, CompletedCheckpoint ... checkpoints) throws Exception {
        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(checkpoints.length);
        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        for (CompletedCheckpoint checkpoint : checkpoints) {
            store.addCheckpointAndSubsumeOldestOne(checkpoint, checkpointsCleaner, postCleanupAction);
        }
        return store;
    }

    @Test
    void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        int parallelism1 = 3;
        int maxParallelism1 = 42;
        EmbeddedCompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID, parallelism1, maxParallelism1).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointIdOfIgnoredInFlightData(1L).build()).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(graph);
        coord.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((int)coord.getPendingCheckpoints().size()).isOne();
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        Random random = new Random();
        for (int index = 0; index < jobVertex.getParallelism(); ++index) {
            OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().setManagedOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID, index, 2, 8, false)).setRawOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID, index, 2, 8, true)).setManagedKeyedState((KeyedStateHandle)CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID, (KeyGroupRange)keyGroupPartitions1.get(index), false)).setRawKeyedState((KeyedStateHandle)CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID, (KeyGroupRange)keyGroupPartitions1.get(index), true)).setInputChannelState(StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewInputChannelStateHandle(3, random))).setResultSubpartitionState(StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewResultSubpartitionStateHandle(3, random))).build();
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(graph.getJobID(), jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
        Assertions.assertThat((int)coord.getSuccessfulCheckpoints().size()).isOne();
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(jobVertex);
        Assertions.assertThat((boolean)coord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
        CheckpointCoordinatorTestingUtils.verifyStateRestore(jobVertexID, jobVertex, keyGroupPartitions1);
        for (int i = 0; i < jobVertex.getParallelism(); ++i) {
            JobManagerTaskRestore taskRestore = jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assertions.assertThat((long)taskRestore.getRestoreCheckpointId()).isOne();
            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            OperatorSubtaskState operatorState = stateSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID));
            Assertions.assertThat((Collection)operatorState.getInputChannelState()).isEmpty();
            Assertions.assertThat((Collection)operatorState.getResultSubpartitionState()).isEmpty();
            Assertions.assertThat((Collection)operatorState.getRawOperatorState()).isNotEmpty();
            Assertions.assertThat((Collection)operatorState.getManagedOperatorState()).isNotEmpty();
            Assertions.assertThat((Collection)operatorState.getRawKeyedState()).isNotEmpty();
            Assertions.assertThat((Collection)operatorState.getManagedOperatorState()).isNotEmpty();
        }
    }

    @Test
    void testRestoreFinishedStateWithoutInFlightData() throws Exception {
        OperatorIDPair op1 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID, 1, 1, Collections.singletonList(op1), true).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        EmbeddedCompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        HashMap<OperatorID, FullyFinishedOperatorState> operatorStates = new HashMap<OperatorID, FullyFinishedOperatorState>();
        operatorStates.put(op1.getGeneratedOperatorID(), new FullyFinishedOperatorState(null, null, op1.getGeneratedOperatorID(), 1, 1));
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(graph.getJobID(), 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000L, operatorStates, Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation(), null);
        completedCheckpointStore.addCheckpointAndSubsumeOldestOne(completedCheckpoint, new CheckpointsCleaner(), () -> {});
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointIdOfIgnoredInFlightData(2L).build()).setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).build(graph);
        ExecutionJobVertex vertex = graph.getJobVertex(jobVertexID);
        coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex));
        TaskStateSnapshot restoredState = vertex.getTaskVertices()[0].getCurrentExecutionAttempt().getTaskRestore().getTaskStateSnapshot();
        Assertions.assertThat((boolean)restoredState.isTaskDeployedAsFinished()).isTrue();
    }

    @Test
    void testJobGraphModificationsAreCheckedForInitialCheckpoint() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID, 1, 1).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        EmbeddedCompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(graph.getJobID(), 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000L, Collections.emptyMap(), Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation(), null);
        completedCheckpointStore.addCheckpointAndSubsumeOldestOne(completedCheckpoint, new CheckpointsCleaner(), () -> {});
        final BooleanValue checked = new BooleanValue(false);
        CheckpointCoordinator restoreCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setVertexFinishedStateCheckerFactory((vertices, states) -> new VertexFinishedStateChecker((Set)vertices, (Map)states){

            public void validateOperatorsFinishedState() {
                checked.set(true);
            }
        }).build(graph);
        restoreCoordinator.restoreInitialCheckpointIfPresent(new HashSet(graph.getAllVertices().values()));
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)checked.get()).as("The finished states should be checked when job is restored on startup", new Object[0])).isTrue();
    }

    @Test
    void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID, 1, 1).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(graph);
        File savepointPath = TempDirUtils.newFolder((Path)this.tmpFolder);
        CompletableFuture savepointFuture = coordinator.triggerSavepoint("file://" + savepointPath.getAbsolutePath(), SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long pendingSavepointId = (Long)coordinator.getPendingCheckpoints().keySet().stream().findFirst().get();
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), graph.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(), pendingSavepointId), "localhost");
        Assertions.assertThat((CompletableFuture)savepointFuture).isDone();
        final BooleanValue checked = new BooleanValue(false);
        CheckpointCoordinator restoreCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setVertexFinishedStateCheckerFactory((vertices, states) -> new VertexFinishedStateChecker((Set)vertices, (Map)states){

            public void validateOperatorsFinishedState() {
                checked.set(true);
            }
        }).build(graph);
        restoreCoordinator.restoreSavepoint(SavepointRestoreSettings.forPath((String)((CompletedCheckpoint)savepointFuture.get()).getExternalPointer()), graph.getAllVertices(), this.getClass().getClassLoader());
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)checked.get()).as("The finished states should be checked when job is restored on startup", new Object[0])).isTrue();
    }

    private static class TestingVertex {
        private final JobVertexID id;
        private final int parallelism;
        private final int maxParallelism;

        private TestingVertex(JobVertexID id, int parallelism, int maxParallelism) {
            this.id = id;
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }

        public JobVertexID getId() {
            return this.id;
        }

        public int getParallelism() {
            return this.parallelism;
        }

        public int getMaxParallelism() {
            return this.maxParallelism;
        }
    }

    private static enum TestScaleType {
        INCREASE_PARALLELISM,
        DECREASE_PARALLELISM,
        SAME_PARALLELISM;

    }
}

