package org.apache.flink.runtime.checkpoint;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanTest.class */
/**
 * Tests for {@link CheckpointPlan#fulfillFinishedTaskStatus(Map)} on plans computed by {@link
 * DefaultCheckpointPlanCalculator}.
 *
 * <p>Each test builds a small {@link ExecutionGraph}, marks or reports some of its tasks as
 * finished, computes a checkpoint plan from it, and then checks how the plan fills in (or
 * rejects) the supplied operator-state map.
 */
class DefaultCheckpointPlanTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
            TestingUtils.defaultExecutorExtension();

    @TempDir
    private Path temporaryFolder;

    /**
     * One of the two subtasks of the vertex is marked finished before the plan is computed, and
     * the operator state of subtask 0 is created via {@code
     * createSubtaskStateWithUnionListState}. Fulfilling the finished status is expected to fail
     * with a {@link FlinkRuntimeException} carrying the "part of its tasks are FINISHED" message.
     */
    @Test
    void testAbortionIfPartiallyFinishedVertexUsedUnionListState() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        OperatorID operatorId = new OperatorID();

        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(
                                vertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(operatorId)),
                                true)
                        .build(EXECUTOR_EXTENSION.getExecutor());
        // Only the first of the two subtasks is finished when the plan is calculated.
        graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().markFinished();

        CheckpointPlan checkpointPlan = createCheckpointPlan(graph);

        Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
        OperatorState operatorState = new OperatorState(operatorId, 2, 2);
        operatorState.putState(
                0,
                CheckpointCoordinatorTestingUtils.createSubtaskStateWithUnionListState(
                        TempDirUtils.newFile(this.temporaryFolder)));
        operatorStates.put(operatorId, operatorState);

        String expectedMessage =
                String.format(
                        "The vertex %s (id = %s) has used UnionListState, but part of its tasks are FINISHED.",
                        graph.getJobVertex(vertexId).getName(), vertexId);

        Assertions.assertThatThrownBy(() -> checkpointPlan.fulfillFinishedTaskStatus(operatorStates))
                .isInstanceOf(FlinkRuntimeException.class)
                .hasMessage(expectedMessage);
    }

    /**
     * No task is marked finished, but subtask 1 is reported to the plan through {@code
     * reportTaskHasFinishedOperators}, and both subtasks carry state created via {@code
     * createSubtaskStateWithUnionListState}. Fulfilling the finished status is expected to fail
     * with a {@link FlinkRuntimeException} carrying the "has called operators' finish method"
     * message.
     */
    @Test
    void testAbortionIfPartiallyOperatorsFinishedVertexUsedUnionListState() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        OperatorID operatorId = new OperatorID();

        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(
                                vertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(operatorId)),
                                true)
                        .build(EXECUTOR_EXTENSION.getExecutor());
        ExecutionVertex[] tasks = graph.getJobVertex(vertexId).getTaskVertices();

        CheckpointPlan checkpointPlan = createCheckpointPlan(graph);

        Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
        OperatorState operatorState = new OperatorState(operatorId, 2, 2);
        operatorState.putState(
                0,
                CheckpointCoordinatorTestingUtils.createSubtaskStateWithUnionListState(
                        TempDirUtils.newFile(this.temporaryFolder)));
        operatorState.putState(
                1,
                CheckpointCoordinatorTestingUtils.createSubtaskStateWithUnionListState(
                        TempDirUtils.newFile(this.temporaryFolder)));
        // Only the second subtask is reported as having finished operators.
        checkpointPlan.reportTaskHasFinishedOperators(tasks[1]);
        operatorStates.put(operatorId, operatorState);

        String expectedMessage =
                String.format(
                        "The vertex %s (id = %s) has used UnionListState, but part of its tasks has called operators' finish method.",
                        graph.getJobVertex(vertexId).getName(), vertexId);

        Assertions.assertThatThrownBy(() -> checkpointPlan.fulfillFinishedTaskStatus(operatorStates))
                .isInstanceOf(FlinkRuntimeException.class)
                .hasMessage(expectedMessage);
    }

    /**
     * Three vertices with two subtasks each: all tasks of the first are marked finished, all tasks
     * of the second are reported via {@code reportTaskFinishedOnRestore}, and only subtask 0 of the
     * third is marked finished. Starting from an empty map, the plan is expected to add one entry
     * per operator: fully finished for the first two, and not fully finished (but with a finished
     * subtask 0) for the third.
     */
    @Test
    void testFulfillFinishedStates() throws Exception {
        JobVertexID fullyFinishedVertexId = new JobVertexID();
        JobVertexID finishedOnRestoreVertexId = new JobVertexID();
        JobVertexID partiallyFinishedVertexId = new JobVertexID();
        OperatorID fullyFinishedOperatorId = new OperatorID();
        OperatorID finishedOnRestoreOperatorId = new OperatorID();
        OperatorID partiallyFinishedOperatorId = new OperatorID();

        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(
                                fullyFinishedVertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(fullyFinishedOperatorId)),
                                true)
                        .addJobVertex(
                                finishedOnRestoreVertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(
                                                finishedOnRestoreOperatorId)),
                                true)
                        .addJobVertex(
                                partiallyFinishedVertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(
                                                partiallyFinishedOperatorId)),
                                true)
                        .build(EXECUTOR_EXTENSION.getExecutor());

        ExecutionVertex[] fullyFinishedTasks =
                graph.getJobVertex(fullyFinishedVertexId).getTaskVertices();
        ExecutionVertex[] finishedOnRestoreTasks =
                graph.getJobVertex(finishedOnRestoreVertexId).getTaskVertices();
        ExecutionVertex[] partiallyFinishedTasks =
                graph.getJobVertex(partiallyFinishedVertexId).getTaskVertices();

        Arrays.stream(fullyFinishedTasks)
                .forEach(task -> task.getCurrentExecutionAttempt().markFinished());
        partiallyFinishedTasks[0].getCurrentExecutionAttempt().markFinished();

        CheckpointPlan checkpointPlan = createCheckpointPlan(graph);
        Arrays.stream(finishedOnRestoreTasks).forEach(checkpointPlan::reportTaskFinishedOnRestore);

        Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
        checkpointPlan.fulfillFinishedTaskStatus(operatorStates);

        Assertions.assertThat(operatorStates).hasSize(3);
        Assertions.assertThat(operatorStates.get(fullyFinishedOperatorId).isFullyFinished())
                .isTrue();
        Assertions.assertThat(operatorStates.get(finishedOnRestoreOperatorId).isFullyFinished())
                .isTrue();
        OperatorState partiallyFinishedState = operatorStates.get(partiallyFinishedOperatorId);
        Assertions.assertThat(partiallyFinishedState.isFullyFinished()).isFalse();
        Assertions.assertThat(partiallyFinishedState.getState(0).isFinished()).isTrue();
    }

    /**
     * A single-subtask vertex (parallelism 1, max parallelism 256) whose only task is marked
     * finished, with an operator state that already holds a coordinator state. After fulfilling the
     * finished status the map is expected to still contain exactly one entry, and that entry is
     * expected to be fully finished.
     */
    @Test
    void testFulfillFullyFinishedStatesWithCoordinator() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        OperatorID operatorId = new OperatorID();

        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(
                                vertexId,
                                1,
                                256,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(operatorId)),
                                true)
                        .build(EXECUTOR_EXTENSION.getExecutor());
        graph.getJobVertex(vertexId).getTaskVertices()[0].getCurrentExecutionAttempt().markFinished();

        CheckpointPlan checkpointPlan = createCheckpointPlan(graph);

        Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
        OperatorState operatorState = new OperatorState(operatorId, 1, 256);
        operatorState.setCoordinatorState(new TestingStreamStateHandle());
        operatorStates.put(operatorId, operatorState);

        checkpointPlan.fulfillFinishedTaskStatus(operatorStates);

        Assertions.assertThat(operatorStates).hasSize(1);
        Assertions.assertThat(operatorStates.get(operatorId).isFullyFinished()).isTrue();
    }

    /**
     * Computes a checkpoint plan for the given graph with a freshly created {@link
     * DefaultCheckpointPlanCalculator} and waits for the result.
     *
     * @param executionGraph graph whose topologically ordered vertices are handed to the
     *     calculator
     * @return the plan the calculator's future completes with
     * @throws Exception if waiting on the calculator's future fails
     */
    private CheckpointPlan createCheckpointPlan(ExecutionGraph executionGraph) throws Exception {
        DefaultCheckpointPlanCalculator calculator =
                new DefaultCheckpointPlanCalculator(
                        new JobID(),
                        new ExecutionGraphCheckpointPlanCalculatorContext(executionGraph),
                        executionGraph.getVerticesTopologically(),
                        true);
        return calculator.calculateCheckpointPlan().get();
    }
}
