/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointPlan;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculatorContext;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.FlinkRuntimeException;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for the {@link CheckpointPlan} produced by {@link DefaultCheckpointPlanCalculator},
 * focused on {@link CheckpointPlan#fulfillFinishedTaskStatus} when some or all tasks of a job
 * vertex have been marked finished.
 */
public class DefaultCheckpointPlanTest {
    /** Provides the temporary files that back the union-list-state subtask states. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Used by the abortion tests to declare the expected exception type and message. */
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();

    /**
     * Marks one of two tasks as finished while the other subtask reports union list state, and
     * expects {@code fulfillFinishedTaskStatus} to fail with a {@link FlinkRuntimeException}.
     */
    @Test
    public void testAbortionIfPartiallyFinishedVertexUsedUnionListState() throws Exception {
        JobVertexID jobVertexId = new JobVertexID();
        OperatorID operatorId = new OperatorID();

        ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(
                                jobVertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(operatorId)),
                                true)
                        .build();
        ExecutionVertex[] tasks = executionGraph.getJobVertex(jobVertexId).getTaskVertices();
        // The task must be finished before the plan is calculated from the graph.
        tasks[0].getCurrentExecutionAttempt().markFinished();

        CheckpointPlan checkpointPlan = this.createCheckpointPlan(executionGraph);

        HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>();
        OperatorState operatorState = new OperatorState(operatorId, 2, 2);
        operatorState.putState(
                0,
                CheckpointCoordinatorTestingUtils.createSubtaskStateWithUnionListState(
                        TEMPORARY_FOLDER.newFile()));
        operatorStates.put(operatorId, operatorState);

        this.expectedException.expect(FlinkRuntimeException.class);
        this.expectedException.expectMessage(
                String.format(
                        "The vertex %s (id = %s) has used UnionListState, but part of its tasks are FINISHED",
                        executionGraph.getJobVertex(jobVertexId).getName(),
                        jobVertexId));
        checkpointPlan.fulfillFinishedTaskStatus(operatorStates);
    }

    /**
     * Reports that one of two tasks has finished operators while both subtasks carry union list
     * state, and expects {@code fulfillFinishedTaskStatus} to fail with a {@link
     * FlinkRuntimeException}.
     */
    @Test
    public void testAbortionIfPartiallyOperatorsFinishedVertexUsedUnionListState()
            throws Exception {
        JobVertexID jobVertexId = new JobVertexID();
        OperatorID operatorId = new OperatorID();

        ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(
                                jobVertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(operatorId)),
                                true)
                        .build();
        ExecutionVertex[] tasks = executionGraph.getJobVertex(jobVertexId).getTaskVertices();

        CheckpointPlan checkpointPlan = this.createCheckpointPlan(executionGraph);

        HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>();
        OperatorState operatorState = new OperatorState(operatorId, 2, 2);
        operatorState.putState(
                0,
                CheckpointCoordinatorTestingUtils.createSubtaskStateWithUnionListState(
                        TEMPORARY_FOLDER.newFile()));
        operatorState.putState(
                1,
                CheckpointCoordinatorTestingUtils.createSubtaskStateWithUnionListState(
                        TEMPORARY_FOLDER.newFile()));
        // Only the second task is reported; no execution is marked finished in this test.
        checkpointPlan.reportTaskHasFinishedOperators(tasks[1]);
        operatorStates.put(operatorId, operatorState);

        this.expectedException.expect(FlinkRuntimeException.class);
        this.expectedException.expectMessage(
                String.format(
                        "The vertex %s (id = %s) has used UnionListState, but part of its tasks has called operators' finish method.",
                        executionGraph.getJobVertex(jobVertexId).getName(),
                        jobVertexId));
        checkpointPlan.fulfillFinishedTaskStatus(operatorStates);
    }

    /**
     * Starts from an empty operator-state map and checks that {@code fulfillFinishedTaskStatus}
     * adds one entry per vertex: fully finished for the vertex whose tasks were all marked
     * finished and for the vertex reported as finished on restore, and not fully finished (with
     * subtask 0 finished) for the vertex where only the first task was marked finished.
     */
    @Test
    public void testFulfillFinishedStates() throws Exception {
        JobVertexID fullyFinishedVertexId = new JobVertexID();
        JobVertexID finishedOnRestoreVertexId = new JobVertexID();
        JobVertexID partiallyFinishedVertexId = new JobVertexID();
        OperatorID fullyFinishedOperatorId = new OperatorID();
        OperatorID finishedOnRestoreOperatorId = new OperatorID();
        OperatorID partiallyFinishedOperatorId = new OperatorID();

        ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(
                                fullyFinishedVertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(fullyFinishedOperatorId)),
                                true)
                        .addJobVertex(
                                finishedOnRestoreVertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(
                                                finishedOnRestoreOperatorId)),
                                true)
                        .addJobVertex(
                                partiallyFinishedVertexId,
                                2,
                                2,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(
                                                partiallyFinishedOperatorId)),
                                true)
                        .build();
        ExecutionVertex[] fullyFinishedVertexTasks =
                executionGraph.getJobVertex(fullyFinishedVertexId).getTaskVertices();
        ExecutionVertex[] finishedOnRestoreVertexTasks =
                executionGraph.getJobVertex(finishedOnRestoreVertexId).getTaskVertices();
        ExecutionVertex[] partiallyFinishedVertexTasks =
                executionGraph.getJobVertex(partiallyFinishedVertexId).getTaskVertices();

        // Executions are marked finished before the plan is calculated from the graph.
        Arrays.stream(fullyFinishedVertexTasks)
                .forEach(task -> task.getCurrentExecutionAttempt().markFinished());
        partiallyFinishedVertexTasks[0].getCurrentExecutionAttempt().markFinished();

        CheckpointPlan checkpointPlan = this.createCheckpointPlan(executionGraph);
        // Finished-on-restore is reported to the plan directly, after it has been calculated.
        Arrays.stream(finishedOnRestoreVertexTasks)
                .forEach(checkpointPlan::reportTaskFinishedOnRestore);

        HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>();
        checkpointPlan.fulfillFinishedTaskStatus(operatorStates);

        Assert.assertEquals(3, operatorStates.size());
        Assert.assertTrue(operatorStates.get(fullyFinishedOperatorId).isFullyFinished());
        Assert.assertTrue(operatorStates.get(finishedOnRestoreOperatorId).isFullyFinished());
        OperatorState operatorState = operatorStates.get(partiallyFinishedOperatorId);
        Assert.assertFalse(operatorState.isFullyFinished());
        Assert.assertTrue(operatorState.getState(0).isFinished());
    }

    /**
     * Pre-populates the map with an operator state that only has coordinator state set, for a
     * vertex whose single task is finished, and checks that after {@code
     * fulfillFinishedTaskStatus} the map still has exactly one entry and that entry is fully
     * finished.
     */
    @Test
    public void testFulfillFullyFinishedStatesWithCoordinator() throws Exception {
        JobVertexID finishedJobVertexID = new JobVertexID();
        OperatorID finishedOperatorID = new OperatorID();

        ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(
                                finishedJobVertexID,
                                1,
                                256,
                                Collections.singletonList(
                                        OperatorIDPair.generatedIDOnly(finishedOperatorID)),
                                true)
                        .build();
        executionGraph
                .getJobVertex(finishedJobVertexID)
                .getTaskVertices()[0]
                .getCurrentExecutionAttempt()
                .markFinished();

        CheckpointPlan checkpointPlan = this.createCheckpointPlan(executionGraph);

        HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>();
        OperatorState operatorState = new OperatorState(finishedOperatorID, 1, 256);
        operatorState.setCoordinatorState(new TestingStreamStateHandle());
        operatorStates.put(finishedOperatorID, operatorState);

        checkpointPlan.fulfillFinishedTaskStatus(operatorStates);

        Assert.assertEquals(1, operatorStates.size());
        Assert.assertTrue(operatorStates.get(finishedOperatorID).isFullyFinished());
    }

    /**
     * Calculates a checkpoint plan for the given execution graph.
     *
     * @param executionGraph graph whose topologically ordered vertices are handed to the
     *     calculator
     * @return the plan obtained from the calculator's future
     * @throws Exception if waiting on the calculator's future fails
     */
    private CheckpointPlan createCheckpointPlan(ExecutionGraph executionGraph) throws Exception {
        DefaultCheckpointPlanCalculator checkpointPlanCalculator =
                new DefaultCheckpointPlanCalculator(
                        new JobID(),
                        new ExecutionGraphCheckpointPlanCalculatorContext(executionGraph),
                        executionGraph.getVerticesTopologically(),
                        true);
        return checkpointPlanCalculator.calculateCheckpointPlan().get();
    }
}

