/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingJobStatusHook;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.TestFailoverStrategyFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionSlotAssignment;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryTestingUtils;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
import org.apache.flink.runtime.shuffle.TestingShuffleMaster;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.function.TriFunction;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;

public class DefaultSchedulerTest
extends TestLogger {
    // Timeout value in milliseconds; its uses are not visible in this part of the file.
    private static final int TIMEOUT_MS = 1000;
    // Manually triggered executor: tests call triggerScheduledTasks() on it after a failure.
    // NOTE(review): presumably passed to the scheduler as its restart/delay executor by the
    // createScheduler helpers, which are outside this view - confirm.
    private final ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    // The fields below are (re)assigned in setUp() before every test.
    // Both executors are shut down again in tearDown().
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutorService;
    private Configuration configuration;
    // Tests toggle restartability through setCanRestart(...).
    private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy;
    // Exposes the deployed vertices via getDeployedVertices() and can be told to fail deployments.
    private TestExecutionOperationsDecorator testExecutionOperations;
    private ExecutionVertexVersioner executionVertexVersioner;
    private TestExecutionSlotAllocatorFactory executionSlotAllocatorFactory;
    // Obtained from executionSlotAllocatorFactory in setUp(); lets tests complete or time out
    // pending slot requests by hand.
    private TestExecutionSlotAllocator testExecutionSlotAllocator;
    private TestingShuffleMaster shuffleMaster;
    private TestingJobMasterPartitionTracker partitionTracker;
    // Set to 60 seconds in setUp().
    private Time timeout;

    /** Creates fresh executors and scheduler collaborators before each test. */
    @BeforeEach
    void setUp() {
        // Executors; both are shut down again in tearDown().
        executor = Executors.newSingleThreadExecutor();
        scheduledExecutorService = new DirectScheduledExecutorService();

        configuration = new Configuration();

        // NOTE(review): (true, 0L) presumably means "restart allowed, zero backoff" - confirm
        // against TestRestartBackoffTimeStrategy.
        testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 0L);

        // Wrap the default operations so that tests can observe deployments.
        final ExecutionOperations defaultOperations = new DefaultExecutionOperations();
        testExecutionOperations = new TestExecutionOperationsDecorator(defaultOperations);

        executionVertexVersioner = new ExecutionVertexVersioner();

        // The allocator handed out by the factory is kept so tests can drive slot requests.
        executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
        testExecutionSlotAllocator = executionSlotAllocatorFactory.getTestExecutionSlotAllocator();

        shuffleMaster = new TestingShuffleMaster();
        partitionTracker = new TestingJobMasterPartitionTracker();
        timeout = Time.seconds(60L);
    }

    /**
     * Shuts down the executors created in {@code setUp()}: the scheduled executor first, then the
     * plain one, each with a timeout of 1000 milliseconds. Executors that are still {@code null}
     * are skipped.
     */
    @AfterEach
    void tearDown() {
        if (scheduledExecutorService != null) {
            final ExecutorService[] scheduledOnly = new ExecutorService[] {scheduledExecutorService};
            ExecutorUtils.gracefulShutdown(1000L, TimeUnit.MILLISECONDS, scheduledOnly);
        }
        if (executor != null) {
            final ExecutorService[] plainOnly = new ExecutorService[] {executor};
            ExecutorUtils.gracefulShutdown(1000L, TimeUnit.MILLISECONDS, plainOnly);
        }
    }

    /** Starting the scheduler deploys subtask 0 of the job's only vertex. */
    @Test
    void startScheduling() {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        final JobVertex soleVertex = getOnlyJobVertex(graph);

        createSchedulerAndStartScheduling(graph);

        final List<ExecutionVertexID> deployed = testExecutionOperations.getDeployedVertices();
        final ExecutionVertexID expectedId = new ExecutionVertexID(soleVertex.getID(), 0);
        Assertions.assertThat(deployed).contains(expectedId);
    }

    /**
     * After scheduling has started, the archived graph must report positive timestamps for
     * INITIALIZING, CREATED and RUNNING, and INITIALIZING must not be later than CREATED.
     */
    @Test
    void testCorrectSettingOfInitializationTimestamp() {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(graph);

        final ArchivedExecutionGraph archived = scheduler.requestJob().getArchivedExecutionGraph();

        // Every status the job has passed through carries a timestamp.
        Assertions.assertThat(archived.getStatusTimestamp(JobStatus.INITIALIZING)).isGreaterThan(0L);
        Assertions.assertThat(archived.getStatusTimestamp(JobStatus.CREATED)).isGreaterThan(0L);
        Assertions.assertThat(archived.getStatusTimestamp(JobStatus.RUNNING)).isGreaterThan(0L);

        // Initialization happens no later than creation.
        Assertions.assertThat(archived.getStatusTimestamp(JobStatus.INITIALIZING))
                .isLessThanOrEqualTo(archived.getStatusTimestamp(JobStatus.CREATED));
    }

    /**
     * With four subtasks scheduled together, nothing is deployed while any slot request is still
     * pending; once all requests are completed, all four subtasks are deployed.
     */
    @Test
    void deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception {
        final JobGraph graph = singleJobVertexJobGraph(4);
        final JobVertexID vertexId = getOnlyJobVertex(graph).getID();

        // Slot requests are completed by hand below.
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        final TestSchedulingStrategy.Factory strategyFactory = new TestSchedulingStrategy.Factory();
        final DefaultScheduler scheduler =
                createScheduler(
                        graph,
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        strategyFactory);
        final TestSchedulingStrategy strategy = strategyFactory.getLastCreatedSchedulingStrategy();
        scheduler.startScheduling();

        final List<ExecutionVertexID> toSchedule =
                Arrays.asList(
                        new ExecutionVertexID(vertexId, 0),
                        new ExecutionVertexID(vertexId, 1),
                        new ExecutionVertexID(vertexId, 2),
                        new ExecutionVertexID(vertexId, 3));
        strategy.schedule(toSchedule);

        // No slot yet: nothing deployed.
        Assertions.assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();

        // A single fulfilled request is still not enough.
        testExecutionSlotAllocator.completePendingRequest(toSchedule.get(0));
        Assertions.assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();

        // All requests fulfilled: everything is deployed.
        testExecutionSlotAllocator.completePendingRequests();
        Assertions.assertThat(testExecutionOperations.getDeployedVertices()).hasSize(4);
    }

    /**
     * Verifies that vertices are deployed in exactly the order in which the scheduling strategy
     * asked for them to be scheduled.
     *
     * <p>The deployed list is the value under test and the desired order is the expectation, so a
     * failure report labels the two lists correctly.
     */
    @Test
    void scheduledVertexOrderFromSchedulingStrategyIsRespected() throws Exception {
        final JobGraph jobGraph = singleJobVertexJobGraph(10);
        final JobVertexID onlyJobVertexId = getOnlyJobVertex(jobGraph).getID();

        // Deliberately not in subtask-index order.
        final List<ExecutionVertexID> desiredScheduleOrder =
                Arrays.asList(
                        new ExecutionVertexID(onlyJobVertexId, 4),
                        new ExecutionVertexID(onlyJobVertexId, 0),
                        new ExecutionVertexID(onlyJobVertexId, 3),
                        new ExecutionVertexID(onlyJobVertexId, 1),
                        new ExecutionVertexID(onlyJobVertexId, 2));

        final TestSchedulingStrategy.Factory schedulingStrategyFactory =
                new TestSchedulingStrategy.Factory();
        createScheduler(
                jobGraph,
                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                schedulingStrategyFactory);
        final TestSchedulingStrategy schedulingStrategy =
                schedulingStrategyFactory.getLastCreatedSchedulingStrategy();

        schedulingStrategy.schedule(desiredScheduleOrder);

        final List<ExecutionVertexID> deployedExecutionVertices =
                testExecutionOperations.getDeployedVertices();
        Assertions.assertThat(deployedExecutionVertices).isEqualTo(desiredScheduleOrder);
    }

    /**
     * A failing deployment must lead to a restart, so the only vertex shows up twice in the list
     * of deployment attempts.
     *
     * <p>{@code containsExactly} is used because AssertJ's {@code contains(x, x)} is already
     * satisfied by a single occurrence and would not prove that a second deployment happened.
     */
    @Test
    void restartAfterDeploymentFails() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);

        // The first deployment attempt fails ...
        testExecutionOperations.enableFailDeploy();
        createSchedulerAndStartScheduling(jobGraph);

        // ... and the attempt made by the restart is allowed to succeed.
        testExecutionOperations.disableFailDeploy();
        taskRestartExecutor.triggerScheduledTasks();

        final List<ExecutionVertexID> deployedExecutionVertices =
                testExecutionOperations.getDeployedVertices();
        final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        Assertions.assertThat(deployedExecutionVertices)
                .containsExactly(executionVertexId, executionVertexId);
    }

    /**
     * A task reported as failed is restarted, so the only vertex is deployed a second time.
     *
     * <p>{@code containsExactly} is used because AssertJ's {@code contains(x, x)} is already
     * satisfied by a single occurrence and would not prove that the redeployment happened.
     */
    @Test
    void restartFailedTask() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Report the current attempt of the only vertex as failed.
        final ArchivedExecutionVertex archivedExecutionVertex =
                Iterables.getOnlyElement(
                        scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        final ExecutionAttemptID attemptId =
                archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(attemptId));

        // Run the pending restart.
        taskRestartExecutor.triggerScheduledTasks();

        final List<ExecutionVertexID> deployedExecutionVertices =
                testExecutionOperations.getDeployedVertices();
        final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        Assertions.assertThat(deployedExecutionVertices)
                .containsExactly(executionVertexId, executionVertexId);
    }

    /** A state update for a newly created, unrelated attempt id is answered with {@code false}. */
    @Test
    void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() {
        final DefaultScheduler scheduler =
                createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph());

        // An attempt id that was created here, not taken from the scheduler's job.
        final ExecutionAttemptID unknownAttemptId = ExecutionGraphTestUtils.createExecutionAttemptId();
        final TaskExecutionState failedState =
                SchedulerTestingUtils.createFailedTaskExecutionState(unknownAttemptId);

        final boolean accepted = scheduler.updateTaskExecutionState(failedState);
        Assertions.assertThat(accepted).isFalse();
    }

    /** When the restart strategy forbids restarts, a task failure ends the job as FAILED. */
    @Test
    void failJobIfCannotRestart() throws Exception {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        testRestartBackoffTimeStrategy.setCanRestart(false);

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(graph);

        // Fail the current attempt of the only vertex.
        final ArchivedExecutionVertex soleVertex =
                Iterables.getOnlyElement(
                        scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        final ExecutionAttemptID failedAttempt =
                soleVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(failedAttempt));

        taskRestartExecutor.triggerScheduledTasks();
        waitForTermination(scheduler);

        Assertions.assertThat(scheduler.requestJobStatus()).isEqualTo(JobStatus.FAILED);
    }

    /**
     * When the slot request times out and restarting is not allowed, the job must end as FAILED
     * and the recorded failure cause must contain a {@link NoResourceAvailableException} with the
     * slot-request-timeout message.
     */
    @Test
    void failJobIfNotEnoughResources() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        testRestartBackoffTimeStrategy.setCanRestart(false);
        // Keep the slot request pending so that it can be timed out below.
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        testExecutionSlotAllocator.timeoutPendingRequests();
        waitForTermination(scheduler);

        final JobStatus jobStatus = scheduler.requestJobStatus();
        Assertions.assertThat(jobStatus).isEqualTo(JobStatus.FAILED);

        final Throwable failureCause =
                scheduler
                        .requestJob()
                        .getArchivedExecutionGraph()
                        .getFailureInfo()
                        .getException()
                        .deserializeError(DefaultSchedulerTest.class.getClassLoader());
        Assertions.assertThat(
                        ExceptionUtils.findThrowable(
                                failureCause, NoResourceAvailableException.class))
                .isPresent();
        Assertions.assertThat(
                        ExceptionUtils.findThrowableWithMessage(
                                failureCause,
                                "Could not allocate the required slot within slot request timeout."))
                .isPresent();
    }

    /** Runs the shared restart scenario with a timed-out slot request as the failure trigger. */
    @Test
    void restartVerticesOnSlotAllocationTimeout() throws Exception {
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        testRestartVerticesOnFailuresInScheduling(
                failingVertexId -> {
                    testExecutionSlotAllocator.timeoutPendingRequest(failingVertexId);
                });
    }

    /**
     * Runs the shared restart scenario with the failure triggered by releasing a slot right after
     * its request was completed.
     */
    @Test
    void restartVerticesOnAssignedSlotReleased() throws Exception {
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        testRestartVerticesOnFailuresInScheduling(
                failingVertexId -> {
                    final LogicalSlot assignedSlot =
                            testExecutionSlotAllocator.completePendingRequest(failingVertexId);
                    final Throwable releaseCause = new Exception("Release slot for test");
                    assignedSlot.releaseSlot(releaseCause);
                });
    }

    /**
     * Shared scenario: two vertices of parallelism 2, connected POINTWISE/PIPELINED, are scheduled
     * with all slot requests pending. The given action then makes the first subtask of the first
     * vertex fail while scheduling is still in progress.
     *
     * <p>Expected outcome: the failed subtask is FAILED, its consumer is CANCELED, the other two
     * subtasks stay SCHEDULED with their slot requests still pending, and after triggering the
     * restart the scheduling strategy is asked to restart the failed subtask and its consumer.
     *
     * @param actionsToTriggerTaskFailure action that makes the given execution vertex fail
     */
    private void testRestartVerticesOnFailuresInScheduling(Consumer<ExecutionVertexID> actionsToTriggerTaskFailure) throws Exception {
        final int parallelism = 2;
        final JobVertex v1 = createVertex("vertex1", parallelism);
        final JobVertex v2 = createVertex("vertex2", parallelism);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(v1, v2);

        // Slot requests must stay pending so that the failure hits during scheduling.
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        final TestSchedulingStrategy.Factory schedulingStrategyFactory =
                new TestSchedulingStrategy.Factory();
        final FailoverStrategy.Factory failoverStrategyFactory =
                new RestartPipelinedRegionFailoverStrategy.Factory();
        final DefaultScheduler scheduler =
                createScheduler(
                        jobGraph,
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        schedulingStrategyFactory,
                        failoverStrategyFactory);
        final TestSchedulingStrategy schedulingStrategy =
                schedulingStrategyFactory.getLastCreatedSchedulingStrategy();

        scheduler.startScheduling();

        final ExecutionVertexID vid11 = new ExecutionVertexID(v1.getID(), 0);
        final ExecutionVertexID vid12 = new ExecutionVertexID(v1.getID(), 1);
        final ExecutionVertexID vid21 = new ExecutionVertexID(v2.getID(), 0);
        final ExecutionVertexID vid22 = new ExecutionVertexID(v2.getID(), 1);
        schedulingStrategy.schedule(Arrays.asList(vid11, vid12, vid21, vid22));

        Assertions.assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(4);

        actionsToTriggerTaskFailure.accept(vid11);

        // The snapshot is read in the order v1[0], v1[1], v2[0], v2[1].
        final Iterator<ArchivedExecutionVertex> vertexIterator =
                scheduler
                        .requestJob()
                        .getArchivedExecutionGraph()
                        .getAllExecutionVertices()
                        .iterator();
        final ArchivedExecutionVertex ev11 = vertexIterator.next();
        final ArchivedExecutionVertex ev12 = vertexIterator.next();
        final ArchivedExecutionVertex ev21 = vertexIterator.next();
        final ArchivedExecutionVertex ev22 = vertexIterator.next();

        // Only the requests of the two unaffected subtasks are still pending.
        Assertions.assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2);
        Assertions.assertThat(ev11.getExecutionState()).isEqualTo(ExecutionState.FAILED);
        Assertions.assertThat(ev21.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(ev12.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED);
        Assertions.assertThat(ev22.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED);

        taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat(schedulingStrategy.getReceivedVerticesToRestart())
                .contains(vid11, vid21);
    }

    /**
     * The source gets its slot and is then reported as failed while the sink's slot request is
     * still pending. After the restart has run with auto-completing slot requests, the source and
     * the sink must each have been deployed exactly once and the job must still be RUNNING.
     *
     * <p>{@code containsExactlyInAnyOrder} is used so that an additional deployment of an outdated
     * vertex would fail the test; plain {@code contains} would let it pass unnoticed.
     */
    @Test
    void skipDeploymentIfVertexVersionOutdated() {
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
        final List<JobVertex> sortedJobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        final ExecutionVertexID sourceExecutionVertexId =
                new ExecutionVertexID(sortedJobVertices.get(0).getID(), 0);
        final ExecutionVertexID sinkExecutionVertexId =
                new ExecutionVertexID(sortedJobVertices.get(1).getID(), 0);

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Only the source gets its slot; the sink's request stays pending.
        testExecutionSlotAllocator.completePendingRequest(sourceExecutionVertexId);

        // Fail the source's current attempt.
        final ArchivedExecutionVertex sourceExecutionVertex =
                scheduler
                        .requestJob()
                        .getArchivedExecutionGraph()
                        .getAllExecutionVertices()
                        .iterator()
                        .next();
        final ExecutionAttemptID attemptId =
                sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(attemptId));
        // Switched off only after the failure above has been reported.
        testRestartBackoffTimeStrategy.setCanRestart(false);

        testExecutionSlotAllocator.enableAutoCompletePendingRequests();
        taskRestartExecutor.triggerScheduledTasks();

        Assertions.assertThat(testExecutionOperations.getDeployedVertices())
                .containsExactlyInAnyOrder(sourceExecutionVertexId, sinkExecutionVertexId);
        Assertions.assertThat(scheduler.requestJob().getArchivedExecutionGraph().getState())
                .isEqualTo(JobStatus.RUNNING);
    }

    /**
     * If a modification of the vertex is recorded while its slot request is still pending, then
     * completing the request afterwards results in exactly one returned slot.
     */
    @Test
    void releaseSlotIfVertexVersionOutdated() {
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        final JobVertexID soleVertexId = getOnlyJobVertex(graph).getID();
        final ExecutionVertexID soleExecutionVertexId = new ExecutionVertexID(soleVertexId, 0);

        createSchedulerAndStartScheduling(graph);

        // Record a modification before the slot request is fulfilled.
        executionVertexVersioner.recordModification(soleExecutionVertexId);
        testExecutionSlotAllocator.completePendingRequests();

        Assertions.assertThat(testExecutionSlotAllocator.getReturnedSlots()).hasSize(1);
    }

    /**
     * After a failure and the triggered restart, the scheduling strategy has received one vertex
     * to restart and that vertex is in state CREATED.
     */
    @Test
    void vertexIsResetBeforeRestarted() throws Exception {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();

        final TestSchedulingStrategy.Factory strategyFactory = new TestSchedulingStrategy.Factory();
        final DefaultScheduler scheduler =
                createScheduler(
                        graph,
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        strategyFactory);
        final TestSchedulingStrategy strategy = strategyFactory.getLastCreatedSchedulingStrategy();
        final SchedulingTopology topology = strategy.getSchedulingTopology();

        scheduler.startScheduling();

        final SchedulingExecutionVertex soleSchedulingVertex =
                Iterables.getOnlyElement(topology.getVertices());
        strategy.schedule(Collections.singletonList(soleSchedulingVertex.getId()));

        // Fail the current attempt, then run the pending restart.
        final ArchivedExecutionVertex soleArchivedVertex =
                Iterables.getOnlyElement(
                        scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        final ExecutionAttemptID failedAttempt =
                soleArchivedVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(failedAttempt));
        taskRestartExecutor.triggerScheduledTasks();

        Assertions.assertThat(strategy.getReceivedVerticesToRestart()).hasSize(1);
        Assertions.assertThat(soleSchedulingVertex.getState()).isEqualTo(ExecutionState.CREATED);
    }

    /** Scheduling the same vertex a second time must fail with an IllegalStateException. */
    @Test
    void scheduleOnlyIfVertexIsCreated() throws Exception {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();

        final TestSchedulingStrategy.Factory strategyFactory = new TestSchedulingStrategy.Factory();
        final DefaultScheduler scheduler =
                createScheduler(
                        graph,
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        strategyFactory);
        final TestSchedulingStrategy strategy = strategyFactory.getLastCreatedSchedulingStrategy();
        final SchedulingTopology topology = strategy.getSchedulingTopology();

        scheduler.startScheduling();

        final ExecutionVertexID soleVertexId =
                Iterables.getOnlyElement(topology.getVertices()).getId();

        // First scheduling request goes through ...
        strategy.schedule(Collections.singletonList(soleVertexId));

        // ... the second one for the same vertex must be rejected.
        Assertions.assertThatThrownBy(
                        () -> strategy.schedule(Collections.singletonList(soleVertexId)),
                        "IllegalStateException should happen")
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * A global failure, followed by the task reporting CANCELED and the restart being triggered,
     * must lead to a second deployment of the only vertex.
     *
     * <p>{@code containsExactly} is used because AssertJ's {@code contains(x, x)} is already
     * satisfied by a single occurrence and would not prove that the redeployment happened.
     */
    @Test
    void handleGlobalFailure() {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        scheduler.handleGlobalFailure(new Exception("forced failure"));

        // Report the current attempt as CANCELED.
        final ArchivedExecutionVertex onlyExecutionVertex =
                Iterables.getOnlyElement(
                        scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        final ExecutionAttemptID attemptId =
                onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.CANCELED));

        taskRestartExecutor.triggerScheduledTasks();

        final List<ExecutionVertexID> deployedExecutionVertices =
                testExecutionOperations.getDeployedVertices();
        final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        Assertions.assertThat(deployedExecutionVertices)
                .containsExactly(executionVertexId, executionVertexId);
    }

    /**
     * A global failure is followed by a local failure of the first subtask before the tasks
     * report CANCELED. After the restart both subtasks must have been deployed exactly twice, in
     * the order: initial scheduling of both, then the global fail-over of both.
     *
     * <p>{@code containsExactly} enforces the order that the failure message refers to; plain
     * {@code contains} checks neither order nor the number of occurrences.
     */
    @Test
    void handleGlobalFailureWithLocalFailure() {
        final JobGraph jobGraph = singleJobVertexJobGraph(2);
        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
        SchedulerTestingUtils.enableCheckpointing(jobGraph);

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

        // Current attempt ids of all subtasks, in the archived graph's iteration order.
        final List<ExecutionAttemptID> attemptIds =
                StreamSupport.stream(
                                scheduler
                                        .requestJob()
                                        .getArchivedExecutionGraph()
                                        .getAllExecutionVertices()
                                        .spliterator(),
                                false)
                        .map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
                        .map(ArchivedExecution::getAttemptId)
                        .collect(Collectors.toList());
        final ExecutionAttemptID localFailureAttemptId = attemptIds.get(0);

        scheduler.handleGlobalFailure(new Exception("global failure"));

        // The local failure arrives after the global one has already been reported.
        final Throwable localFailure = new Exception("local failure");
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(localFailureAttemptId, ExecutionState.FAILED, localFailure));

        for (ExecutionAttemptID attemptId : attemptIds) {
            scheduler.updateTaskExecutionState(
                    new TaskExecutionState(attemptId, ExecutionState.CANCELED));
        }

        taskRestartExecutor.triggerScheduledTasks();

        final ExecutionVertexID executionVertexId0 = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        final ExecutionVertexID executionVertexId1 = new ExecutionVertexID(onlyJobVertex.getID(), 1);
        Assertions.assertThat(testExecutionOperations.getDeployedVertices())
                .withFailMessage(
                        "The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.")
                .containsExactly(
                        executionVertexId0,
                        executionVertexId1,
                        executionVertexId0,
                        executionVertexId1);
    }

    /** Starting the checkpoint scheduler after the job finished leaves the coordinator absent. */
    @Test
    void testStartingCheckpointSchedulerAfterExecutionGraphFinished() {
        assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(
                finishedScheduler -> finishedScheduler.startCheckpointScheduler());
    }

    /** Stopping the checkpoint scheduler after the job finished leaves the coordinator absent. */
    @Test
    void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() {
        assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(
                finishedScheduler -> finishedScheduler.stopCheckpointScheduler());
    }

    /**
     * Runs a checkpointing-enabled single-task job to FINISHED and checks that the checkpoint
     * coordinator, present before, is {@code null} afterwards and remains {@code null} once the
     * given operation has been invoked on the scheduler.
     *
     * @param callSchedulingOperation operation to invoke on the scheduler after the job finished
     */
    private void assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(Consumer<DefaultScheduler> callSchedulingOperation) {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(graph);

        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(graph);
        Assertions.assertThat(scheduler.getCheckpointCoordinator()).isNotNull();

        // Report the only task as FINISHED.
        final ExecutionVertex soleVertex =
                Iterables.getOnlyElement(scheduler.getExecutionGraph().getAllExecutionVertices());
        final ExecutionAttemptID finishedAttempt =
                soleVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(finishedAttempt, ExecutionState.FINISHED));
        Assertions.assertThat(scheduler.getCheckpointCoordinator()).isNull();

        // The operation under test must not change that.
        callSchedulingOperation.accept(scheduler);
        Assertions.assertThat(scheduler.getCheckpointCoordinator()).isNull();
    }

    /**
     * With slot requests left pending, fails the first vertex, triggers the scheduled restart, then
     * reports a failure for the second vertex's original attempt and checks that the first
     * scheduling vertex is still {@code SCHEDULED}.
     */
    @Test
    void vertexIsNotAffectedByOutdatedDeployment() {
        final JobGraph graph = singleJobVertexJobGraph(2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        final DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(graph);

        final Iterator<ArchivedExecutionVertex> archivedVertices =
                scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        final ArchivedExecutionVertex firstVertex = archivedVertices.next();
        final ArchivedExecutionVertex secondVertex = archivedVertices.next();
        final SchedulingExecutionVertex firstSchedulingVertex =
                scheduler.getSchedulingTopology().getVertices().iterator().next();

        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(
                        firstVertex.getCurrentExecutionAttempt().getAttemptId()));
        this.taskRestartExecutor.triggerScheduledTasks();

        // The attempt id comes from the snapshot taken before the restart was triggered.
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(
                        secondVertex.getCurrentExecutionAttempt().getAttemptId()));

        Assertions.assertThat(firstSchedulingVertex.getState()).isEqualTo(ExecutionState.SCHEDULED);
    }

    /**
     * Triggers a checkpoint on a running single-task job, verifies that exactly one checkpoint is
     * pending, then fails the task, triggers the scheduled restart and verifies that no checkpoint
     * is pending any more.
     */
    @Test
    void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(graph);
        final CountDownLatch triggeredLatch = this.getCheckpointTriggeredLatch();
        final DefaultScheduler scheduler =
                this.createSchedulerAndStartScheduling(
                        graph,
                        ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                                new DirectScheduledExecutorService()));

        final ArchivedExecutionVertex onlyVertex =
                Iterables.getOnlyElement(
                        scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        final ExecutionAttemptID runningAttemptId =
                onlyVertex.getCurrentExecutionAttempt().getAttemptId();
        this.transitionToRunning(scheduler, runningAttemptId);

        final CheckpointCoordinator coordinator =
                SchedulerTestingUtils.getCheckpointCoordinator(scheduler);
        coordinator.triggerCheckpoint(false);
        // Block until the latch obtained above has been counted down.
        triggeredLatch.await();
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isOne();

        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(runningAttemptId));
        this.taskRestartExecutor.triggerScheduledTasks();

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
    }

    /**
     * Acknowledges a triggered checkpoint on a running single-task job, fails the task, triggers
     * the scheduled restart and verifies that the registered master hook was restored exactly once.
     */
    @Test
    void restoreStateWhenRestartingTasks() throws Exception {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(graph);
        final CountDownLatch triggeredLatch = this.getCheckpointTriggeredLatch();
        final DefaultScheduler scheduler =
                this.createSchedulerAndStartScheduling(
                        graph,
                        ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                                new DirectScheduledExecutorService()));

        final ArchivedExecutionVertex onlyVertex =
                Iterables.getOnlyElement(
                        scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        final ExecutionAttemptID runningAttemptId =
                onlyVertex.getCurrentExecutionAttempt().getAttemptId();
        this.transitionToRunning(scheduler, runningAttemptId);

        final CheckpointCoordinator coordinator =
                SchedulerTestingUtils.getCheckpointCoordinator(scheduler);
        final TestMasterHook restoreCountingHook = TestMasterHook.fromId("testHook");
        coordinator.addMasterHook(restoreCountingHook);

        coordinator.triggerCheckpoint(false);
        triggeredLatch.await();
        // Take the id of the first pending checkpoint and acknowledge it.
        final long pendingCheckpointId =
                coordinator.getPendingCheckpoints().keySet().iterator().next();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(scheduler, pendingCheckpointId);

        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(runningAttemptId));
        this.taskRestartExecutor.triggerScheduledTasks();

        Assertions.assertThat(restoreCountingHook.getRestoreCount()).isOne();
    }

    /**
     * Uses a completed checkpoint store whose "add checkpoint" function always throws, triggers a
     * checkpoint, acknowledges it from {@code executor} and verifies that the acknowledgement fails
     * and that the future returned by {@code triggerCheckpoint} completes exceptionally.
     *
     * <p>The outcome of the acknowledgement is reported back through a future in both the failing
     * and the non-failing case. If the acknowledgement unexpectedly succeeds, the test fails with
     * an assertion error instead of waiting forever for a signal that is only sent on failure.
     */
    @Test
    void testTriggerCheckpointAndCompletedAfterStore() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        CountDownLatch checkpointTriggeredLatch = this.getCheckpointTriggeredLatch();
        // Store that rejects every completed checkpoint handed to it.
        TestingCompletedCheckpointStore store = TestingCompletedCheckpointStore.builder()
                .withGetAllCheckpointsSupplier(Collections::emptyList)
                .withAddCheckpointAndSubsumeOldestOneFunction((ignoredCompletedCheckpoint, ignoredCheckpointsCleaner, ignoredPostCleanup) -> {
                    throw new RuntimeException("Throw exception when add checkpoint to store.");
                })
                .withGetSharedStateRegistrySupplier(SharedStateRegistryImpl::new)
                .build();
        ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(new DirectScheduledExecutorService());
        DefaultScheduler scheduler = this.createSchedulerBuilder(jobGraph, mainThreadExecutor)
                .setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(store, new StandaloneCheckpointIDCounter()))
                .build();
        mainThreadExecutor.execute(() -> scheduler.startScheduling());
        ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        this.transitionToRunning(scheduler, attemptId);
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator(scheduler);
        CompletableFuture<CompletedCheckpoint> checkpointCompletableFuture = checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();

        // Completed with the caught exception, or with null when the acknowledgement did not throw.
        CompletableFuture<Exception> acknowledgeOutcome = new CompletableFuture<>();
        this.executor.execute(() -> {
            try {
                AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jobGraph.getJobID(), attemptId, checkpointId);
                checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
                acknowledgeOutcome.complete(null);
            } catch (Exception e) {
                acknowledgeOutcome.complete(e);
            }
        });
        Exception acknowledgeFailure = acknowledgeOutcome.get();

        Assertions.assertThat(acknowledgeFailure)
                .withFailMessage("Acknowledging the checkpoint was expected to fail because the store rejects it.")
                .isNotNull();
        Assertions.assertThat(checkpointCompletableFuture).isCompletedExceptionally();
    }

    /**
     * Registers a master hook that fails on restore, acknowledges a checkpoint and fails the only
     * task. While the hook fails, the triggered restart is expected not to deploy the vertex again;
     * once the hook stops failing, the next triggered restart is expected to deploy it a second
     * time.
     *
     * <p>The deployments are checked with {@code containsExactly}. AssertJ's {@code contains}
     * ignores duplicates, so {@code contains(id, id)} would also pass if the vertex had been
     * deployed only once.
     */
    @Test
    void failGlobalWhenRestoringStateFails() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        CountDownLatch checkpointTriggeredLatch = this.getCheckpointTriggeredLatch();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph, ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(new DirectScheduledExecutorService()));
        ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        this.transitionToRunning(scheduler, attemptId);
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator(scheduler);

        TestMasterHook masterHook = TestMasterHook.fromId("testHook");
        masterHook.enableFailOnRestore();
        checkpointCoordinator.addMasterHook(masterHook);

        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(scheduler, checkpointId);

        scheduler.updateTaskExecutionState(SchedulerTestingUtils.createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();

        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        // Only the initial deployment: the restart with the failing hook must not redeploy.
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionOperations.getDeployedVertices();
        Assertions.assertThat(deployedExecutionVertices).containsExactly(executionVertexId);

        masterHook.disableFailOnRestore();
        this.taskRestartExecutor.triggerScheduledTasks();

        // Initial deployment plus exactly one redeployment after the hook stopped failing.
        deployedExecutionVertices = this.testExecutionOperations.getDeployedVertices();
        Assertions.assertThat(deployedExecutionVertices).containsExactly(executionVertexId, executionVertexId);
    }

    /**
     * Records the version of the only execution vertex, calls {@code failJob} and verifies that the
     * versioner reports the recorded version as modified.
     */
    @Test
    void failJobWillIncrementVertexVersions() {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        final JobVertex onlyVertex = getOnlyJobVertex(graph);
        final ExecutionVertexID vertexId = new ExecutionVertexID(onlyVertex.getID(), 0);
        final DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(graph);
        final ExecutionVertexVersion versionBeforeFailure =
                this.executionVertexVersioner.getExecutionVertexVersion(vertexId);

        scheduler.failJob(new FlinkException("Test failure."), System.currentTimeMillis());

        final boolean modified = this.executionVertexVersioner.isModified(versionBeforeFailure);
        Assertions.assertThat(modified).isTrue();
    }

    /**
     * Records the version of the only execution vertex, cancels the job and verifies that the
     * versioner reports the recorded version as modified.
     */
    @Test
    void cancelJobWillIncrementVertexVersions() {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        final JobVertex onlyVertex = getOnlyJobVertex(graph);
        final ExecutionVertexID vertexId = new ExecutionVertexID(onlyVertex.getID(), 0);
        final DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(graph);
        final ExecutionVertexVersion versionBeforeCancel =
                this.executionVertexVersioner.getExecutionVertexVersion(vertexId);

        scheduler.cancel();

        final boolean modified = this.executionVertexVersioner.isModified(versionBeforeCancel);
        Assertions.assertThat(modified).isTrue();
    }

    /**
     * Records the version of the only execution vertex, closes the scheduler and verifies that the
     * versioner reports the recorded version as modified.
     */
    @Test
    void suspendJobWillIncrementVertexVersions() throws Exception {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        final JobVertex onlyVertex = getOnlyJobVertex(graph);
        final ExecutionVertexID vertexId = new ExecutionVertexID(onlyVertex.getID(), 0);
        final DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(graph);
        final ExecutionVertexVersion versionBeforeClose =
                this.executionVertexVersioner.getExecutionVertexVersion(vertexId);

        scheduler.close();

        final boolean modified = this.executionVertexVersioner.isModified(versionBeforeClose);
        Assertions.assertThat(modified).isTrue();
    }

    /**
     * Fails both subtasks of a two-subtask job and samples the job status at three points: after
     * the first failure, after one restart task has been triggered, and after a second restart
     * task has been triggered. The expected statuses are {@code RESTARTING}, {@code RESTARTING}
     * and {@code RUNNING}.
     */
    @Test
    void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
        final JobGraph graph = singleJobVertexJobGraph(2);
        final DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(graph);

        final Iterator<ArchivedExecutionVertex> archivedVertices =
                scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        final ExecutionAttemptID firstAttemptId =
                archivedVertices.next().getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID secondAttemptId =
                archivedVertices.next().getCurrentExecutionAttempt().getAttemptId();

        scheduler.updateTaskExecutionState(
                new TaskExecutionState(firstAttemptId, ExecutionState.FAILED, new RuntimeException("expected")));
        final JobStatus statusAfterFirstFailure = scheduler.requestJobStatus();

        scheduler.updateTaskExecutionState(
                new TaskExecutionState(secondAttemptId, ExecutionState.FAILED, new RuntimeException("expected")));
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        final JobStatus statusWithPendingRestart = scheduler.requestJobStatus();

        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        final JobStatus statusAfterAllRestarts = scheduler.requestJobStatus();

        // All samples are taken first and asserted afterwards, as in the sequence above.
        Assertions.assertThat(statusAfterFirstFailure).isEqualTo(JobStatus.RESTARTING);
        Assertions.assertThat(statusWithPendingRestart).isEqualTo(JobStatus.RESTARTING);
        Assertions.assertThat(statusAfterAllRestarts).isEqualTo(JobStatus.RUNNING);
    }

    /**
     * Fails the first of two subtasks, cancels the job and verifies that the second subtask is
     * {@code CANCELING} and the job {@code CANCELLING} until the second subtask is reported as
     * {@code CANCELED}, after which the job status is expected to be {@code CANCELED}.
     */
    @Test
    void cancelWhileRestartingShouldWaitForRunningTasks() {
        final JobGraph graph = singleJobVertexJobGraph(2);
        final DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(graph);
        final SchedulingTopology schedulingTopology = scheduler.getSchedulingTopology();

        final Iterator<ArchivedExecutionVertex> archivedVertices =
                scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        final ExecutionAttemptID failingAttemptId =
                archivedVertices.next().getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID survivingAttemptId =
                archivedVertices.next().getCurrentExecutionAttempt().getAttemptId();
        final ExecutionVertexID survivingVertexId = survivingAttemptId.getExecutionVertexId();

        scheduler.updateTaskExecutionState(
                new TaskExecutionState(failingAttemptId, ExecutionState.FAILED, new RuntimeException("expected")));
        scheduler.cancel();

        // Sample the states before the second subtask acknowledges the cancellation.
        final ExecutionState survivingStateAfterCancel =
                schedulingTopology.getVertex(survivingVertexId).getState();
        final JobStatus jobStatusAfterCancel = scheduler.requestJobStatus();

        scheduler.updateTaskExecutionState(
                new TaskExecutionState(survivingAttemptId, ExecutionState.CANCELED, new RuntimeException("expected")));

        Assertions.assertThat(survivingStateAfterCancel).isEqualTo(ExecutionState.CANCELING);
        Assertions.assertThat(jobStatusAfterCancel).isEqualTo(JobStatus.CANCELLING);
        Assertions.assertThat(scheduler.requestJobStatus()).isEqualTo(JobStatus.CANCELED);
    }

    /**
     * Fails the only task with a known message and verifies that the archived execution graph
     * exposes failure info whose exception string contains that message.
     */
    @Test
    void failureInfoIsSetAfterTaskFailure() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();

        // Single source for the message so the thrown exception and the assertion cannot drift apart.
        String exceptionMessage = "expected exception";
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.FAILED, new RuntimeException(exceptionMessage)));

        ErrorInfo failureInfo = scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo();
        Assertions.assertThat(failureInfo).isNotNull();
        Assertions.assertThat(failureInfo.getExceptionAsString()).contains(exceptionMessage);
    }

    /**
     * With slot requests left pending and a restart-all failover strategy, fails the first of two
     * vertices and verifies that the first vertex is {@code FAILED}, the second is {@code CANCELED}
     * and no slot request is pending any more.
     */
    @Test
    void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (SchedulingStrategyFactory)new PipelinedRegionSchedulingStrategy.Factory(), (FailoverStrategy.Factory)new RestartAllFailoverStrategy.Factory());
        scheduler.startScheduling();

        Iterator<ArchivedExecutionVertex> vertexIterator = scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ArchivedExecutionVertex v1 = vertexIterator.next();
        Assertions.assertThat(this.testExecutionSlotAllocator.getPendingRequests()).hasSize(2);

        String exceptionMessage = "expected exception";
        scheduler.updateTaskExecutionState(new TaskExecutionState(v1.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, new RuntimeException(exceptionMessage)));

        // Take a fresh archived snapshot; the one above was requested before the failure.
        vertexIterator = scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        v1 = vertexIterator.next();
        ArchivedExecutionVertex v2 = vertexIterator.next();
        Assertions.assertThat(v1.getExecutionState()).isEqualTo(ExecutionState.FAILED);
        Assertions.assertThat(v2.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(this.testExecutionSlotAllocator.getPendingRequests()).isEmpty();
    }

    /**
     * With manual completion of slot requests and "complete pending requests with returned slots"
     * enabled, completes the request of the first vertex, fails that vertex and verifies that no
     * request stays pending, that two slots were returned and that all but two of the originally
     * pending slot futures were cancelled.
     */
    @Test
    void pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots() throws Exception {
        int parallelism = 10;
        JobGraph jobGraph = DefaultSchedulerTest.sourceSinkJobGraph(parallelism);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        this.testExecutionSlotAllocator.enableCompletePendingRequestsWithReturnedSlots();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (SchedulingStrategyFactory)new PipelinedRegionSchedulingStrategy.Factory(), (FailoverStrategy.Factory)new RestartAllFailoverStrategy.Factory());
        scheduler.startScheduling();

        ExecutionVertex ev1 = Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0);
        Set<CompletableFuture<?>> pendingLogicalSlotFutures = this.testExecutionSlotAllocator.getPendingRequests().values().stream()
                .map(ExecutionSlotAssignment::getLogicalSlotFuture)
                .collect(Collectors.toSet());
        // NOTE(review): presumably one request per subtask of the source and of the sink vertex - confirm.
        Assertions.assertThat(pendingLogicalSlotFutures).hasSize(parallelism * 2);

        this.testExecutionSlotAllocator.completePendingRequest(ev1.getID());
        Assertions.assertThat(pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count()).isEqualTo(1L);

        String exceptionMessage = "expected exception";
        scheduler.updateTaskExecutionState(new TaskExecutionState(ev1.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, new RuntimeException(exceptionMessage)));

        Assertions.assertThat(this.testExecutionSlotAllocator.getPendingRequests()).isEmpty();
        Assertions.assertThat(this.testExecutionSlotAllocator.getReturnedSlots()).hasSize(2);
        // Two of the original futures are expected not to be cancelled.
        long expectedCancelledFutures = parallelism * 2 - 2;
        Assertions.assertThat(pendingLogicalSlotFutures.stream().filter(CompletableFuture::isCancelled).count()).isEqualTo(expectedCancelledFutures);
    }

    /**
     * Triggers a global failure, reports the only task as {@code CANCELED}, triggers the scheduled
     * restart and verifies that the exception history holds a single entry that matches the global
     * failure and has no concurrent exceptions.
     */
    @Test
    void testExceptionHistoryWithGlobalFailOver() {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        final DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(graph);
        final ArchivedExecutionVertex onlyVertex =
                Iterables.getOnlyElement(
                        scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        final ExecutionAttemptID onlyAttemptId = onlyVertex.getCurrentExecutionAttempt().getAttemptId();

        final Exception globalFailure = new Exception("Expected exception");
        scheduler.handleGlobalFailure(globalFailure);
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(onlyAttemptId, ExecutionState.CANCELED, globalFailure));
        this.taskRestartExecutor.triggerScheduledTasks();

        final Iterable<RootExceptionHistoryEntry> history = scheduler.getExceptionHistory();
        Assertions.assertThat(history).hasSize(1);

        final RootExceptionHistoryEntry onlyEntry = history.iterator().next();
        // The expected timestamp is the one recorded in the execution graph's failure info.
        final long failureTimestamp = scheduler.getExecutionGraph().getFailureInfo().getTimestamp();
        final boolean matchesGlobalFailure =
                ExceptionHistoryEntryTestingUtils.matchesGlobalFailure(onlyEntry, globalFailure, failureTimestamp);
        Assertions.assertThat(matchesGlobalFailure).isTrue();
        Assertions.assertThat(onlyEntry.getConcurrentExceptions()).isEmpty();
    }

    /**
     * Produces one failure that is followed by a restart and a second failure after restarts have
     * been disabled, then verifies that the exception history holds two entries: the first matching
     * the task failure, the second matching a global failure with the second exception.
     */
    @Test
    void testExceptionHistoryWithRestartableFailure() {
        final JobGraph graph = singleNonParallelJobVertexJobGraph();

        // Use slots that report a known task manager location.
        final LocalTaskManagerLocation location = new LocalTaskManagerLocation();
        final TestingLogicalSlotBuilder slotBuilder = new TestingLogicalSlotBuilder();
        slotBuilder.setTaskManagerLocation(location);
        this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(slotBuilder);

        final DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(graph);

        final ArchivedExecutionVertex firstFailureVertex =
                Iterables.getOnlyElement(
                        scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        final RuntimeException firstFailure = new RuntimeException("restartable exception");
        final long firstFailureTimestamp =
                initiateFailure(
                        scheduler,
                        firstFailureVertex.getCurrentExecutionAttempt().getAttemptId(),
                        firstFailure);
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();

        // From here on the backoff strategy refuses further restarts.
        this.testRestartBackoffTimeStrategy.setCanRestart(false);

        final ArchivedExecutionVertex secondFailureVertex =
                Iterables.getOnlyElement(
                        scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        final ExecutionAttemptID secondAttemptId =
                secondFailureVertex.getCurrentExecutionAttempt().getAttemptId();
        final RuntimeException secondFailure = new RuntimeException("failing exception");
        final long secondFailureTimestamp = initiateFailure(scheduler, secondAttemptId, secondFailure);

        final Iterable<RootExceptionHistoryEntry> history = scheduler.getExceptionHistory();
        Assertions.assertThat(history).hasSize(2);

        final Iterator<RootExceptionHistoryEntry> entries = history.iterator();
        final RootExceptionHistoryEntry restartEntry = entries.next();
        final boolean restartEntryMatches =
                ExceptionHistoryEntryTestingUtils.matchesFailure(
                        restartEntry,
                        firstFailure,
                        firstFailureTimestamp,
                        firstFailureVertex.getTaskNameWithSubtaskIndex(),
                        firstFailureVertex.getCurrentAssignedResourceLocation());
        Assertions.assertThat(restartEntryMatches).isTrue();

        final RootExceptionHistoryEntry jobFailureEntry = entries.next();
        final boolean jobFailureEntryMatches =
                ExceptionHistoryEntryTestingUtils.matchesGlobalFailure(
                        jobFailureEntry, secondFailure, secondFailureTimestamp);
        Assertions.assertThat(jobFailureEntryMatches).isTrue();
    }

    /**
     * Times out the pending slot request of the only vertex before it is deployed and verifies that
     * the vertex has no assigned resource location, carries failure info, and that the exception
     * history contains an entry matching that failure.
     *
     * <p>The history is checked with {@code anyMatch}. {@code anySatisfy} takes a consumer, so the
     * boolean returned by {@code matchesFailure} would be discarded and the assertion would pass
     * for any non-empty history.
     */
    @Test
    void testExceptionHistoryWithPreDeployFailure() {
        this.executionSlotAllocatorFactory.getTestExecutionSlotAllocator().disableAutoCompletePendingRequests();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(DefaultSchedulerTest.singleNonParallelJobVertexJobGraph());
        this.executionSlotAllocatorFactory.getTestExecutionSlotAllocator().timeoutPendingRequests();

        // Snapshot taken after the timeout but before the restart task is triggered.
        ArchivedExecutionVertex taskFailureExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();

        Assertions.assertThat((Comparable)taskFailureExecutionVertex.getCurrentAssignedResourceLocation()).isNull();
        ErrorInfo failureInfo = taskFailureExecutionVertex.getFailureInfo().orElseThrow(() -> new AssertionError((Object)"A failureInfo should be set."));

        Iterable<RootExceptionHistoryEntry> actualExceptionHistory = scheduler.getExceptionHistory();
        Assertions.assertThat(actualExceptionHistory).anyMatch(e -> ExceptionHistoryEntryTestingUtils.matchesFailure((ExceptionHistoryEntry)e, (Throwable)failureInfo.getException(), failureInfo.getTimestamp(), taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(), taskFailureExecutionVertex.getCurrentAssignedResourceLocation()));
    }

    /**
     * Fails two vertices one after the other, reverses the order of the collected restart tasks
     * before triggering them, and verifies the resulting exception history: two root entries, the
     * first matching failure #0 with failure #1 among its concurrent exceptions, the second
     * matching failure #1 with no concurrent exceptions.
     *
     * <p>The concurrent exceptions are checked with {@code anyMatch}. {@code anySatisfy} takes a
     * consumer, so the boolean returned by {@code matchesFailure} would be discarded and the
     * assertion would only check that the collection is non-empty.
     */
    @Test
    void testExceptionHistoryConcurrentRestart() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        TestingLogicalSlotBuilder logicalSlotBuilder = new TestingLogicalSlotBuilder();
        logicalSlotBuilder.setTaskManagerLocation(taskManagerLocation);
        this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(logicalSlotBuilder);
        ReorganizableManuallyTriggeredScheduledExecutor delayExecutor = new ReorganizableManuallyTriggeredScheduledExecutor();
        TestFailoverStrategyFactory failoverStrategyFactory = new TestFailoverStrategyFactory();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (SchedulingStrategyFactory)new PipelinedRegionSchedulingStrategy.Factory(), failoverStrategyFactory, (ScheduledExecutor)delayExecutor);
        scheduler.startScheduling();

        ExecutionVertex executionVertex0 = Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0);
        ExecutionVertex executionVertex1 = Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 1);

        RuntimeException exception0 = new RuntimeException("failure #0");
        failoverStrategyFactory.setTasksToRestart(executionVertex0.getID());
        long updateStateTriggeringRestartTimestamp0 = DefaultSchedulerTest.initiateFailure(scheduler, executionVertex0.getCurrentExecutionAttempt().getAttemptId(), exception0);

        RuntimeException exception1 = new RuntimeException("failure #1");
        failoverStrategyFactory.setTasksToRestart(executionVertex1.getID(), executionVertex0.getID());
        long updateStateTriggeringRestartTimestamp1 = DefaultSchedulerTest.initiateFailure(scheduler, executionVertex1.getCurrentExecutionAttempt().getAttemptId(), exception1);

        // Run the collected restart tasks in the opposite order to the one they were scheduled in.
        Collections.reverse(delayExecutor.getCollectedScheduledTasks());
        delayExecutor.triggerNonPeriodicScheduledTasks();

        Assertions.assertThat(scheduler.getExceptionHistory()).hasSize(2);
        Iterator<RootExceptionHistoryEntry> actualExceptionHistory = scheduler.getExceptionHistory().iterator();

        RootExceptionHistoryEntry entry0 = actualExceptionHistory.next();
        Assertions.assertThat(ExceptionHistoryEntryTestingUtils.matchesFailure((ExceptionHistoryEntry)entry0, exception0, updateStateTriggeringRestartTimestamp0, executionVertex0.getTaskNameWithSubtaskIndex(), executionVertex0.getCurrentAssignedResourceLocation())).isTrue();
        Assertions.assertThat(entry0.getConcurrentExceptions()).anyMatch(e -> ExceptionHistoryEntryTestingUtils.matchesFailure(e, exception1, updateStateTriggeringRestartTimestamp1, executionVertex1.getTaskNameWithSubtaskIndex(), executionVertex1.getCurrentAssignedResourceLocation()));

        RootExceptionHistoryEntry entry1 = actualExceptionHistory.next();
        Assertions.assertThat(ExceptionHistoryEntryTestingUtils.matchesFailure((ExceptionHistoryEntry)entry1, exception1, updateStateTriggeringRestartTimestamp1, executionVertex1.getTaskNameWithSubtaskIndex(), executionVertex1.getCurrentAssignedResourceLocation())).isTrue();
        Assertions.assertThat(entry1.getConcurrentExceptions()).isEmpty();
    }

    /**
     * Limits the exception history to one entry, produces two consecutive task failures and
     * verifies that the history holds exactly one entry and that it matches the second failure.
     *
     * <p>The size is asserted explicitly and the entry is checked with {@code anyMatch}.
     * {@code anySatisfy} takes a consumer, so the boolean returned by {@code matchesFailure} would
     * be discarded and the test would pass for any non-empty history, truncated or not.
     */
    @Test
    void testExceptionHistoryTruncation() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        this.configuration.set(WebOptions.MAX_EXCEPTION_HISTORY_SIZE, (Object)1);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);

        ExecutionAttemptID attemptId0 = Iterables.getOnlyElement(scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices()).getCurrentExecutionAttempt().getAttemptId();
        DefaultSchedulerTest.initiateFailure(scheduler, attemptId0, new RuntimeException("old exception"));
        this.taskRestartExecutor.triggerNonPeriodicScheduledTasks();

        ArchivedExecutionVertex executionVertex1 = Iterables.getOnlyElement(scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        RuntimeException exception = new RuntimeException("relevant exception");
        long relevantTimestamp = DefaultSchedulerTest.initiateFailure(scheduler, executionVertex1.getCurrentExecutionAttempt().getAttemptId(), exception);
        this.taskRestartExecutor.triggerNonPeriodicScheduledTasks();

        // With a maximum history size of 1 only the most recent failure may remain.
        Assertions.assertThat(scheduler.getExceptionHistory())
                .hasSize(1)
                .anyMatch(e -> ExceptionHistoryEntryTestingUtils.matchesFailure((ExceptionHistoryEntry)e, exception, relevantTimestamp, executionVertex1.getTaskNameWithSubtaskIndex(), executionVertex1.getCurrentAssignedResourceLocation()));
    }

    /**
     * Builds a scheduler whose job metric group is backed by a testing metric registry, waits for
     * one task submission, sleeps briefly and verifies that the gauge registered under the name
     * {@code runningTimeTotal} reports a value greater than zero.
     */
    @Test
    void testStatusMetrics() throws Exception {
        // Completed as soon as a metric named "runningTimeTotal" is registered.
        final CompletableFuture<Gauge<Long>> runningTimeGaugeFuture = new CompletableFuture<>();
        final TestingMetricRegistry registry = TestingMetricRegistry.builder().setRegisterConsumer((metric, name, group) -> {
            if (name.equals("runningTimeTotal")) {
                runningTimeGaugeFuture.complete((Gauge<Long>) metric);
            }
        }).build();

        final JobGraph graph = singleNonParallelJobVertexJobGraph();
        final JobVertex onlyJobVertex = getOnlyJobVertex(graph);

        final Configuration configuration = new Configuration();
        configuration.set(MetricOptions.JOB_STATUS_METRICS, Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));

        final ComponentMainThreadExecutor singleThreadMainThreadExecutor =
                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(this.scheduledExecutorService);
        final Time slotTimeout = Time.milliseconds(5L);
        final DeclarativeSlotPoolBridge slotPool =
                new DeclarativeSlotPoolBridgeBuilder()
                        .setBatchSlotTimeout(slotTimeout)
                        .buildAndStart(singleThreadMainThreadExecutor);
        final PhysicalSlotProviderImpl slotProvider =
                new PhysicalSlotProviderImpl(
                        (SlotSelectionStrategy)LocationPreferenceSlotSelectionStrategy.createDefault(),
                        (SlotPool)slotPool);

        final DefaultScheduler scheduler =
                this.createSchedulerBuilder(graph, singleThreadMainThreadExecutor)
                        .setJobMasterConfiguration(configuration)
                        .setJobManagerJobMetricGroup(
                                JobManagerMetricGroup.createJobManagerMetricGroup((MetricRegistry)registry, (String)"localhost")
                                        .addJob(new JobID(), "jobName"))
                        .setExecutionSlotAllocatorFactory(
                                (ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        (PhysicalSlotProvider)slotProvider, slotTimeout))
                        .build();

        final AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway gateway =
                new AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway(1);
        // Answer every cancel request by reporting the execution as CANCELED on the main thread executor.
        gateway.setCancelConsumer(
                cancelledAttemptId ->
                        singleThreadMainThreadExecutor.execute(
                                () -> scheduler.updateTaskExecutionState(
                                        new TaskExecutionState(cancelledAttemptId, ExecutionState.CANCELED))));

        singleThreadMainThreadExecutor.execute(() -> DefaultSchedulerTest.lambda$testStatusMetrics$12(scheduler, (SlotPool)slotPool, gateway));

        gateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        // Let some wall-clock time pass before reading the gauge.
        Thread.sleep(10L);

        final Gauge<Long> runningTimeGauge = runningTimeGaugeFuture.get();
        Assertions.assertThat(runningTimeGauge.getValue()).isGreaterThan(0L);
    }

    /**
     * Nothing is tracked or deployed while the shuffle master still has pending partition
     * registrations; completing them leads to one tracked partition and two deployed vertices.
     */
    @Test
    void testDeploymentWaitForProducedPartitionRegistration() {
        shuffleMaster.setAutoCompleteRegistration(false);

        List trackedPartitionIds = new ArrayList();
        partitionTracker.setStartTrackingPartitionsConsumer((resourceID, descriptor) -> trackedPartitionIds.add(descriptor.getShuffleDescriptor().getResultPartitionID()));

        createSchedulerAndStartScheduling(DefaultSchedulerTest.nonParallelSourceSinkJobGraph());

        // Registrations are still pending.
        Assertions.assertThat(trackedPartitionIds).isEmpty();
        Assertions.assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();

        shuffleMaster.completeAllPendingRegistrations();

        Assertions.assertThat(trackedPartitionIds).hasSize(1);
        Assertions.assertThat(testExecutionOperations.getDeployedVertices()).hasSize(2);
    }

    /**
     * Failing the pending partition registrations results in two canceled vertices and one
     * failed vertex; before that, nothing has been canceled or failed.
     */
    @Test
    void testFailedProducedPartitionRegistration() {
        shuffleMaster.setAutoCompleteRegistration(false);
        createSchedulerAndStartScheduling(DefaultSchedulerTest.nonParallelSourceSinkJobGraph());

        Assertions.assertThat(testExecutionOperations.getCanceledVertices()).isEmpty();
        Assertions.assertThat(testExecutionOperations.getFailedVertices()).isEmpty();

        shuffleMaster.failAllPendingRegistrations();

        Assertions.assertThat(testExecutionOperations.getCanceledVertices()).hasSize(2);
        Assertions.assertThat(testExecutionOperations.getFailedVertices()).hasSize(1);
    }

    /**
     * When the shuffle master is configured to throw on registration, starting the scheduler
     * ends with two canceled vertices and one failed vertex.
     */
    @Test
    void testDirectExceptionOnProducedPartitionRegistration() {
        shuffleMaster.setThrowExceptionalOnRegistration(true);
        createSchedulerAndStartScheduling(DefaultSchedulerTest.nonParallelSourceSinkJobGraph());

        Assertions.assertThat(testExecutionOperations.getCanceledVertices()).hasSize(2);
        Assertions.assertThat(testExecutionOperations.getFailedVertices()).hasSize(1);
    }

    /**
     * With a 1 ms RPC timeout and registrations that never complete, the test waits for two
     * canceled executions and one failed execution. A dedicated single-thread executor acts as
     * the scheduler's main thread and is shut down afterwards.
     */
    @Test
    void testProducedPartitionRegistrationTimeout() throws Exception {
        ScheduledExecutorService timeoutTestExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(timeoutTestExecutor);
            shuffleMaster.setAutoCompleteRegistration(false);
            JobGraph jobGraph = DefaultSchedulerTest.nonParallelSourceSinkJobGraph();
            // Picked up by createSchedulerBuilder(...) as the scheduler's RPC timeout.
            timeout = Time.milliseconds(1L);
            createSchedulerAndStartScheduling(jobGraph, mainThreadExecutor);
            testExecutionOperations.awaitCanceledExecutions(2);
            testExecutionOperations.awaitFailedExecutions(1);
        }
        finally {
            timeoutTestExecutor.shutdown();
        }
    }

    /**
     * If the first execution vertex fails before its partition registration completes, the
     * late-completed registration is never tracked and one partition is released externally.
     */
    @Test
    void testLateRegisteredPartitionsWillBeReleased() {
        shuffleMaster.setAutoCompleteRegistration(false);

        List trackedPartitionIds = new ArrayList();
        partitionTracker.setStartTrackingPartitionsConsumer((resourceID, descriptor) -> trackedPartitionIds.add(descriptor.getShuffleDescriptor().getResultPartitionID()));

        DefaultScheduler scheduler = createSchedulerAndStartScheduling(DefaultSchedulerTest.nonParallelSourceSinkJobGraph());

        // Fail the first vertex returned by the archived graph while its registration is still pending.
        Iterator vertexIterator = scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ArchivedExecutionVertex firstVertex = (ArchivedExecutionVertex)vertexIterator.next();
        ExecutionAttemptID failedAttemptId = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(SchedulerTestingUtils.createFailedTaskExecutionState(failedAttemptId));

        shuffleMaster.completeAllPendingRegistrations();

        Assertions.assertThat(trackedPartitionIds).isEmpty();
        Assertions.assertThat(shuffleMaster.getExternallyReleasedPartitions()).hasSize(1);
    }

    /**
     * Runs the shared checkpoint-cleaner shutdown-order check against a {@link DefaultScheduler}
     * built on a dedicated single-thread executor, which is force-stopped afterwards.
     */
    @Test
    void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            DefaultSchedulerTest.doTestCheckpointCleanerIsClosedAfterCheckpointServices((recoveryFactory, cleaner) -> {
                JobGraph checkpointedJobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(1);
                SchedulerTestingUtils.enableCheckpointing(checkpointedJobGraph);
                try {
                    return new DefaultSchedulerBuilder(checkpointedJobGraph, ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(executor), executor)
                            .setCheckpointRecoveryFactory(recoveryFactory)
                            .setCheckpointCleaner(cleaner)
                            .build();
                }
                catch (Exception e) {
                    // The factory is a BiFunction and cannot throw checked exceptions.
                    throw new RuntimeException(e);
                }
            }, executor, this.log);
        }
        finally {
            executor.shutdownNow();
        }
    }

    /** Runs the job-status-hook scenario for a task reporting FAILED and a job ending FAILED. */
    @Test
    void testJobStatusHookWithJobFailed() throws Exception {
        commonJobStatusHookTest(ExecutionState.FAILED, JobStatus.FAILED);
    }

    /** Runs the job-status-hook scenario for a task reporting CANCELED and a job ending CANCELED. */
    @Test
    void testJobStatusHookWithJobCanceled() throws Exception {
        commonJobStatusHookTest(ExecutionState.CANCELED, JobStatus.CANCELED);
    }

    /** Runs the job-status-hook scenario for a task reporting FINISHED and a job ending FINISHED. */
    @Test
    void testJobStatusHookWithJobFinished() throws Exception {
        commonJobStatusHookTest(ExecutionState.FINISHED, JobStatus.FINISHED);
    }

    /**
     * Shared scenario for the job-status-hook tests: registers a {@link TestingJobStatusHook} on
     * a single-vertex job, drives the only execution to {@code expectedExecutionState} with
     * restarts disabled, and checks that the job reaches {@code expectedJobStatus} and that the
     * hook was notified.
     *
     * @param expectedExecutionState the state reported for the job's only execution attempt
     * @param expectedJobStatus the terminal job status expected afterwards; must be FAILED,
     *     CANCELED or FINISHED
     * @throws UnsupportedOperationException if {@code expectedJobStatus} is any other status
     * @throws Exception if the job does not terminate within the wait performed by
     *     {@code waitForTermination}
     */
    private void commonJobStatusHookTest(ExecutionState expectedExecutionState, JobStatus expectedJobStatus) throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        TestingJobStatusHook jobStatusHook = new TestingJobStatusHook();

        // Records the job ids passed to the "created" callback.
        LinkedList onCreatedJobList = new LinkedList();
        jobStatusHook.setOnCreatedConsumer(jobId -> onCreatedJobList.add(jobId));

        // Records the job ids passed to the callback matching the expected terminal status.
        LinkedList onJobStatusList = new LinkedList();
        switch (expectedJobStatus) {
            case FAILED: {
                jobStatusHook.setOnFailedConsumer((jobID, throwable) -> onJobStatusList.add(jobID));
                break;
            }
            case CANCELED: {
                jobStatusHook.setOnCanceledConsumer(jobID -> onJobStatusList.add(jobID));
                break;
            }
            case FINISHED: {
                jobStatusHook.setOnFinishedConsumer(jobID -> onJobStatusList.add(jobID));
                break;
            }
            default: {
                throw new UnsupportedOperationException("JobStatusHook test is not supported: " + expectedJobStatus);
            }
        }

        ArrayList<TestingJobStatusHook> jobStatusHooks = new ArrayList<TestingJobStatusHook>();
        jobStatusHooks.add(jobStatusHook);
        jobGraph.setJobStatusHooks(jobStatusHooks);

        // Disable restarts so that a task failure is terminal for the job.
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();

        // For the CANCELED scenario the job is cancelled before the task reports its state.
        if (JobStatus.CANCELED == expectedJobStatus) {
            scheduler.cancel();
        }
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, expectedExecutionState));
        this.taskRestartExecutor.triggerScheduledTasks();
        this.waitForTermination(scheduler);

        JobStatus jobStatus = scheduler.requestJobStatus();
        Assertions.assertThat((Comparable)jobStatus).isEqualTo((Object)expectedJobStatus);
        ((ObjectAssert)Assertions.assertThat(onCreatedJobList).singleElement()).isEqualTo((Object)jobGraph.getJobID());
        // Check the terminal-status callback too; previously onCreatedJobList was asserted twice
        // and onJobStatusList was never inspected.
        ((ObjectAssert)Assertions.assertThat(onJobStatusList).singleElement()).isEqualTo((Object)jobGraph.getJobID());
    }

    /**
     * Checks that closing a scheduler does not close the {@link CheckpointsCleaner} while the
     * shutdown of the checkpoint services (completed checkpoint store and checkpoint ID counter)
     * is still blocked.
     *
     * <p>The store and counter created here block in their {@code shutdown} methods on a latch.
     * The cleaner counts down a second latch when {@code closeAsync()} is invoked. The test
     * triggers {@code scheduler.closeAsync()} on {@code executorService}, asserts that the
     * cleaner latch is not released within 10 ms, then unblocks the checkpoint services and
     * waits for both the cleaner and the scheduler to finish closing.
     *
     * @param schedulerFactory creates the scheduler under test from the instrumented recovery
     *     factory and cleaner
     * @param executorService executor on which {@code closeAsync()} is invoked
     * @param logger used to report an interruption while the ID counter is blocked
     * @throws Exception if waiting on a latch or on the scheduler's close future fails
     */
    public static void doTestCheckpointCleanerIsClosedAfterCheckpointServices(BiFunction<CheckpointRecoveryFactory, CheckpointsCleaner, SchedulerNG> schedulerFactory, ScheduledExecutorService executorService, final Logger logger) throws Exception {
        // Released by the test to let the checkpoint services proceed with their shutdown.
        final CountDownLatch checkpointServicesShutdownBlocked = new CountDownLatch(1);
        // Released by the cleaner when its closeAsync() is called.
        final CountDownLatch cleanerClosed = new CountDownLatch(1);
        StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1){

            public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception {
                // Block until the test allows the checkpoint services to shut down.
                checkpointServicesShutdownBlocked.await();
                super.shutdown(jobStatus, checkpointsCleaner);
            }
        };
        StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter(){

            public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
                try {
                    // Block until the test allows the checkpoint services to shut down.
                    checkpointServicesShutdownBlocked.await();
                }
                catch (InterruptedException e) {
                    // This override declares no checked exception: log, restore the interrupt flag and continue.
                    logger.error("An error occurred while executing waiting for the CheckpointServices shutdown.", (Throwable)e);
                    Thread.currentThread().interrupt();
                }
                return super.shutdown(jobStatus);
            }
        };
        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner(){

            public synchronized CompletableFuture<Void> closeAsync() {
                // Signal that the cleaner is being closed before delegating.
                cleanerClosed.countDown();
                return super.closeAsync();
            }
        };
        SchedulerNG scheduler = schedulerFactory.apply(new TestingCheckpointRecoveryFactory((CompletedCheckpointStore)completedCheckpointStore, (CheckpointIDCounter)checkpointIDCounter), checkpointsCleaner);
        // Completed once the future returned by scheduler.closeAsync() has finished.
        CompletableFuture schedulerClosed = new CompletableFuture();
        // Released once closeAsync() has been invoked on the executor thread.
        CountDownLatch schedulerClosing = new CountDownLatch(1);
        executorService.submit(() -> {
            scheduler.closeAsync().thenRun(() -> schedulerClosed.complete(null));
            schedulerClosing.countDown();
        });
        schedulerClosing.await();
        // While the checkpoint services are still blocked, the cleaner must not have been closed.
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)cleanerClosed.await(10L, TimeUnit.MILLISECONDS)).withFailMessage("CheckpointCleaner should not close before checkpoint services.", new Object[0])).isFalse();
        // Unblock the checkpoint services and wait for the cleaner and the scheduler to finish closing.
        checkpointServicesShutdownBlocked.countDown();
        cleanerClosed.await();
        schedulerClosed.get();
    }

    /**
     * Reports the given execution attempt as FAILED with {@code exception} and returns the
     * failure timestamp recorded for its execution vertex.
     */
    private static long initiateFailure(DefaultScheduler scheduler, ExecutionAttemptID executionAttemptId, Throwable exception) {
        TaskExecutionState failedState = new TaskExecutionState(executionAttemptId, ExecutionState.FAILED, exception);
        scheduler.updateTaskExecutionState(failedState);
        return DefaultSchedulerTest.getFailureTimestamp(scheduler, executionAttemptId);
    }

    /**
     * Looks up the execution vertex whose current attempt has the given id and returns the
     * timestamp of its failure info.
     *
     * @throws IllegalArgumentException if no execution vertex currently runs the given attempt
     * @throws IllegalStateException if the matching vertex has no failure info
     */
    private static long getFailureTimestamp(DefaultScheduler scheduler, ExecutionAttemptID executionAttemptId) {
        Optional<ExecutionVertex> matchingVertex = StreamSupport.stream(scheduler.getExecutionGraph().getAllExecutionVertices().spliterator(), false)
                .filter(vertex -> executionAttemptId.equals(vertex.getCurrentExecutionAttempt().getAttemptId()))
                .findFirst();
        ExecutionVertex failedVertex = matchingVertex.orElseThrow(() -> new IllegalArgumentException("No ExecutionVertex available for the passed ExecutionAttemptId " + executionAttemptId));
        return failedVertex.getFailureInfo()
                .map(ErrorInfo::getTimestamp)
                .orElseThrow(() -> new IllegalStateException("No failure was set for ExecutionVertex having the passed execution " + executionAttemptId));
    }

    /** Creates a job vertex with the given name and parallelism that uses {@code AbstractInvokable}. */
    private static JobVertex createVertex(String name, int parallelism) {
        JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(AbstractInvokable.class);
        return vertex;
    }

    /** Waits up to 1000 ms for the scheduler's job termination future to complete. */
    private void waitForTermination(DefaultScheduler scheduler) throws Exception {
        CompletableFuture<?> terminationFuture = scheduler.getJobTerminationFuture();
        terminationFuture.get(1000L, TimeUnit.MILLISECONDS);
    }

    /** Returns a streaming job graph containing a single vertex with parallelism 1. */
    public static JobGraph singleNonParallelJobVertexJobGraph() {
        final int parallelism = 1;
        return singleJobVertexJobGraph(parallelism);
    }

    /**
     * Returns a streaming job graph with one {@code NoOpInvokable} vertex named "source" that
     * runs with the given parallelism.
     */
    private static JobGraph singleJobVertexJobGraph(int parallelism) {
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        sourceVertex.setParallelism(parallelism);
        return JobGraphTestUtils.streamingJobGraph(sourceVertex);
    }

    /**
     * Returns a streaming job graph with a "source" and a "sink" vertex (parallelism left at the
     * vertex default), connected POINTWISE through a PIPELINED result.
     */
    private static JobGraph nonParallelSourceSinkJobGraph() {
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setInvokableClass(NoOpInvokable.class);

        JobVertex sinkVertex = new JobVertex("sink");
        sinkVertex.setInvokableClass(NoOpInvokable.class);

        sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(sourceVertex, sinkVertex);
    }

    /**
     * Returns a streaming job graph with a "source" and a "sink" vertex, both running with the
     * given parallelism and connected ALL_TO_ALL through a PIPELINED result.
     */
    private static JobGraph sourceSinkJobGraph(int parallelism) {
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setParallelism(parallelism);
        sourceVertex.setInvokableClass(NoOpInvokable.class);

        JobVertex sinkVertex = new JobVertex("sink");
        sinkVertex.setParallelism(parallelism);
        sinkVertex.setInvokableClass(NoOpInvokable.class);

        sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(sourceVertex, sinkVertex);
    }

    /**
     * Returns the single vertex of the given job graph.
     *
     * <p>Fails the {@code Preconditions.checkState} check if the graph does not contain exactly
     * one vertex.
     */
    private static JobVertex getOnlyJobVertex(JobGraph jobGraph) {
        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        boolean hasExactlyOneVertex = vertices.size() == 1;
        Preconditions.checkState(hasExactlyOneVertex);
        return vertices.get(0);
    }

    /**
     * Creates a scheduler for the job graph and starts scheduling, using the executor returned by
     * {@code ComponentMainThreadExecutorServiceAdapter.forMainThread()} as the main thread executor.
     */
    private DefaultScheduler createSchedulerAndStartScheduling(JobGraph jobGraph) {
        ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        return createSchedulerAndStartScheduling(jobGraph, mainThreadExecutor);
    }

    /**
     * Builds a scheduler with the default test builder and submits {@code startScheduling()} to
     * the given main thread executor.
     *
     * @throws RuntimeException wrapping any exception thrown while building or submitting
     */
    private DefaultScheduler createSchedulerAndStartScheduling(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
        final DefaultScheduler createdScheduler;
        try {
            createdScheduler = createSchedulerBuilder(jobGraph, mainThreadExecutor).build();
            mainThreadExecutor.execute(() -> createdScheduler.startScheduling());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return createdScheduler;
    }

    /** Builds a scheduler from the default test builder with the given scheduling strategy factory. */
    private DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, SchedulingStrategyFactory schedulingStrategyFactory) throws Exception {
        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
                .setSchedulingStrategyFactory(schedulingStrategyFactory)
                .build();
    }

    /**
     * Builds a scheduler from the default test builder with the given scheduling strategy and
     * failover strategy factories.
     */
    private DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory failoverStrategyFactory) throws Exception {
        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
                .setSchedulingStrategyFactory(schedulingStrategyFactory)
                .setFailoverStrategyFactory(failoverStrategyFactory)
                .build();
    }

    /**
     * Builds a scheduler from the default test builder with the given delay executor, scheduling
     * strategy factory and failover strategy factory.
     */
    private DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory failoverStrategyFactory, ScheduledExecutor delayExecutor) throws Exception {
        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
                .setDelayExecutor(delayExecutor)
                .setSchedulingStrategyFactory(schedulingStrategyFactory)
                .setFailoverStrategyFactory(failoverStrategyFactory)
                .build();
    }

    /**
     * Returns a scheduler builder pre-configured with this test's fixtures: executors, logger,
     * configuration, pipelined-region scheduling and failover strategies, restart backoff
     * strategy, execution operations, vertex versioner, slot allocator factory, shuffle master,
     * partition tracker and RPC timeout. Callers may override individual settings before building.
     */
    private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
        return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, this.executor, this.scheduledExecutorService, (ScheduledExecutor)this.taskRestartExecutor)
                .setLogger(this.log)
                .setJobMasterConfiguration(this.configuration)
                .setSchedulingStrategyFactory((SchedulingStrategyFactory)new PipelinedRegionSchedulingStrategy.Factory())
                .setFailoverStrategyFactory((FailoverStrategy.Factory)new RestartPipelinedRegionFailoverStrategy.Factory())
                .setRestartBackoffTimeStrategy(this.testRestartBackoffTimeStrategy)
                .setExecutionOperations(this.testExecutionOperations)
                .setExecutionVertexVersioner(this.executionVertexVersioner)
                .setExecutionSlotAllocatorFactory(this.executionSlotAllocatorFactory)
                .setShuffleMaster(this.shuffleMaster)
                .setPartitionTracker(this.partitionTracker)
                .setRpcTimeout(this.timeout);
    }

    /**
     * Installs a {@code SimpleAckingTaskManagerGateway} on the test slot allocator's logical slot
     * builder and returns a latch that the gateway's checkpoint consumer counts down.
     */
    private CountDownLatch getCheckpointTriggeredLatch() {
        CountDownLatch triggeredLatch = new CountDownLatch(1);
        SimpleAckingTaskManagerGateway gateway = new SimpleAckingTaskManagerGateway();
        testExecutionSlotAllocator.getLogicalSlotBuilder().setTaskManagerGateway(gateway);
        gateway.setCheckpointConsumer((executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) -> triggeredLatch.countDown());
        return triggeredLatch;
    }

    /**
     * Moves the given execution attempt to INITIALIZING and then to RUNNING, failing the
     * {@code Preconditions.checkState} check as soon as the scheduler rejects one of the updates.
     */
    private void transitionToRunning(DefaultScheduler scheduler, ExecutionAttemptID attemptId) {
        ExecutionState[] transitions = {ExecutionState.INITIALIZING, ExecutionState.RUNNING};
        for (ExecutionState nextState : transitions) {
            boolean accepted = scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, nextState));
            Preconditions.checkState(accepted);
        }
    }

    // Body of the main-thread task used by testStatusMetrics: starts scheduling, then offers the
    // slot pool slots matching a single requirement of the UNKNOWN resource profile.
    private static /* synthetic */ void lambda$testStatusMetrics$12(DefaultScheduler scheduler, SlotPool slotPool, AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway taskManagerGateway) {
        scheduler.startScheduling();
        ResourceCounter singleUnknownSlot = ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, 1);
        SlotPoolTestUtils.offerSlots(slotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(singleUnknownSlot), (TaskManagerGateway)taskManagerGateway);
    }

    /**
     * A {@link ManuallyTriggeredScheduledExecutor} that does not hand scheduled tasks to its
     * parent immediately. Tasks are first collected in a list that tests can inspect (and, since
     * the list is returned as-is, rearrange) through {@link #getCollectedScheduledTasks()}. They
     * are forwarded to the parent, in list order, right before non-periodic tasks are triggered.
     */
    private static class ReorganizableManuallyTriggeredScheduledExecutor
    extends ManuallyTriggeredScheduledExecutor {
        /** Tasks captured by {@code schedule(...)} that have not yet been forwarded to the parent. */
        private final List<ScheduledTask<?>> scheduledTasks = new ArrayList<>();

        private ReorganizableManuallyTriggeredScheduledExecutor() {
        }

        /** Wraps the runnable in a callable returning {@code null} and collects it. */
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return this.schedule(() -> {
                command.run();
                return null;
            }, delay, unit);
        }

        /**
         * Collects the callable as a {@link ScheduledTask} instead of scheduling it on the parent.
         *
         * @param callable the work to run once triggered
         * @param delay the requested delay, expressed in {@code unit}
         * @param unit the time unit of {@code delay}
         * @return the collected task
         */
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            // The delay is read back in milliseconds when the task is forwarded to the parent,
            // so normalise it to milliseconds here. The former unit.convert(delay, MILLISECONDS)
            // converted in the opposite direction and was only correct for unit == MILLISECONDS.
            ScheduledTask<V> scheduledTask = new ScheduledTask<>(callable, unit.toMillis(delay));
            this.scheduledTasks.add(scheduledTask);
            return scheduledTask;
        }

        /** Returns the live list of collected, not yet forwarded tasks. */
        public List<ScheduledTask<?>> getCollectedScheduledTasks() {
            return this.scheduledTasks;
        }

        /** Forwards all collected tasks to the parent executor in list order and clears the list. */
        void scheduleCollectedScheduledTasks() {
            for (ScheduledTask<?> scheduledTask : this.scheduledTasks) {
                super.schedule(scheduledTask.getCallable(), scheduledTask.getDelay(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            }
            this.scheduledTasks.clear();
        }

        /** Forwards the collected tasks to the parent, then triggers one non-periodic task. */
        public void triggerNonPeriodicScheduledTask() {
            this.scheduleCollectedScheduledTasks();
            super.triggerNonPeriodicScheduledTask();
        }

        /** Forwards the collected tasks to the parent, then triggers all non-periodic tasks. */
        public void triggerNonPeriodicScheduledTasks() {
            this.scheduleCollectedScheduledTasks();
            super.triggerNonPeriodicScheduledTasks();
        }
    }
}

