package org.apache.flink.runtime.checkpoint;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.class */
class CheckpointCoordinatorRestoringTest extends TestLogger {

    /** JUnit extension whose executor is handed to the execution-graph builders in this class. */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    /** Location string passed along with every acknowledge message sent to a coordinator. */
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    /**
     * Timer given to the coordinators under test; re-created in {@link #setUp()} and drained
     * explicitly by the tests via {@code triggerAll()}.
     */
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;

    /** Temporary directory injected by JUnit through {@code @TempDir}. */
    @TempDir
    private Path tmpFolder;

    /** How the parallelism of a restored vertex relates to the parallelism it was checkpointed with. */
    public enum TestScaleType {
        /** The vertex is restored with a higher parallelism than it was checkpointed with. */
        INCREASE_PARALLELISM,

        /** The vertex is restored with a lower parallelism than it was checkpointed with. */
        DECREASE_PARALLELISM,

        /** The vertex is restored with the same parallelism it was checkpointed with. */
        SAME_PARALLELISM
    }

    /**
     * Immutable description of one job vertex used when building test execution graphs: its ID,
     * its parallelism and its maximum parallelism.
     */
    public static class TestingVertex {
        private final JobVertexID id;
        private final int parallelism;
        private final int maxParallelism;

        private TestingVertex(JobVertexID id, int parallelism, int maxParallelism) {
            this.id = id;
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }

        /** Returns the ID of the described job vertex. */
        public JobVertexID getId() {
            return id;
        }

        /** Returns the parallelism of the described job vertex. */
        public int getParallelism() {
            return parallelism;
        }

        /** Returns the maximum parallelism of the described job vertex. */
        public int getMaxParallelism() {
            return maxParallelism;
        }
    }

    /** Package-private no-argument constructor; per-test state is initialised in {@link #setUp()}. */
    CheckpointCoordinatorRestoringTest() {
    }

