package org.apache.flink.runtime.checkpoint;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.class */
/**
 * Tests the interaction between {@link DefaultScheduler#closeAsync()} and the {@link
 * CheckpointCoordinator} of the scheduler's {@link ExecutionGraph}.
 *
 * <p>Every test builds a scheduler with checkpointing enabled, hands it a testing checkpoint ID
 * counter and a testing completed-checkpoint store (each wired to its own {@link
 * CompletableFuture}), drives the execution graph into a particular state, closes the scheduler
 * and then asserts that the coordinator reports being shut down and that both futures yield the
 * expected {@link JobStatus}.
 */
public class DefaultSchedulerCheckpointCoordinatorTest extends TestLogger {

    /** Executor shared by all tests in this class; passed to the scheduler builder. */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorResource();

    /**
     * Fails the job via {@link ExecutionGraph#failJob} before closing the scheduler and expects the
     * coordinator to be shut down with {@link JobStatus#FAILED}.
     */
    @Test
    public void testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph()
            throws Exception {
        final CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
        final CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
        final DefaultScheduler scheduler =
                createSchedulerAndEnableCheckpointing(
                        TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(
                                counterShutdownFuture),
                        TestingCompletedCheckpointStore
                                .createStoreWithShutdownCheckAndNoCompletedCheckpoints(
                                        storeShutdownFuture));
        final ExecutionGraph graph = scheduler.getExecutionGraph();
        final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
        assertCoordinatorNotShutDown(checkpointCoordinator);

        graph.failJob(new Exception("Test Exception"), System.currentTimeMillis());
        scheduler.closeAsync().get();

        assertShutDownWithStatus(
                checkpointCoordinator,
                counterShutdownFuture,
                storeShutdownFuture,
                JobStatus.FAILED);
    }

    /**
     * Suspends the execution graph before closing the scheduler and expects the coordinator to be
     * shut down with {@link JobStatus#SUSPENDED}.
     */
    @Test
    public void testClosingSchedulerShutsDownCheckpointCoordinatorOnSuspendedExecutionGraph()
            throws Exception {
        final CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
        final CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
        final DefaultScheduler scheduler =
                createSchedulerAndEnableCheckpointing(
                        TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(
                                counterShutdownFuture),
                        TestingCompletedCheckpointStore
                                .createStoreWithShutdownCheckAndNoCompletedCheckpoints(
                                        storeShutdownFuture));
        final ExecutionGraph graph = scheduler.getExecutionGraph();
        final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
        assertCoordinatorNotShutDown(checkpointCoordinator);

        graph.suspend(new Exception("Test Exception"));
        scheduler.closeAsync().get();

        assertShutDownWithStatus(
                checkpointCoordinator,
                counterShutdownFuture,
                storeShutdownFuture,
                JobStatus.SUSPENDED);
    }

    /**
     * Starts scheduling, reports every execution vertex as {@link ExecutionState#FINISHED}, waits
     * for the graph's termination future to yield {@link JobStatus#FINISHED}, then closes the
     * scheduler and expects the coordinator to be shut down with {@link JobStatus#FINISHED}.
     */
    @Test
    public void testClosingSchedulerShutsDownCheckpointCoordinatorOnFinishedExecutionGraph()
            throws Exception {
        final CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
        final CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
        final DefaultScheduler scheduler =
                createSchedulerAndEnableCheckpointing(
                        TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(
                                counterShutdownFuture),
                        TestingCompletedCheckpointStore
                                .createStoreWithShutdownCheckAndNoCompletedCheckpoints(
                                        storeShutdownFuture));
        final ExecutionGraph graph = scheduler.getExecutionGraph();
        final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
        assertCoordinatorNotShutDown(checkpointCoordinator);

        scheduler.startScheduling();
        // Report the current attempt of every vertex as finished.
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            scheduler.updateTaskExecutionState(
                    new TaskExecutionState(
                            vertex.getCurrentExecutionAttempt().getAttemptId(),
                            ExecutionState.FINISHED));
        }
        Assert.assertThat(graph.getTerminationFuture().get(), Matchers.is(JobStatus.FINISHED));

        scheduler.closeAsync().get();

