/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.SerializableObject;
import org.apache.flink.util.TestLogger;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.hamcrest.MockitoHamcrest;
import org.mockito.verification.VerificationMode;

public class CheckpointCoordinatorRestoringTest
extends TestLogger {
    // Location string passed as the second argument of receiveAcknowledgeMessage by the tests below.
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    // Timer handed to the coordinators under test; re-created in setUp() and drained with triggerAll().
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
    // JUnit rule providing temporary folders (used below as the savepoint target directory).
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /** Installs a fresh manually triggered executor before each test. */
    @Before
    public void setUp() throws Exception {
        manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /**
     * Completes one checkpoint over two job vertices and restores it to the same vertices.
     *
     * <p>Every subtask of both vertices acknowledges the single pending checkpoint with a
     * mocked subtask state. The store is then shut down with {@link JobStatus#SUSPENDED} and
     * the state is restored. The test verifies that {@code registerSharedStates} was invoked
     * exactly twice on every subtask state of the completed checkpoint and that both vertices
     * received their state.
     *
     * @throws Exception if the coordinator or any of the test utilities fails
     */
    @Test
    public void testRestoreLatestCheckpointedState() throws Exception {
        JobID jid = new JobID();
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        int parallelism2 = 2;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;

        ExecutionJobVertex jobVertex1 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        ExecutionJobVertex jobVertex2 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID2, parallelism2, maxParallelism2);

        List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
        ExecutionVertex[] arrayExecutionVertices =
                allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);

        RecoverableCompletedCheckpointStore store = new RecoverableCompletedCheckpointStore();
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setJobId(jid)
                .setTasks(arrayExecutionVertices)
                .setCompletedCheckpointStore(store)
                .setTimer(manuallyTriggeredScheduledExecutor)
                .build();

        // Trigger the checkpoint and let the manual executor run the queued trigger work.
        coord.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertEquals(1L, coord.getPendingCheckpoints().size());
        long checkpointId = (Long) Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());

        List<KeyGroupRange> keyGroupPartitions1 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
        List<KeyGroupRange> keyGroupPartitions2 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);

        // Acknowledge the checkpoint for every subtask of the first vertex ...
        for (int index = 0; index < jobVertex1.getParallelism(); index++) {
            TaskStateSnapshot subtaskState = CheckpointCoordinatorTestingUtils.mockSubtaskState(
                    jobVertexID1, index, keyGroupPartitions1.get(index));
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
                    jid,
                    jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                    checkpointId,
                    new CheckpointMetrics(),
                    subtaskState);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }

        // ... and of the second vertex.
        for (int index = 0; index < jobVertex2.getParallelism(); index++) {
            TaskStateSnapshot subtaskState = CheckpointCoordinatorTestingUtils.mockSubtaskState(
                    jobVertexID2, index, keyGroupPartitions2.get(index));
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
                    jid,
                    jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                    checkpointId,
                    new CheckpointMetrics(),
                    subtaskState);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }

        List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals(1L, completedCheckpoints.size());

        // Shut the store down as SUSPENDED before restoring.
        store.shutdown(JobStatus.SUSPENDED);

        HashSet<ExecutionJobVertex> tasks = new HashSet<>();
        tasks.add(jobVertex1);
        tasks.add(jobVertex2);
        Assert.assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));

        // Each subtask state must have seen exactly two registerSharedStates calls
        // (presumably one on checkpoint completion and one on restore -- confirm against the coordinator).
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState operatorSubtaskState : taskState.getStates()) {
                    Mockito.verify(operatorSubtaskState, Mockito.times(2))
                            .registerSharedStates(ArgumentMatchers.any(SharedStateRegistry.class));
                }
            }
        }

        CheckpointCoordinatorTestingUtils.verifyStateRestore(jobVertexID1, jobVertex1, keyGroupPartitions1);
        CheckpointCoordinatorTestingUtils.verifyStateRestore(jobVertexID2, jobVertex2, keyGroupPartitions2);
    }

    /** Runs the changing-parallelism restore scenario with {@code scaleOut = false}. */
    @Test
    public void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
        final boolean scaleOut = false;
        testRestoreLatestCheckpointedStateWithChangingParallelism(scaleOut);
    }

    /** Runs the changing-parallelism restore scenario with {@code scaleOut = true}. */
    @Test
    public void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
        final boolean scaleOut = true;
        testRestoreLatestCheckpointedStateWithChangingParallelism(scaleOut);
    }

    /** Runs the checkpoint-versus-savepoint restore scenario with checkpoints preferred for recovery. */
    @Test
    public void testRestoreLatestCheckpointWhenPreferCheckpoint() throws Exception {
        final boolean preferCheckpoint = true;
        testRestoreLatestCheckpointIsPreferSavepoint(preferCheckpoint);
    }

    /** Runs the checkpoint-versus-savepoint restore scenario without preferring checkpoints for recovery. */
    @Test
    public void testRestoreLatestCheckpointWhenPreferSavepoint() throws Exception {
        final boolean preferCheckpoint = false;
        testRestoreLatestCheckpointIsPreferSavepoint(preferCheckpoint);
    }

    /**
     * Completes a checkpoint and then a savepoint, restores, and checks which snapshot was used.
     *
     * <p>A stateful and a stateless single-subtask vertex acknowledge first a regular
     * checkpoint and then a savepoint, each carrying a different keyed state for the stateful
     * operator. After the restore, the stateful execution must have received exactly one
     * {@code setInitialState} call whose snapshot equals the checkpoint's snapshot when
     * {@code isPreferCheckpoint} is true and the savepoint's snapshot otherwise. The stateless
     * execution must not have received any initial state.
     *
     * <p>Note that, despite the method name, the flag expresses a preference for the
     * <em>checkpoint</em>.
     *
     * @param isPreferCheckpoint value passed to {@code setPreferCheckpointForRecovery}
     * @throws Exception any failure is propagated unchanged so that the test report keeps the
     *     original exception type, message and stack trace
     */
    private void testRestoreLatestCheckpointIsPreferSavepoint(final boolean isPreferCheckpoint) throws Exception {
        JobID jid = new JobID();
        StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();

        JobVertexID statefulId = new JobVertexID();
        JobVertexID statelessId = new JobVertexID();

        Execution statefulExec1 = CheckpointCoordinatorTestingUtils.mockExecution();
        Execution statelessExec1 = CheckpointCoordinatorTestingUtils.mockExecution();

        ExecutionVertex stateful1 =
                CheckpointCoordinatorTestingUtils.mockExecutionVertex(statefulExec1, statefulId, 0, 1);
        ExecutionVertex stateless1 =
                CheckpointCoordinatorTestingUtils.mockExecutionVertex(statelessExec1, statelessId, 0, 1);

        ExecutionJobVertex stateful =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(statefulId, new ExecutionVertex[] {stateful1});
        ExecutionJobVertex stateless =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(statelessId, new ExecutionVertex[] {stateless1});

        HashSet<ExecutionJobVertex> tasks = new HashSet<>();
        tasks.add(stateful);
        tasks.add(stateless);

        RecoverableCompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(2);

        CheckpointCoordinatorConfiguration chkConfig =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setPreferCheckpointForRecovery(isPreferCheckpoint)
                        .build();
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setJobId(jid)
                .setCheckpointCoordinatorConfiguration(chkConfig)
                .setCheckpointIDCounter(checkpointIDCounter)
                .setCompletedCheckpointStore(store)
                .setTasks(new ExecutionVertex[] {stateful1, stateless1})
                .setTimer(manuallyTriggeredScheduledExecutor)
                .build();

        // --- regular checkpoint ---
        CompletableFuture<?> checkpointFuture = coord.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(checkpointFuture.isCompletedExceptionally());

        long checkpointId = checkpointIDCounter.getLast();

        KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 0);
        List<SerializableObject> testStates = Collections.singletonList(new SerializableObject());
        KeyGroupsStateHandle serializedKeyGroupStates =
                CheckpointCoordinatorTestingUtils.generateKeyGroupState(keyGroupRange, testStates);

        TaskStateSnapshot subtaskStatesForCheckpoint = new TaskStateSnapshot();
        subtaskStatesForCheckpoint.putSubtaskStateByOperatorID(
                OperatorID.fromJobVertexID(statefulId),
                new OperatorSubtaskState(
                        StateObjectCollection.empty(),
                        StateObjectCollection.empty(),
                        StateObjectCollection.singleton(serializedKeyGroupStates),
                        StateObjectCollection.empty()));

        coord.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForCheckpoint),
                TASK_MANAGER_LOCATION_INFO);
        coord.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId),
                TASK_MANAGER_LOCATION_INFO);

        CompletedCheckpoint success = (CompletedCheckpoint) coord.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals(jid, success.getJobId());

        // --- savepoint ---
        String savepointDir = tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture<?> savepointFuture = coord.triggerSavepoint(savepointDir);

        KeyGroupRange keyGroupRangeForSavepoint = KeyGroupRange.of(1, 1);
        List<SerializableObject> testStatesForSavepoint = Collections.singletonList(new SerializableObject());
        KeyGroupsStateHandle serializedKeyGroupStatesForSavepoint =
                CheckpointCoordinatorTestingUtils.generateKeyGroupState(keyGroupRangeForSavepoint, testStatesForSavepoint);

        TaskStateSnapshot subtaskStatesForSavepoint = new TaskStateSnapshot();
        subtaskStatesForSavepoint.putSubtaskStateByOperatorID(
                OperatorID.fromJobVertexID(statefulId),
                new OperatorSubtaskState(
                        StateObjectCollection.empty(),
                        StateObjectCollection.empty(),
                        StateObjectCollection.singleton(serializedKeyGroupStatesForSavepoint),
                        StateObjectCollection.empty()));

        manuallyTriggeredScheduledExecutor.triggerAll();
        // The counter now points at the savepoint's id.
        checkpointId = checkpointIDCounter.getLast();
        coord.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForSavepoint),
                TASK_MANAGER_LOCATION_INFO);
        coord.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId),
                TASK_MANAGER_LOCATION_INFO);

        Assert.assertNotNull(savepointFuture.get());

        // --- restore and verify ---
        Assert.assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));

        final TaskStateSnapshot expectedSnapshot =
                isPreferCheckpoint ? subtaskStatesForCheckpoint : subtaskStatesForSavepoint;
        BaseMatcher<JobManagerTaskRestore> matcher = new BaseMatcher<JobManagerTaskRestore>() {
            @Override
            public boolean matches(Object o) {
                if (!(o instanceof JobManagerTaskRestore)) {
                    return false;
                }
                JobManagerTaskRestore taskRestore = (JobManagerTaskRestore) o;
                return Objects.equals(taskRestore.getTaskStateSnapshot(), expectedSnapshot);
            }

            @Override
            public void describeTo(Description description) {
                description.appendValue(expectedSnapshot);
            }
        };

        Mockito.verify(statefulExec1, Mockito.times(1)).setInitialState(MockitoHamcrest.argThat(matcher));
        Mockito.verify(statelessExec1, Mockito.times(0)).setInitialState((JobManagerTaskRestore) Mockito.any());

        coord.shutdown(JobStatus.FINISHED);
    }

    /**
     * Restores a completed checkpoint after the parallelism of the second vertex has changed.
     *
     * <p>Vertex 1 keeps parallelism 3 (max parallelism 42). Vertex 2 (max parallelism 13) is
     * checkpointed with parallelism 2 and restored with parallelism 13 when {@code scaleOut}
     * is true, and checkpointed with 13 and restored with 2 otherwise. The test checks that
     * vertex 1 gets its state back unchanged, that the keyed state of vertex 2 matches the new
     * key-group partitioning, and that the operator state handed to the new subtasks of
     * vertex 2 compares equal to what was checkpointed.
     *
     * @param scaleOut {@code true} to increase, {@code false} to decrease the parallelism of vertex 2
     * @throws Exception if the coordinator or any of the test utilities fails
     */
    private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean scaleOut) throws Exception {
        JobID jid = new JobID();
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        int parallelism2 = scaleOut ? 2 : 13;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;
        int newParallelism2 = scaleOut ? 13 : 2;

        ExecutionJobVertex jobVertex1 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        ExecutionJobVertex jobVertex2 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID2, parallelism2, maxParallelism2);

        List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
        ExecutionVertex[] arrayExecutionVertices =
                allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);

        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setJobId(jid)
                .setTasks(arrayExecutionVertices)
                .setTimer(manuallyTriggeredScheduledExecutor)
                .build();

        coord.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertEquals(1L, coord.getPendingCheckpoints().size());
        long checkpointId = (Long) Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());

        List<KeyGroupRange> keyGroupPartitions1 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
        List<KeyGroupRange> keyGroupPartitions2 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);

        // Vertex 1: managed operator state plus managed and raw keyed state (no raw operator state).
        for (int index = 0; index < jobVertex1.getParallelism(); index++) {
            OperatorStateHandle opStateBackend =
                    CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID1, index, 2, 8, false);
            KeyGroupsStateHandle keyedStateBackend =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
            KeyGroupsStateHandle keyedStateRaw =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), true);

            OperatorSubtaskState operatorSubtaskState =
                    new OperatorSubtaskState(opStateBackend, null, keyedStateBackend, keyedStateRaw, null, null);
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    OperatorID.fromJobVertexID(jobVertexID1), operatorSubtaskState);

            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
                    jid,
                    jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                    checkpointId,
                    new CheckpointMetrics(),
                    taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }

        // Vertex 2: all four state kinds; remember the operator state for the final comparison.
        List<ChainedStateHandle<OperatorStateHandle>> expectedOpStatesBackend =
                new ArrayList<>(jobVertex2.getParallelism());
        List<ChainedStateHandle<OperatorStateHandle>> expectedOpStatesRaw =
                new ArrayList<>(jobVertex2.getParallelism());
        for (int index = 0; index < jobVertex2.getParallelism(); index++) {
            KeyGroupsStateHandle keyedStateBackend =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
            KeyGroupsStateHandle keyedStateRaw =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), true);
            OperatorStateHandle opStateBackend =
                    CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID2, index, 2, 8, false);
            OperatorStateHandle opStateRaw =
                    CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID2, index, 2, 8, true);

            expectedOpStatesBackend.add(new ChainedStateHandle<>(Collections.singletonList(opStateBackend)));
            expectedOpStatesRaw.add(new ChainedStateHandle<>(Collections.singletonList(opStateRaw)));

            OperatorSubtaskState operatorSubtaskState =
                    new OperatorSubtaskState(opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw, null, null);
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    OperatorID.fromJobVertexID(jobVertexID2), operatorSubtaskState);

            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
                    jid,
                    jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                    checkpointId,
                    new CheckpointMetrics(),
                    taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }

        List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals(1L, completedCheckpoints.size());

        // Restore onto freshly mocked vertices; only vertex 2 uses a different parallelism.
        HashSet<ExecutionJobVertex> tasks = new HashSet<>();
        List<KeyGroupRange> newKeyGroupPartitions2 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
        ExecutionJobVertex newJobVertex1 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        ExecutionJobVertex newJobVertex2 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID2, newParallelism2, maxParallelism2);
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        Assert.assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));

        // Vertex 1 did not change, so its state must come back as it was checkpointed.
        CheckpointCoordinatorTestingUtils.verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);

        List<List<Collection<OperatorStateHandle>>> actualOpStatesBackend =
                new ArrayList<>(newJobVertex2.getParallelism());
        List<List<Collection<OperatorStateHandle>>> actualOpStatesRaw =
                new ArrayList<>(newJobVertex2.getParallelism());
        for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
            List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();

            // Keyed state expected for subtask i under the NEW key-group partitioning.
            KeyGroupsStateHandle originalKeyedStateBackend =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), false);
            KeyGroupsStateHandle originalKeyedStateRaw =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), true);

            JobManagerTaskRestore taskRestore =
                    newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());
            TaskStateSnapshot taskStateHandles = taskRestore.getTaskStateSnapshot();

            // Keyed state is only compared for the operator at the last index of the operator-ID list.
            int headOpIndex = operatorIDs.size() - 1;
            List<Collection<OperatorStateHandle>> allParallelManagedOpStates = new ArrayList<>(operatorIDs.size());
            List<Collection<OperatorStateHandle>> allParallelRawOpStates = new ArrayList<>(operatorIDs.size());

            for (int idx = 0; idx < operatorIDs.size(); idx++) {
                OperatorID operatorID = operatorIDs.get(idx).getGeneratedOperatorID();
                OperatorSubtaskState opState = taskStateHandles.getSubtaskStateByOperatorID(operatorID);
                Collection<OperatorStateHandle> opStateBackend = opState.getManagedOperatorState();
                Collection<OperatorStateHandle> opStateRaw = opState.getRawOperatorState();
                allParallelManagedOpStates.add(opStateBackend);
                allParallelRawOpStates.add(opStateRaw);

                if (idx == headOpIndex) {
                    Collection<KeyedStateHandle> keyedStateBackend = opState.getManagedKeyedState();
                    Collection<KeyedStateHandle> keyGroupStateRaw = opState.getRawKeyedState();
                    CheckpointCoordinatorTestingUtils.compareKeyedState(
                            Collections.singletonList(originalKeyedStateBackend), keyedStateBackend);
                    CheckpointCoordinatorTestingUtils.compareKeyedState(
                            Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
                }
            }
            actualOpStatesBackend.add(allParallelManagedOpStates);
            actualOpStatesRaw.add(allParallelRawOpStates);
        }

        CheckpointCoordinatorTestingUtils.comparePartitionableState(expectedOpStatesBackend, actualOpStatesBackend);
        CheckpointCoordinatorTestingUtils.comparePartitionableState(expectedOpStatesRaw, actualOpStatesRaw);
    }

    /**
     * Verifies that restoring fails when the max parallelism of the vertices has changed.
     *
     * <p>A checkpoint with keyed state is completed for two vertices with max parallelism 42
     * and 13. The restore is then attempted on vertices with the same IDs and parallelism but
     * max parallelism 20 and 42, and must throw an {@link IllegalStateException}.
     *
     * <p>The exception is expected from the restore call only, so that an
     * {@code IllegalStateException} thrown while setting the scenario up is reported as an
     * error instead of silently satisfying the test.
     *
     * @throws Exception if the coordinator or any of the test utilities fails unexpectedly
     */
    @Test
    public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
        JobID jid = new JobID();
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        int parallelism2 = 2;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;

        ExecutionJobVertex jobVertex1 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        ExecutionJobVertex jobVertex2 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID2, parallelism2, maxParallelism2);

        List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
        ExecutionVertex[] arrayExecutionVertices =
                allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);

        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setJobId(jid)
                .setTasks(arrayExecutionVertices)
                .setTimer(manuallyTriggeredScheduledExecutor)
                .build();

        coord.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assert.assertEquals(1L, coord.getPendingCheckpoints().size());
        long checkpointId = (Long) Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());

        List<KeyGroupRange> keyGroupPartitions1 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
        List<KeyGroupRange> keyGroupPartitions2 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);

        // Acknowledge with managed keyed state only, first for vertex 1 ...
        for (int index = 0; index < jobVertex1.getParallelism(); index++) {
            KeyGroupsStateHandle keyGroupState =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
            OperatorSubtaskState operatorSubtaskState =
                    new OperatorSubtaskState(null, null, keyGroupState, null, null, null);
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    OperatorID.fromJobVertexID(jobVertexID1), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
                    jid,
                    jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                    checkpointId,
                    new CheckpointMetrics(),
                    taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }

        // ... then for vertex 2.
        for (int index = 0; index < jobVertex2.getParallelism(); index++) {
            KeyGroupsStateHandle keyGroupState =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
            OperatorSubtaskState operatorSubtaskState =
                    new OperatorSubtaskState(null, null, keyGroupState, null, null, null);
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    OperatorID.fromJobVertexID(jobVertexID2), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
                    jid,
                    jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                    checkpointId,
                    new CheckpointMetrics(),
                    taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }

        List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals(1L, completedCheckpoints.size());

        // Same vertex IDs and parallelism, but a different max parallelism for both vertices.
        HashSet<ExecutionJobVertex> tasks = new HashSet<>();
        int newMaxParallelism1 = 20;
        int newMaxParallelism2 = 42;
        ExecutionJobVertex newJobVertex1 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID1, parallelism1, newMaxParallelism1);
        ExecutionJobVertex newJobVertex2 =
                CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID2, parallelism2, newMaxParallelism2);
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);

        try {
            coord.restoreLatestCheckpointedStateToAll(tasks, false);
        } catch (IllegalStateException expected) {
            // This is the outcome under test.
            return;
        }
        Assert.fail("The restoration should have failed because the max parallelism changed.");
    }

    /** Runs the topology-change recovery scenario with {@code INCREASE_PARALLELISM}. */
    @Test
    public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
        final TestScaleType scaleType = TestScaleType.INCREASE_PARALLELISM;
        testStateRecoveryWithTopologyChange(scaleType);
    }

    /** Runs the topology-change recovery scenario with {@code DECREASE_PARALLELISM}. */
    @Test
    public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
        final TestScaleType scaleType = TestScaleType.DECREASE_PARALLELISM;
        testStateRecoveryWithTopologyChange(scaleType);
    }

    /** Runs the topology-change recovery scenario with {@code SAME_PARALLELISM}. */
    @Test
    public void testStateRecoveryWhenTopologyChange() throws Exception {
        final TestScaleType scaleType = TestScaleType.SAME_PARALLELISM;
        testStateRecoveryWithTopologyChange(scaleType);
    }

    /**
     * Creates a fresh {@link JobVertexID} together with the {@link OperatorID} derived from it.
     *
     * @return a tuple whose {@code f0} is the new vertex ID and whose {@code f1} is
     *     {@code OperatorID.fromJobVertexID(f0)}
     */
    private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
        JobVertexID jobVertexID = new JobVertexID();
        OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
        // Diamond instead of the raw Tuple2 constructor, so the result is checked against the return type.
        return new Tuple2<>(jobVertexID, operatorID);
    }

    /**
     * Restores a completed checkpoint (checkpoint ID 2) into two job vertices whose operator lists
     * differ from the operators the checkpoint was written for, then verifies the state handed to
     * every subtask.
     *
     * <p>The checkpoint contains operator state for four operators (id1..id4); keyed state is only
     * attached to id3. The new job consists of one vertex listing operators (id2, id1, id5) and a
     * second vertex listing operators (id6, id3). id5 and id6 have no checkpointed state, and id4
     * is not listed by either new vertex.
     *
     * @param scaleType selects the parallelism of the second new vertex: 20 for
     *     {@code INCREASE_PARALLELISM}, 8 for {@code DECREASE_PARALLELISM}, otherwise the
     *     checkpointed parallelism (10) is kept
     * @throws Exception if building the fixtures, restoring, or comparing the state fails
     */
    public void testStateRecoveryWithTopologyChange(TestScaleType scaleType) throws Exception {
        // Checkpointed operators, first group: id1 and id2 share parallelism1 / maxParallelism1.
        Tuple2<JobVertexID, OperatorID> id1 = CheckpointCoordinatorRestoringTest.generateIDPair();
        Tuple2<JobVertexID, OperatorID> id2 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int parallelism1 = 10;
        int maxParallelism1 = 64;
        // Checkpointed operators, second group: id3 and id4 share parallelism2 / maxParallelism2.
        Tuple2<JobVertexID, OperatorID> id3 = CheckpointCoordinatorRestoringTest.generateIDPair();
        Tuple2<JobVertexID, OperatorID> id4 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int parallelism2 = 10;
        int maxParallelism2 = 64;
        // Key-group ranges per subtask at the checkpointed parallelism of the second group.
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        // Operator states that make up the completed checkpoint, keyed by operator ID (f1).
        HashMap<Object, OperatorState> operatorStates = new HashMap<Object, OperatorState>();
        // State for id1 and id2: managed and raw operator state for every subtask, no keyed state.
        for (Tuple2 id : Arrays.asList(id1, id2)) {
            OperatorState taskState = new OperatorState((OperatorID)id.f1, parallelism1, maxParallelism1);
            operatorStates.put(id.f1, taskState);
            for (int index = 0; index < taskState.getParallelism(); ++index) {
                // The trailing boolean is false for the handle used as managed state, true for raw.
                OperatorStateHandle subManagedOperatorState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, false);
                OperatorStateHandle subRawOperatorState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, true);
                OperatorSubtaskState subtaskState = new OperatorSubtaskState(subManagedOperatorState, subRawOperatorState, null, null, null, null);
                taskState.putState(index, subtaskState);
            }
        }
        // Expected operator-state handles, one inner list per operator of the second group
        // (index 0 = id3, index 1 = id4, following the iteration order below).
        ArrayList expectedManagedOperatorStates = new ArrayList();
        ArrayList expectedRawOperatorStates = new ArrayList();
        // State for id3 and id4: operator state for both, keyed state only for id3.
        for (Tuple2 id : Arrays.asList(id3, id4)) {
            OperatorState operatorState = new OperatorState((OperatorID)id.f1, parallelism2, maxParallelism2);
            operatorStates.put(id.f1, operatorState);
            ArrayList<ChainedStateHandle> expectedManagedOperatorState = new ArrayList<ChainedStateHandle>();
            ArrayList<ChainedStateHandle> expectedRawOperatorState = new ArrayList<ChainedStateHandle>();
            expectedManagedOperatorStates.add(expectedManagedOperatorState);
            expectedRawOperatorStates.add(expectedRawOperatorState);
            for (int index = 0; index < operatorState.getParallelism(); ++index) {
                OperatorStateHandle subManagedOperatorState = (OperatorStateHandle)CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, false).get(0);
                OperatorStateHandle subRawOperatorState = (OperatorStateHandle)CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, true).get(0);
                // Keyed state exists only when the current ID is id3; for id4 both handles are null.
                KeyGroupsStateHandle subManagedKeyedState = ((JobVertexID)id.f0).equals(id3.f0) ? CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id.f0, (KeyGroupRange)keyGroupPartitions2.get(index), false) : null;
                KeyGroupsStateHandle subRawKeyedState = ((JobVertexID)id.f0).equals(id3.f0) ? CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id.f0, (KeyGroupRange)keyGroupPartitions2.get(index), true) : null;
                // Remember the checkpointed handles so they can be compared after the restore.
                expectedManagedOperatorState.add(ChainedStateHandle.wrapSingleHandle((StateObject)subManagedOperatorState));
                expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle((StateObject)subRawOperatorState));
                OperatorSubtaskState subtaskState = new OperatorSubtaskState(subManagedOperatorState, subRawOperatorState, (KeyedStateHandle)subManagedKeyedState, (KeyedStateHandle)subRawKeyedState, null, null);
                operatorState.putState(index, subtaskState);
            }
        }
        // New layout: id5 and id6 are fresh operators without any checkpointed state.
        Tuple2<JobVertexID, OperatorID> id5 = CheckpointCoordinatorRestoringTest.generateIDPair();
        int newParallelism1 = 10;
        Tuple2<JobVertexID, OperatorID> id6 = CheckpointCoordinatorRestoringTest.generateIDPair();
        // Only the second vertex is rescaled; the first keeps parallelism 10.
        int newParallelism2 = parallelism2;
        if (scaleType == TestScaleType.INCREASE_PARALLELISM) {
            newParallelism2 = 20;
        } else if (scaleType == TestScaleType.DECREASE_PARALLELISM) {
            newParallelism2 = 8;
        }
        // Key-group ranges per subtask at the new parallelism of the second vertex.
        List newKeyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)newParallelism2);
        // First new vertex: vertex ID of id5, operator list (id2, id1, id5).
        ExecutionJobVertex newJobVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionJobVertex((JobVertexID)id5.f0, Arrays.asList((OperatorID)id2.f1, (OperatorID)id1.f1, (OperatorID)id5.f1), newParallelism1, maxParallelism1);
        // Second new vertex: vertex ID of id3, operator list (id6, id3).
        ExecutionJobVertex newJobVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionJobVertex((JobVertexID)id3.f0, Arrays.asList((OperatorID)id6.f1, (OperatorID)id3.f1), newParallelism2, maxParallelism2);
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        JobID jobID = new JobID();
        // The checkpoint to restore from; its ID (2) is asserted on every task restore below.
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(jobID, 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000L, operatorStates, Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation());
        // NOTE(review): only newJobVertex1's task vertices are passed to setTasks, while the restore
        // call below receives both vertices — confirm this is intentional.
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTasks(newJobVertex1.getTaskVertices()).setCompletedCheckpointStore(CompletedCheckpointStore.storeFor((CompletedCheckpoint[])new CompletedCheckpoint[]{completedCheckpoint})).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        // The checkpoint holds state for id4, which neither new vertex lists; presumably the `true`
        // flag permits such unmatched state — TODO confirm against CheckpointCoordinator.
        coord.restoreLatestCheckpointedStateToAll(tasks, true);
        // Verify every subtask of the first new vertex.
        for (int i = 0; i < newJobVertex1.getParallelism(); ++i) {
            List operatorIDs = newJobVertex1.getOperatorIDs();
            JobManagerTaskRestore taskRestore = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals((long)2L, (long)taskRestore.getRestoreCheckpointId());
            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            // The last operator in the list (id5) must not have received any keyed state.
            OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIDs.size() - 1)).getGeneratedOperatorID());
            Assert.assertTrue((boolean)headOpState.getManagedKeyedState().isEmpty());
            Assert.assertTrue((boolean)headOpState.getRawKeyedState().isEmpty());
            // Index 2 is id5: a new operator, so no operator state is expected either.
            int operatorIndexInChain = 2;
            OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            Assert.assertTrue((boolean)opState.getManagedOperatorState().isEmpty());
            Assert.assertTrue((boolean)opState.getRawOperatorState().isEmpty());
            // Index 1 is id1: exactly one managed and one raw handle, each with the same stream
            // content as a handle regenerated for id1 at this subtask index.
            operatorIndexInChain = 1;
            opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            OperatorStateHandle expectedManagedOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id1.f0, i, 2, 8, false);
            OperatorStateHandle expectedRawOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id1.f0, i, 2, 8, true);
            StateObjectCollection managedOperatorState = opState.getManagedOperatorState();
            Assert.assertEquals((long)1L, (long)managedOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedManagedOpState.openInputStream(), (InputStream)((OperatorStateHandle)managedOperatorState.iterator().next()).openInputStream()));
            StateObjectCollection rawOperatorState = opState.getRawOperatorState();
            Assert.assertEquals((long)1L, (long)rawOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedRawOpState.openInputStream(), (InputStream)((OperatorStateHandle)rawOperatorState.iterator().next()).openInputStream()));
            // Index 0 is id2: same checks, against handles regenerated for id2.
            operatorIndexInChain = 0;
            opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            expectedManagedOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id2.f0, i, 2, 8, false);
            expectedRawOpState = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID)id2.f0, i, 2, 8, true);
            managedOperatorState = opState.getManagedOperatorState();
            Assert.assertEquals((long)1L, (long)managedOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedManagedOpState.openInputStream(), (InputStream)((OperatorStateHandle)managedOperatorState.iterator().next()).openInputStream()));
            rawOperatorState = opState.getRawOperatorState();
            Assert.assertEquals((long)1L, (long)rawOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isStreamContentEqual((InputStream)expectedRawOpState.openInputStream(), (InputStream)((OperatorStateHandle)rawOperatorState.iterator().next()).openInputStream()));
        }
        // Operator state actually restored to id3, collected per subtask of the second new vertex
        // and compared in one pass after the loop.
        ArrayList<List<Collection<OperatorStateHandle>>> actualManagedOperatorStates = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        ArrayList<List<Collection<OperatorStateHandle>>> actualRawOperatorStates = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        // Verify every subtask of the second new vertex.
        for (int i = 0; i < newJobVertex2.getParallelism(); ++i) {
            List operatorIDs = newJobVertex2.getOperatorIDs();
            JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals((long)2L, (long)taskRestore.getRestoreCheckpointId());
            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            // Index 1 is id3: record its restored managed and raw operator state.
            int operatorIndexInChain = 1;
            OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            ArrayList<StateObjectCollection> actualSubManagedOperatorState = new ArrayList<StateObjectCollection>(1);
            actualSubManagedOperatorState.add(opState.getManagedOperatorState());
            ArrayList<StateObjectCollection> actualSubRawOperatorState = new ArrayList<StateObjectCollection>(1);
            actualSubRawOperatorState.add(opState.getRawOperatorState());
            actualManagedOperatorStates.add(actualSubManagedOperatorState);
            actualRawOperatorStates.add(actualSubRawOperatorState);
            // Index 0 is id6: a new operator, so no operator state is expected.
            operatorIndexInChain = 0;
            opState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIndexInChain)).getGeneratedOperatorID());
            Assert.assertTrue((boolean)opState.getManagedOperatorState().isEmpty());
            Assert.assertTrue((boolean)opState.getRawOperatorState().isEmpty());
            // Reference keyed state is regenerated for this subtask's key-group range under the NEW
            // parallelism, and compared with what the last operator in the list (id3) received.
            KeyGroupsStateHandle originalKeyedStateBackend = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id3.f0, (KeyGroupRange)newKeyGroupPartitions2.get(i), false);
            KeyGroupsStateHandle originalKeyedStateRaw = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID)id3.f0, (KeyGroupRange)newKeyGroupPartitions2.get(i), true);
            OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair)operatorIDs.get(operatorIDs.size() - 1)).getGeneratedOperatorID());
            StateObjectCollection keyedStateBackend = headOpState.getManagedKeyedState();
            StateObjectCollection keyGroupStateRaw = headOpState.getRawKeyedState();
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateBackend), (Collection<? extends KeyedStateHandle>)keyedStateBackend);
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(originalKeyedStateRaw), (Collection<? extends KeyedStateHandle>)keyGroupStateRaw);
        }
        // Index 0 of the expected lists holds the handles checkpointed for id3.
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List)expectedManagedOperatorStates.get(0), actualManagedOperatorStates);
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List)expectedRawOperatorStates.get(0), actualRawOperatorStates);
    }

    private static enum TestScaleType {
        INCREASE_PARALLELISM,
        DECREASE_PARALLELISM,
        SAME_PARALLELISM;

    }
}