    /**
     * Acknowledges checkpoint {@code j} for every subtask of {@code executionJobVertex}.
     *
     * <p>One key-group range per subtask is computed from the vertex's maximum parallelism and
     * parallelism. Each subtask's acknowledge message carries the state produced by
     * {@code CheckpointCoordinatorTestingUtils.mockSubtaskState} for its range.
     *
     * @param checkpointCoordinator coordinator that receives the acknowledge messages
     * @param executionGraph graph that supplies the job ID for the messages
     * @param executionJobVertex vertex whose subtasks acknowledge the checkpoint
     * @param j ID of the checkpoint being acknowledged
     * @throws Exception if building a subtask state or delivering a message fails
     */
    private static void acknowledgeCheckpoint(CheckpointCoordinator checkpointCoordinator, ExecutionGraph executionGraph, ExecutionJobVertex executionJobVertex, long j) throws Exception {
        final List<KeyGroupRange> keyGroupPartitions =
                StateAssignmentOperation.createKeyGroupPartitions(
                        executionJobVertex.getMaxParallelism(), executionJobVertex.getParallelism());
        for (int subtaskIndex = 0; subtaskIndex < keyGroupPartitions.size(); subtaskIndex++) {
            checkpointCoordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            executionGraph.getJobID(),
                            executionJobVertex
                                    .getTaskVertices()[subtaskIndex]
                                    .getCurrentExecutionAttempt()
                                    .getAttemptId(),
                            j,
                            new CheckpointMetrics(),
                            CheckpointCoordinatorTestingUtils.mockSubtaskState(
                                    executionJobVertex.getJobVertexId(),
                                    subtaskIndex,
                                    keyGroupPartitions.get(subtaskIndex))),
                    TASK_MANAGER_LOCATION_INFO);
        }
    }

    /**
     * Builds an execution graph containing one job vertex per entry of {@code list}, using each
     * entry's ID, parallelism and maximum parallelism.
     *
     * @param list descriptions of the vertices to add, in the order they are added
     * @return the graph built with the executor of {@link #EXECUTOR_RESOURCE}
     * @throws Exception if the graph cannot be built
     */
    private static ExecutionGraph createExecutionGraph(List<TestingVertex> list) throws Exception {
        final CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder graphBuilder =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder();
        for (final TestingVertex vertex : list) {
            graphBuilder.addJobVertex(
                    vertex.getId(), vertex.getParallelism(), vertex.getMaxParallelism());
        }
        final ScheduledExecutorService executor = EXECUTOR_RESOURCE.getExecutor();
        return graphBuilder.build(executor);
    }

    /** Gives every test a fresh manually triggered executor to use as the coordinator timer. */
    @BeforeEach
    void setUp() throws Exception {
        manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /**
     * Takes a checkpoint for two vertices (parallelism 3 / max 42 and parallelism 2 / max 13) and
     * then restores it into a freshly built graph with the same vertices.
     */
    @Test
    void testRestoreLatestCheckpointedState() throws Exception {
        final TestingVertex firstVertex = new TestingVertex(new JobVertexID(), 3, 42);
        final TestingVertex secondVertex = new TestingVertex(new JobVertexID(), 2, 13);
        final List<TestingVertex> vertices = Arrays.asList(firstVertex, secondVertex);

        final Collection<CompletedCheckpoint> persistedCheckpoints =
                testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore(vertices);
        testRestoreLatestCheckpointedState(vertices, persistedCheckpoints);
    }

    /**
     * Triggers a single checkpoint on a graph built from {@code list}, acknowledges it for every
     * vertex and asserts that it completed.
     *
     * @param list descriptions of the vertices the graph is built from
     * @return the checkpoints held by the store after it was shut down with
     *     {@code JobStatus.SUSPENDED}
     * @throws Exception if building the graph, triggering or acknowledging fails
     */
    private Collection<CompletedCheckpoint> testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore(List<TestingVertex> list) throws Exception {
        final ExecutionGraph graph = createExecutionGraph(list);
        final CompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore();
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCompletedCheckpointStore(store)
                        .build(graph);

        // Trigger the checkpoint and let the manually driven timer run the queued work.
        coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(coordinator.getPendingCheckpoints().size()).isOne();
        final long checkpointId =
                Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());

        // Acknowledge from every subtask of every vertex so that the checkpoint completes.
        for (final TestingVertex vertex : list) {
            final ExecutionJobVertex jobVertex =
                    Objects.requireNonNull(graph.getJobVertex(vertex.getId()));
            acknowledgeCheckpoint(coordinator, graph, jobVertex, checkpointId);
        }
        Assertions.assertThat(coordinator.getSuccessfulCheckpoints().size()).isOne();

        store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
        return store.getAllCheckpoints();
    }

    /**
     * Restores the given checkpoints into a freshly built graph and verifies the outcome.
     *
     * <p>A new coordinator is created on top of an {@code EmbeddedCompletedCheckpointStore}
     * pre-populated with {@code collection}. After the restore, every subtask state of every
     * checkpoint must have had {@code registerSharedStates} invoked exactly twice with that
     * checkpoint's ID, and every vertex must pass
     * {@code CheckpointCoordinatorTestingUtils.verifyStateRestore}.
     *
     * @param list descriptions of the vertices the new graph is built from
     * @param collection completed checkpoints to restore from
     * @throws Exception if building the graph or restoring fails
     */
    private void testRestoreLatestCheckpointedState(List<TestingVertex> list, Collection<CompletedCheckpoint> collection) throws Exception {
        final ExecutionGraph graph = createExecutionGraph(list);
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCompletedCheckpointStore(
                                new EmbeddedCompletedCheckpointStore(
                                        collection.size(), collection, RestoreMode.DEFAULT))
                        .build(graph);

        final Set<ExecutionJobVertex> vertices =
                list.stream()
                        .map(TestingVertex::getId)
                        .map(graph::getJobVertex)
                        .collect(Collectors.toSet());
        Assertions.assertThat(coordinator.restoreLatestCheckpointedStateToAll(vertices, false)).isTrue();

        // NOTE(review): two registrations are expected per subtask state, presumably one when the
        // checkpoint was first stored and one for this restore; confirm against the store.
        for (final CompletedCheckpoint completedCheckpoint : collection) {
            for (final OperatorState operatorState : completedCheckpoint.getOperatorStates().values()) {
                for (final OperatorSubtaskState subtaskState : operatorState.getStates()) {
                    Mockito.verify(subtaskState, Mockito.times(2))
                            .registerSharedStates(
                                    ArgumentMatchers.any(SharedStateRegistry.class),
                                    ArgumentMatchers.eq(completedCheckpoint.getCheckpointID()));
                }
            }
        }

        for (final ExecutionJobVertex vertex : vertices) {
            CheckpointCoordinatorTestingUtils.verifyStateRestore(vertex);
        }
    }

    /** Restores a checkpoint after the second vertex's parallelism was reduced from 13 to 2. */
    @Test
    void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
        final boolean scaleOut = false;
        testRestoreLatestCheckpointedStateWithChangingParallelism(scaleOut);
    }

    /** Restores a checkpoint after the second vertex's parallelism was raised from 2 to 13. */
    @Test
    void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
        final boolean scaleOut = true;
        testRestoreLatestCheckpointedStateWithChangingParallelism(scaleOut);
    }

    /**
     * Takes a checkpoint with one parallelism for the second vertex and restores it with another.
     *
     * <p>The first vertex keeps parallelism 3 / max parallelism 42 throughout. The second vertex
     * has max parallelism 13 and goes from parallelism 2 to 13 when {@code z} is {@code true}, or
     * from 13 to 2 when it is {@code false}. After the restore, the first vertex is checked with
     * {@code verifyStateRestore}. For the second vertex the restored keyed state of the last
     * operator in the chain is compared against freshly generated handles for the new key-group
     * ranges, and the restored operator state is compared against the handles that were
     * acknowledged.
     *
     * @param z {@code true} to scale the second vertex out, {@code false} to scale it in
     * @throws Exception if building a graph, acknowledging or restoring fails
     */
    private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean z) throws Exception {
        final JobVertexID jobVertexID1 = new JobVertexID();
        final JobVertexID jobVertexID2 = new JobVertexID();
        final int parallelism1 = 3;
        final int maxParallelism1 = 42;
        final int maxParallelism2 = 13;
        final int parallelism2 = z ? 2 : 13;
        final int newParallelism2 = z ? 13 : 2;

        final CompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();

        // ---- take a checkpoint with the original parallelism ----
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1, parallelism1, maxParallelism1)
                        .addJobVertex(jobVertexID2, parallelism2, maxParallelism2)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
        final ExecutionJobVertex jobVertex2 = graph.getJobVertex(jobVertexID2);
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(completedCheckpointStore)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);

        coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(coordinator.getPendingCheckpoints().size()).isOne();
        final long checkpointId =
                Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());

        final List<KeyGroupRange> keyGroupPartitions1 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
        final List<KeyGroupRange> keyGroupPartitions2 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);

        // Vertex 1: managed operator state, managed and raw keyed state, input channel state.
        for (int index = 0; index < jobVertex1.getParallelism(); index++) {
            final OperatorSubtaskState subtaskState =
                    OperatorSubtaskState.builder()
                            .setManagedOperatorState(
                                    CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(
                                            jobVertexID1, index, 2, 8, false))
                            .setManagedKeyedState(
                                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                                            jobVertexID1, keyGroupPartitions1.get(index), false))
                            .setRawKeyedState(
                                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                                            jobVertexID1, keyGroupPartitions1.get(index), true))
                            .setInputChannelState(
                                    StateObjectCollection.singleton(
                                            StateHandleDummyUtil.createNewInputChannelStateHandle(
                                                    3, new Random())))
                            .build();
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID1), subtaskState);
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            graph.getJobID(),
                            jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            snapshot),
                    TASK_MANAGER_LOCATION_INFO);
        }

        // Vertex 2: remember the acknowledged operator state so it can be compared after restore.
        final List<ChainedStateHandle<OperatorStateHandle>> expectedManagedOpStates =
                new ArrayList<>(jobVertex2.getParallelism());
        final List<ChainedStateHandle<OperatorStateHandle>> expectedRawOpStates =
                new ArrayList<>(jobVertex2.getParallelism());
        for (int index = 0; index < jobVertex2.getParallelism(); index++) {
            final KeyGroupsStateHandle managedKeyedState =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                            jobVertexID2, keyGroupPartitions2.get(index), false);
            final KeyGroupsStateHandle rawKeyedState =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                            jobVertexID2, keyGroupPartitions2.get(index), true);
            final OperatorStateHandle managedOpState =
                    CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(
                            jobVertexID2, index, 2, 8, false);
            final OperatorStateHandle rawOpState =
                    CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(
                            jobVertexID2, index, 2, 8, true);
            expectedManagedOpStates.add(
                    new ChainedStateHandle<>(Collections.singletonList(managedOpState)));
            expectedRawOpStates.add(
                    new ChainedStateHandle<>(Collections.singletonList(rawOpState)));

            final OperatorSubtaskState subtaskState =
                    OperatorSubtaskState.builder()
                            .setManagedOperatorState(managedOpState)
                            .setRawOperatorState(rawOpState)
                            .setManagedKeyedState(managedKeyedState)
                            .setRawKeyedState(rawKeyedState)
                            .build();
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID2), subtaskState);
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            graph.getJobID(),
                            jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            snapshot),
                    TASK_MANAGER_LOCATION_INFO);
        }
        Assertions.assertThat(coordinator.getSuccessfulCheckpoints().size()).isOne();

        // ---- restore into a graph where vertex 2 has the new parallelism ----
        final List<KeyGroupRange> newKeyGroupPartitions2 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
        final ExecutionGraph newGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1, parallelism1, maxParallelism1)
                        .addJobVertex(jobVertexID2, newParallelism2, maxParallelism2)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionJobVertex newJobVertex1 = newGraph.getJobVertex(jobVertexID1);
        final ExecutionJobVertex newJobVertex2 = newGraph.getJobVertex(jobVertexID2);
        final CheckpointCoordinator newCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(completedCheckpointStore)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(newGraph);

        final Set<ExecutionJobVertex> tasks = new HashSet<>();
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        Assertions.assertThat(newCoordinator.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();

        // Vertex 1 kept its parallelism, so it is checked against the original key-group ranges.
        CheckpointCoordinatorTestingUtils.verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);

        final List<List<Collection<OperatorStateHandle>>> actualManagedOpStates =
                new ArrayList<>(newJobVertex2.getParallelism());
        final List<List<Collection<OperatorStateHandle>>> actualRawOpStates =
                new ArrayList<>(newJobVertex2.getParallelism());
        for (int index = 0; index < newJobVertex2.getParallelism(); index++) {
            final List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();
            final KeyGroupsStateHandle expectedManagedKeyedState =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                            jobVertexID2, newKeyGroupPartitions2.get(index), false);
            final KeyGroupsStateHandle expectedRawKeyedState =
                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                            jobVertexID2, newKeyGroupPartitions2.get(index), true);

            final JobManagerTaskRestore taskRestore =
                    newJobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getTaskRestore();
            Assertions.assertThat(taskRestore.getRestoreCheckpointId()).isOne();
            final TaskStateSnapshot restoredSnapshot = taskRestore.getTaskStateSnapshot();

            // Keyed state is only compared for the last entry of the operator ID list.
            final int headOperatorIndex = operatorIDs.size() - 1;
            final List<Collection<OperatorStateHandle>> managedOpStatesOfSubtask =
                    new ArrayList<>(operatorIDs.size());
            final List<Collection<OperatorStateHandle>> rawOpStatesOfSubtask =
                    new ArrayList<>(operatorIDs.size());
            for (int operatorIndex = 0; operatorIndex < operatorIDs.size(); operatorIndex++) {
                final OperatorSubtaskState restoredState =
                        restoredSnapshot.getSubtaskStateByOperatorID(
                                operatorIDs.get(operatorIndex).getGeneratedOperatorID());
                managedOpStatesOfSubtask.add(restoredState.getManagedOperatorState());
                rawOpStatesOfSubtask.add(restoredState.getRawOperatorState());
                if (operatorIndex == headOperatorIndex) {
                    CheckpointCoordinatorTestingUtils.compareKeyedState(
                            Collections.singletonList(expectedManagedKeyedState),
                            restoredState.getManagedKeyedState());
                    CheckpointCoordinatorTestingUtils.compareKeyedState(
                            Collections.singletonList(expectedRawKeyedState),
                            restoredState.getRawKeyedState());
                }
            }
            actualManagedOpStates.add(managedOpStatesOfSubtask);
            actualRawOpStates.add(rawOpStatesOfSubtask);
        }

        CheckpointCoordinatorTestingUtils.comparePartitionableState(expectedManagedOpStates, actualManagedOpStates);
        CheckpointCoordinatorTestingUtils.comparePartitionableState(expectedRawOpStates, actualRawOpStates);
    }

    /**
     * Verifies that restoring fails when the maximum parallelism of the vertices has changed.
     *
     * <p>A checkpoint with managed keyed state is taken for two vertices with max parallelism 42
     * and 13. The restore is then attempted on a graph where the same vertices have max
     * parallelism 20 and 42, and is expected to throw an {@link IllegalStateException}.
     */
    @Test
    void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
        final JobVertexID jobVertexID1 = new JobVertexID();
        final JobVertexID jobVertexID2 = new JobVertexID();
        final int parallelism1 = 3;
        final int parallelism2 = 2;
        final int maxParallelism1 = 42;
        final int maxParallelism2 = 13;

        final CompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();

        // ---- take a checkpoint with the original max parallelism ----
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1, parallelism1, maxParallelism1)
                        .addJobVertex(jobVertexID2, parallelism2, maxParallelism2)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
        final ExecutionJobVertex jobVertex2 = graph.getJobVertex(jobVertexID2);
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(completedCheckpointStore)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);

        coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(coordinator.getPendingCheckpoints().size()).isOne();
        final long checkpointId =
                Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());

        final List<KeyGroupRange> keyGroupPartitions1 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
        final List<KeyGroupRange> keyGroupPartitions2 =
                StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);

        for (int index = 0; index < jobVertex1.getParallelism(); index++) {
            final OperatorSubtaskState subtaskState =
                    OperatorSubtaskState.builder()
                            .setManagedKeyedState(
                                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                                            jobVertexID1, keyGroupPartitions1.get(index), false))
                            .build();
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID1), subtaskState);
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            graph.getJobID(),
                            jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            snapshot),
                    TASK_MANAGER_LOCATION_INFO);
        }
        for (int index = 0; index < jobVertex2.getParallelism(); index++) {
            final OperatorSubtaskState subtaskState =
                    OperatorSubtaskState.builder()
                            .setManagedKeyedState(
                                    CheckpointCoordinatorTestingUtils.generateKeyGroupState(
                                            jobVertexID2, keyGroupPartitions2.get(index), false))
                            .build();
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID2), subtaskState);
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            graph.getJobID(),
                            jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            snapshot),
                    TASK_MANAGER_LOCATION_INFO);
        }
        Assertions.assertThat(coordinator.getSuccessfulCheckpoints().size()).isOne();

        // ---- attempt the restore on a graph whose max parallelism differs for both vertices ----
        final int newMaxParallelism1 = 20;
        final int newMaxParallelism2 = 42;
        final ExecutionGraph newGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1, parallelism1, newMaxParallelism1)
                        .addJobVertex(jobVertexID2, parallelism2, newMaxParallelism2)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionJobVertex newJobVertex1 = newGraph.getJobVertex(jobVertexID1);
        final ExecutionJobVertex newJobVertex2 = newGraph.getJobVertex(jobVertexID2);
        final CheckpointCoordinator newCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(completedCheckpointStore)
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(newGraph);

        final Set<ExecutionJobVertex> tasks = new HashSet<>();
        tasks.add(newJobVertex1);
        tasks.add(newJobVertex2);
        Assertions.assertThatThrownBy(() -> newCoordinator.restoreLatestCheckpointedStateToAll(tasks, false))
                .as("The restoration should have failed because the max parallelism changed.")
                .isInstanceOf(IllegalStateException.class);
    }

    /** Runs the topology-change recovery scenario with increased parallelism. */
    @Test
    void testStateRecoveryWhenTopologyChangeOut() throws Exception {
        final TestScaleType scaleType = TestScaleType.INCREASE_PARALLELISM;
        testStateRecoveryWithTopologyChange(scaleType);
    }

    /** Runs the topology-change recovery scenario with decreased parallelism. */
    @Test
    void testStateRecoveryWhenTopologyChangeIn() throws Exception {
        final TestScaleType scaleType = TestScaleType.DECREASE_PARALLELISM;
        testStateRecoveryWithTopologyChange(scaleType);
    }

    /** Runs the topology-change recovery scenario with unchanged parallelism. */
    @Test
    void testStateRecoveryWhenTopologyChange() throws Exception {
        final TestScaleType scaleType = TestScaleType.SAME_PARALLELISM;
        testStateRecoveryWithTopologyChange(scaleType);
    }

    /**
     * Creates a fresh job vertex ID together with the operator ID derived from it.
     *
     * @return a tuple of the new vertex ID ({@code f0}) and
     *     {@code OperatorID.fromJobVertexID} of that ID ({@code f1})
     */
    private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
        final JobVertexID vertexId = new JobVertexID();
        final OperatorID operatorId = OperatorID.fromJobVertexID(vertexId);
        return new Tuple2<>(vertexId, operatorId);
    }

    public void testStateRecoveryWithTopologyChange(TestScaleType testScaleType) throws Exception {
        Tuple2<JobVertexID, OperatorID> generateIDPair = generateIDPair();
        Tuple2<JobVertexID, OperatorID> generateIDPair2 = generateIDPair();
        Tuple2<JobVertexID, OperatorID> generateIDPair3 = generateIDPair();
        Tuple2<JobVertexID, OperatorID> generateIDPair4 = generateIDPair();
        List createKeyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions(64, 10);
        HashMap hashMap = new HashMap();
        for (Tuple2 tuple2 : Arrays.asList(generateIDPair, generateIDPair2)) {
            OperatorState operatorState = new OperatorState((OperatorID) tuple2.f1, 10, 64);
            hashMap.put(tuple2.f1, operatorState);
            for (int i = 0; i < operatorState.getParallelism(); i++) {
                operatorState.putState(i, OperatorSubtaskState.builder().setManagedOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) tuple2.f0, i, 2, 8, false)).setRawOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) tuple2.f0, i, 2, 8, true)).build());
            }
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (Tuple2 tuple22 : Arrays.asList(generateIDPair3, generateIDPair4)) {
            OperatorState operatorState2 = new OperatorState((OperatorID) tuple22.f1, 10, 64);
            hashMap.put(tuple22.f1, operatorState2);
            ArrayList arrayList3 = new ArrayList();
            ArrayList arrayList4 = new ArrayList();
            arrayList.add(arrayList3);
            arrayList2.add(arrayList4);
            for (int i2 = 0; i2 < operatorState2.getParallelism(); i2++) {
                OperatorSubtaskState.Builder builder = OperatorSubtaskState.builder();
                OperatorStateHandle operatorStateHandle = CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID) tuple22.f0, i2, 2, 8, false).get(0);
                OperatorStateHandle operatorStateHandle2 = CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle((JobVertexID) tuple22.f0, i2, 2, 8, true).get(0);
                if (((JobVertexID) tuple22.f0).equals(generateIDPair3.f0)) {
                    builder.setManagedKeyedState(CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID) tuple22.f0, (KeyGroupRange) createKeyGroupPartitions.get(i2), false));
                }
                if (((JobVertexID) tuple22.f0).equals(generateIDPair3.f0)) {
                    builder.setRawKeyedState(CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID) tuple22.f0, (KeyGroupRange) createKeyGroupPartitions.get(i2), true));
                }
                arrayList3.add(ChainedStateHandle.wrapSingleHandle(operatorStateHandle));
                arrayList4.add(ChainedStateHandle.wrapSingleHandle(operatorStateHandle2));
                operatorState2.putState(i2, builder.setManagedOperatorState(operatorStateHandle).setRawOperatorState(operatorStateHandle2).build());
            }
        }
        Tuple2<JobVertexID, OperatorID> generateIDPair5 = generateIDPair();
        Tuple2<JobVertexID, OperatorID> generateIDPair6 = generateIDPair();
        int i3 = 10;
        if (testScaleType == TestScaleType.INCREASE_PARALLELISM) {
            i3 = 20;
        } else if (testScaleType == TestScaleType.DECREASE_PARALLELISM) {
            i3 = 8;
        }
        List createKeyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions(64, i3);
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex((JobVertexID) generateIDPair5.f0, 10, 64, (List) Stream.of((Object[]) new OperatorID[]{(OperatorID) generateIDPair2.f1, (OperatorID) generateIDPair.f1, (OperatorID) generateIDPair5.f1}).map(OperatorIDPair::generatedIDOnly).collect(Collectors.toList()), true).addJobVertex((JobVertexID) generateIDPair3.f0, i3, 64, (List) Stream.of((Object[]) new OperatorID[]{(OperatorID) generateIDPair6.f1, (OperatorID) generateIDPair3.f1}).map(OperatorIDPair::generatedIDOnly).collect(Collectors.toList()), true).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex jobVertex = build.getJobVertex((JobVertexID) generateIDPair5.f0);
        ExecutionJobVertex jobVertex2 = build.getJobVertex((JobVertexID) generateIDPair3.f0);
        HashSet hashSet = new HashSet();
        hashSet.add(jobVertex);
        hashSet.add(jobVertex2);
        new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore(storeFor(SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), Collections.emptyList(), RestoreMode.DEFAULT), () -> {
        }, new CompletedCheckpoint(build.getJobID(), 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000, hashMap, Collections.emptyList(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), new TestCompletedCheckpointStorageLocation(), (CompletedCheckpointStats) null))).setTimer(this.manuallyTriggeredScheduledExecutor).build(build).restoreLatestCheckpointedStateToAll(hashSet, true);
        for (int i4 = 0; i4 < jobVertex.getParallelism(); i4++) {
            List operatorIDs = jobVertex.getOperatorIDs();
            JobManagerTaskRestore taskRestore = jobVertex.getTaskVertices()[i4].getCurrentExecutionAttempt().getTaskRestore();
            Assertions.assertThat(taskRestore.getRestoreCheckpointId()).isEqualTo(2L);
            TaskStateSnapshot taskStateSnapshot = taskRestore.getTaskStateSnapshot();
            OperatorSubtaskState subtaskStateByOperatorID = taskStateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs.get(operatorIDs.size() - 1)).getGeneratedOperatorID());
            Assertions.assertThat(subtaskStateByOperatorID.getManagedKeyedState()).isEmpty();
            Assertions.assertThat(subtaskStateByOperatorID.getRawKeyedState()).isEmpty();
            OperatorSubtaskState subtaskStateByOperatorID2 = taskStateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs.get(2)).getGeneratedOperatorID());
            Assertions.assertThat(subtaskStateByOperatorID2.getManagedOperatorState()).isEmpty();
            Assertions.assertThat(subtaskStateByOperatorID2.getRawOperatorState()).isEmpty();
            OperatorSubtaskState subtaskStateByOperatorID3 = taskStateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs.get(1)).getGeneratedOperatorID());
            OperatorStateHandle generatePartitionableStateHandle = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) generateIDPair.f0, i4, 2, 8, false);
            OperatorStateHandle generatePartitionableStateHandle2 = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) generateIDPair.f0, i4, 2, 8, true);
            StateObjectCollection managedOperatorState = subtaskStateByOperatorID3.getManagedOperatorState();
            Assertions.assertThat(managedOperatorState.size()).isOne();
            Assertions.assertThat(CommonTestUtils.isStreamContentEqual(generatePartitionableStateHandle.openInputStream(), ((OperatorStateHandle) managedOperatorState.iterator().next()).openInputStream())).isTrue();
            StateObjectCollection rawOperatorState = subtaskStateByOperatorID3.getRawOperatorState();
            Assertions.assertThat(rawOperatorState.size()).isOne();
            Assertions.assertThat(CommonTestUtils.isStreamContentEqual(generatePartitionableStateHandle2.openInputStream(), ((OperatorStateHandle) rawOperatorState.iterator().next()).openInputStream())).isTrue();
            OperatorSubtaskState subtaskStateByOperatorID4 = taskStateSnapshot.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs.get(0)).getGeneratedOperatorID());
            OperatorStateHandle generatePartitionableStateHandle3 = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) generateIDPair2.f0, i4, 2, 8, false);
            OperatorStateHandle generatePartitionableStateHandle4 = CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle((JobVertexID) generateIDPair2.f0, i4, 2, 8, true);
            StateObjectCollection managedOperatorState2 = subtaskStateByOperatorID4.getManagedOperatorState();
            Assertions.assertThat(managedOperatorState2.size()).isOne();
            Assertions.assertThat(CommonTestUtils.isStreamContentEqual(generatePartitionableStateHandle3.openInputStream(), ((OperatorStateHandle) managedOperatorState2.iterator().next()).openInputStream())).isTrue();
            StateObjectCollection rawOperatorState2 = subtaskStateByOperatorID4.getRawOperatorState();
            Assertions.assertThat(rawOperatorState2.size()).isOne();
            Assertions.assertThat(CommonTestUtils.isStreamContentEqual(generatePartitionableStateHandle4.openInputStream(), ((OperatorStateHandle) rawOperatorState2.iterator().next()).openInputStream())).isTrue();
        }
        ArrayList arrayList5 = new ArrayList(jobVertex2.getParallelism());
        ArrayList arrayList6 = new ArrayList(jobVertex2.getParallelism());
        for (int i5 = 0; i5 < jobVertex2.getParallelism(); i5++) {
            List operatorIDs2 = jobVertex2.getOperatorIDs();
            JobManagerTaskRestore taskRestore2 = jobVertex2.getTaskVertices()[i5].getCurrentExecutionAttempt().getTaskRestore();
            Assertions.assertThat(taskRestore2.getRestoreCheckpointId()).isEqualTo(2L);
            TaskStateSnapshot taskStateSnapshot2 = taskRestore2.getTaskStateSnapshot();
            OperatorSubtaskState subtaskStateByOperatorID5 = taskStateSnapshot2.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs2.get(1)).getGeneratedOperatorID());
            ArrayList arrayList7 = new ArrayList(1);
            arrayList7.add(subtaskStateByOperatorID5.getManagedOperatorState());
            ArrayList arrayList8 = new ArrayList(1);
            arrayList8.add(subtaskStateByOperatorID5.getRawOperatorState());
            arrayList5.add(arrayList7);
            arrayList6.add(arrayList8);
            OperatorSubtaskState subtaskStateByOperatorID6 = taskStateSnapshot2.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs2.get(0)).getGeneratedOperatorID());
            Assertions.assertThat(subtaskStateByOperatorID6.getManagedOperatorState()).isEmpty();
            Assertions.assertThat(subtaskStateByOperatorID6.getRawOperatorState()).isEmpty();
            KeyGroupsStateHandle generateKeyGroupState = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID) generateIDPair3.f0, (KeyGroupRange) createKeyGroupPartitions2.get(i5), false);
            KeyGroupsStateHandle generateKeyGroupState2 = CheckpointCoordinatorTestingUtils.generateKeyGroupState((JobVertexID) generateIDPair3.f0, (KeyGroupRange) createKeyGroupPartitions2.get(i5), true);
            OperatorSubtaskState subtaskStateByOperatorID7 = taskStateSnapshot2.getSubtaskStateByOperatorID(((OperatorIDPair) operatorIDs2.get(operatorIDs2.size() - 1)).getGeneratedOperatorID());
            StateObjectCollection managedKeyedState = subtaskStateByOperatorID7.getManagedKeyedState();
            StateObjectCollection rawKeyedState = subtaskStateByOperatorID7.getRawKeyedState();
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(generateKeyGroupState), managedKeyedState);
            CheckpointCoordinatorTestingUtils.compareKeyedState(Collections.singletonList(generateKeyGroupState2), rawKeyedState);
        }
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List) arrayList.get(0), arrayList5);
        CheckpointCoordinatorTestingUtils.comparePartitionableState((List) arrayList2.get(0), arrayList6);
    }

    /**
     * Creates a {@link StandaloneCompletedCheckpointStore} and fills it with the given
     * checkpoints.
     *
     * <p>The store is constructed with the number of supplied checkpoints as its constructor
     * argument (presumably its retention capacity -- confirm against the store's constructor).
     * Every checkpoint is then added, in argument order, via
     * {@code addCheckpointAndSubsumeOldestOne}, sharing one {@link CheckpointsCleaner}.
     *
     * @param sharedStateRegistry not read by this helper
     * @param runnable callback handed to the store on every insertion
     * @param completedCheckpointArr checkpoints to add, in order
     * @return the populated store
     * @throws Exception if the store rejects one of the checkpoints
     */
    static CompletedCheckpointStore storeFor(SharedStateRegistry sharedStateRegistry, Runnable runnable, CompletedCheckpoint... completedCheckpointArr) throws Exception {
        final int checkpointCount = completedCheckpointArr.length;
        final StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(checkpointCount);
        final CheckpointsCleaner cleaner = new CheckpointsCleaner();
        for (int idx = 0; idx < checkpointCount; idx++) {
            store.addCheckpointAndSubsumeOldestOne(completedCheckpointArr[idx], cleaner, runnable);
        }
        return store;
    }

    /**
     * Restores the latest checkpoint on a coordinator configured with
     * {@code setCheckpointIdOfIgnoredInFlightData(1L)} and checks what reaches the tasks.
     *
     * <p>Every subtask acknowledges the checkpoint with managed/raw operator state, managed/raw
     * keyed state, input-channel state and result-subpartition state. After the restore, the
     * input-channel and result-subpartition state are expected to be empty, while all four
     * operator/keyed state collections are expected to be non-empty.
     *
     * @throws Exception if triggering, acknowledging or restoring the checkpoint fails
     */
    @Test
    void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
        final int parallelism = 3;
        final int maxParallelism = 42;

        JobVertexID jobVertexID = new JobVertexID();
        CompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(jobVertexID, parallelism, maxParallelism)
                .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCompletedCheckpointStore(completedCheckpointStore)
                .setCheckpointCoordinatorConfiguration(
                        new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                                .setCheckpointIdOfIgnoredInFlightData(1L)
                                .build())
                .setTimer(this.manuallyTriggeredScheduledExecutor)
                .build(graph);

        // Trigger exactly one checkpoint and remember its id for the acknowledgements below.
        coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(coordinator.getPendingCheckpoints().size()).isOne();
        long checkpointId = Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());

        List<KeyGroupRange> keyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, parallelism);
        Random random = new Random();

        // Acknowledge the checkpoint from every subtask, including channel state.
        for (int subtask = 0; subtask < jobVertex.getParallelism(); subtask++) {
            KeyGroupRange keyGroupRange = keyGroupPartitions.get(subtask);
            OperatorSubtaskState subtaskState = OperatorSubtaskState.builder()
                    .setManagedOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID, subtask, 2, 8, false))
                    .setRawOperatorState(CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle(jobVertexID, subtask, 2, 8, true))
                    .setManagedKeyedState(CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID, keyGroupRange, false))
                    .setRawKeyedState(CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertexID, keyGroupRange, true))
                    .setInputChannelState(StateObjectCollection.singleton(StateHandleDummyUtil.createNewInputChannelStateHandle(3, random)))
                    .setResultSubpartitionState(StateObjectCollection.singleton(StateHandleDummyUtil.createNewResultSubpartitionStateHandle(3, random)))
                    .build();
            TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID), subtaskState);
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            graph.getJobID(),
                            jobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId(),
                            checkpointId,
                            new CheckpointMetrics(),
                            snapshot),
                    TASK_MANAGER_LOCATION_INFO);
        }
        Assertions.assertThat(coordinator.getSuccessfulCheckpoints().size()).isOne();

        Set<ExecutionJobVertex> tasksToRestore = new HashSet<>();
        tasksToRestore.add(jobVertex);
        Assertions.assertThat(coordinator.restoreLatestCheckpointedStateToAll(tasksToRestore, false)).isTrue();

        CheckpointCoordinatorTestingUtils.verifyStateRestore(jobVertexID, jobVertex, keyGroupPartitions);

        for (int subtask = 0; subtask < jobVertex.getParallelism(); subtask++) {
            JobManagerTaskRestore taskRestore = jobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getTaskRestore();
            Assertions.assertThat(taskRestore.getRestoreCheckpointId()).isOne();
            OperatorSubtaskState restoredState = taskRestore.getTaskStateSnapshot().getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID));

            // Channel state must not be handed to the restored tasks.
            Assertions.assertThat(restoredState.getInputChannelState()).isEmpty();
            Assertions.assertThat(restoredState.getResultSubpartitionState()).isEmpty();

            // Each of the four operator/keyed state kinds must be present; the managed keyed
            // state is checked explicitly rather than the managed operator state a second time.
            Assertions.assertThat(restoredState.getRawOperatorState()).isNotEmpty();
            Assertions.assertThat(restoredState.getManagedOperatorState()).isNotEmpty();
            Assertions.assertThat(restoredState.getRawKeyedState()).isNotEmpty();
            Assertions.assertThat(restoredState.getManagedKeyedState()).isNotEmpty();
        }
    }

    /**
     * Restores an initial checkpoint whose only operator state is a
     * {@code FullyFinishedOperatorState}, on a coordinator configured with
     * {@code setCheckpointIdOfIgnoredInFlightData(2L)}, and expects the restored task state
     * snapshot to report the task as deployed-as-finished.
     *
     * @throws Exception if building the graph, storing the checkpoint or restoring fails
     */
    @Test
    void testRestoreFinishedStateWithoutInFlightData() throws Exception {
        final OperatorIDPair operatorIdPair = OperatorIDPair.generatedIDOnly(new OperatorID());
        final JobVertexID vertexId = new JobVertexID();
        final ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(vertexId, 1, 1, Collections.singletonList(operatorIdPair), true)
                .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        final CompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore();

        // Single operator, recorded as fully finished in the stored checkpoint.
        final HashMap operatorStates = new HashMap();
        operatorStates.put(
                operatorIdPair.getGeneratedOperatorID(),
                new FullyFinishedOperatorState(operatorIdPair.getGeneratedOperatorID(), 1, 1));

        final long startMillis = System.currentTimeMillis();
        final long endMillis = System.currentTimeMillis() + 3000;
        final CompletedCheckpoint finishedCheckpoint = new CompletedCheckpoint(
                graph.getJobID(),
                2L,
                startMillis,
                endMillis,
                operatorStates,
                Collections.emptyList(),
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                new TestCompletedCheckpointStorageLocation(),
                (CompletedCheckpointStats) null);
        store.addCheckpointAndSubsumeOldestOne(finishedCheckpoint, new CheckpointsCleaner(), () -> { });

        final CheckpointCoordinatorConfiguration configuration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointIdOfIgnoredInFlightData(2L)
                        .build();
        final CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(configuration)
                .setCompletedCheckpointStore(store)
                .build(graph);

        final ExecutionJobVertex vertex = graph.getJobVertex(vertexId);
        coordinator.restoreInitialCheckpointIfPresent(Collections.singleton(vertex));

        final TaskStateSnapshot restoredSnapshot = vertex.getTaskVertices()[0]
                .getCurrentExecutionAttempt()
                .getTaskRestore()
                .getTaskStateSnapshot();
        Assertions.assertThat(restoredSnapshot.isTaskDeployedAsFinished()).isTrue();
    }

    /**
     * Checks that restoring the initial checkpoint invokes
     * {@code validateOperatorsFinishedState()} on the checker produced by the configured
     * vertex-finished-state-checker factory.
     *
     * <p>The factory installed here returns a checker whose validation method only flips a flag;
     * the test passes when that flag is set after {@code restoreInitialCheckpointIfPresent}.
     *
     * @throws Exception if building the graph, storing the checkpoint or restoring fails
     */
    @Test
    void testJobGraphModificationsAreCheckedForInitialCheckpoint() throws Exception {
        final ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(new JobVertexID(), 1, 1)
                .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        // Seed the store with one completed checkpoint (id 2) that carries no operator state.
        final CompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore();
        final long startMillis = System.currentTimeMillis();
        final long endMillis = System.currentTimeMillis() + 3000;
        final CompletedCheckpoint emptyCheckpoint = new CompletedCheckpoint(
                graph.getJobID(),
                2L,
                startMillis,
                endMillis,
                Collections.emptyMap(),
                Collections.emptyList(),
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                new TestCompletedCheckpointStorageLocation(),
                (CompletedCheckpointStats) null);
        store.addCheckpointAndSubsumeOldestOne(emptyCheckpoint, new CheckpointsCleaner(), () -> { });

        final BooleanValue validationInvoked = new BooleanValue(false);
        final CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCompletedCheckpointStore(store)
                .setVertexFinishedStateCheckerFactory(
                        (vertices, operatorStates) ->
                                new VertexFinishedStateChecker(vertices, operatorStates) {
                                    public void validateOperatorsFinishedState() {
                                        // Record the call instead of performing the real validation.
                                        validationInvoked.set(true);
                                    }
                                })
                .build(graph);

        coordinator.restoreInitialCheckpointIfPresent(new HashSet<>(graph.getAllVertices().values()));

        Assertions.assertThat(validationInvoked.get())
                .as("The finished states should be checked when job is restored on startup")
                .isTrue();
    }

    /**
     * Checks that restoring from a savepoint invokes {@code validateOperatorsFinishedState()}
     * on the checker produced by the configured vertex-finished-state-checker factory.
     *
     * <p>A first coordinator triggers a canonical savepoint into a fresh temporary folder and
     * receives the single acknowledgement needed to complete it. A second coordinator, built on
     * the same execution graph with a flag-flipping checker, then restores from that savepoint's
     * external pointer.
     *
     * @throws Exception if taking the savepoint or restoring from it fails
     */
    @Test
    void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
        final JobVertexID vertexId = new JobVertexID();
        final ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(vertexId, 1, 1)
                .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        // Step 1: take a savepoint with a coordinator driven by the manual executor.
        final CheckpointCoordinator savepointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer(this.manuallyTriggeredScheduledExecutor)
                .build(graph);
        final String savepointDirectory = "file://" + TempDirUtils.newFolder(this.tmpFolder).getAbsolutePath();
        final CompletableFuture<CompletedCheckpoint> savepointFuture =
                savepointCoordinator.triggerSavepoint(savepointDirectory, SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();

        final long pendingCheckpointId =
                savepointCoordinator.getPendingCheckpoints().keySet().stream().findFirst().get();
        final AcknowledgeCheckpoint acknowledgement = new AcknowledgeCheckpoint(
                graph.getJobID(),
                graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(),
                pendingCheckpointId);
        savepointCoordinator.receiveAcknowledgeMessage(acknowledgement, "localhost");
        Assertions.assertThat(savepointFuture).isDone();

        // Step 2: restore from that savepoint with a checker that only records being called.
        final BooleanValue validationInvoked = new BooleanValue(false);
        final CheckpointCoordinator restoringCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setVertexFinishedStateCheckerFactory(
                        (vertices, operatorStates) ->
                                new VertexFinishedStateChecker(vertices, operatorStates) {
                                    public void validateOperatorsFinishedState() {
                                        validationInvoked.set(true);
                                    }
                                })
                .build(graph);
        final SavepointRestoreSettings restoreSettings =
                SavepointRestoreSettings.forPath(savepointFuture.get().getExternalPointer());
        restoringCoordinator.restoreSavepoint(restoreSettings, graph.getAllVertices(), getClass().getClassLoader());

        Assertions.assertThat(validationInvoked.get())
                .as("The finished states should be checked when job is restored on startup")
                .isTrue();
    }
}