        assertShutDownWithStatus(
                checkpointCoordinator,
                counterShutdownFuture,
                storeShutdownFuture,
                JobStatus.FINISHED);
    }

    /**
     * Closes the scheduler without touching the execution graph first and expects the graph to end
     * up in {@link JobStatus#SUSPENDED} with the coordinator shut down accordingly.
     */
    @Test
    public void testClosingSchedulerSuspendsExecutionGraphAndShutsDownCheckpointCoordinator()
            throws Exception {
        final CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
        final CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
        final DefaultScheduler scheduler =
                createSchedulerAndEnableCheckpointing(
                        TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(
                                counterShutdownFuture),
                        TestingCompletedCheckpointStore
                                .createStoreWithShutdownCheckAndNoCompletedCheckpoints(
                                        storeShutdownFuture));
        final ExecutionGraph graph = scheduler.getExecutionGraph();
        final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
        assertCoordinatorNotShutDown(checkpointCoordinator);

        scheduler.closeAsync().get();

        Assert.assertThat(graph.getState(), Matchers.is(JobStatus.SUSPENDED));
        assertShutDownWithStatus(
                checkpointCoordinator,
                counterShutdownFuture,
                storeShutdownFuture,
                JobStatus.SUSPENDED);
    }

    /**
     * Precondition shared by all tests: the coordinator exists and does not yet report being shut
     * down.
     *
     * @param checkpointCoordinator coordinator obtained from the execution graph
     */
    private static void assertCoordinatorNotShutDown(CheckpointCoordinator checkpointCoordinator) {
        Assert.assertThat(checkpointCoordinator, Matchers.notNullValue());
        Assert.assertThat(checkpointCoordinator.isShutdown(), Matchers.is(false));
    }

    /**
     * Postcondition shared by all tests: the coordinator reports being shut down and both futures
     * yield the expected job status.
     *
     * @param checkpointCoordinator coordinator obtained from the execution graph
     * @param counterShutdownFuture future that was handed to the testing checkpoint ID counter
     * @param storeShutdownFuture future that was handed to the testing completed-checkpoint store
     * @param expectedStatus job status both futures are expected to yield
     * @throws Exception if waiting on either future fails
     */
    private static void assertShutDownWithStatus(
            CheckpointCoordinator checkpointCoordinator,
            CompletableFuture<JobStatus> counterShutdownFuture,
            CompletableFuture<JobStatus> storeShutdownFuture,
            JobStatus expectedStatus)
            throws Exception {
        Assert.assertThat(checkpointCoordinator.isShutdown(), Matchers.is(true));
        Assert.assertThat(counterShutdownFuture.get(), Matchers.is(expectedStatus));
        Assert.assertThat(storeShutdownFuture.get(), Matchers.is(expectedStatus));
    }

    /**
     * Builds a {@link DefaultScheduler} for a streaming job graph consisting of a single vertex
     * named {@code "MockVertex"}, with checkpointing settings attached (interval and timeout both
     * set to {@code 100L}) and a {@link TestingCheckpointRecoveryFactory} wrapping the given
     * counter and store.
     *
     * @param checkpointIDCounter checkpoint ID counter passed to the recovery factory
     * @param completedCheckpointStore completed-checkpoint store passed to the recovery factory
     * @return the scheduler produced by the builder
     * @throws Exception if the builder fails to create the scheduler
     */
    private DefaultScheduler createSchedulerAndEnableCheckpointing(
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore completedCheckpointStore)
            throws Exception {
        final Time rpcTimeout = Time.days(1L);

        final JobVertex jobVertex = new JobVertex("MockVertex");
        jobVertex.setInvokableClass(AbstractInvokable.class);

        final CheckpointCoordinatorConfiguration checkpointConfiguration =
                CheckpointCoordinatorConfiguration.builder()
                        .setCheckpointInterval(100L)
                        .setCheckpointTimeout(100L)
                        .build();
        // No serialized value is supplied for the second constructor argument.
        final JobCheckpointingSettings checkpointingSettings =
                new JobCheckpointingSettings(checkpointConfiguration, (SerializedValue) null);

        return new DefaultSchedulerBuilder(
                        JobGraphBuilder.newStreamingJobGraphBuilder()
                                .addJobVertex(jobVertex)
                                .setJobCheckpointingSettings(checkpointingSettings)
                                .build(),
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor())
                .setCheckpointRecoveryFactory(
                        new TestingCheckpointRecoveryFactory(
                                completedCheckpointStore, checkpointIDCounter))
                .setRpcTimeout(rpcTimeout)
                .build();
    }
}
