/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.TestExecutionVertexOperationsDecorator;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class DefaultSchedulerTest
extends TestLogger {
    private static final int TIMEOUT_MS = 1000;
    private static final JobID TEST_JOB_ID = new JobID();
    private ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutorService;
    private Configuration configuration;
    private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy;
    private TestExecutionVertexOperationsDecorator testExecutionVertexOperations;
    private ExecutionVertexVersioner executionVertexVersioner;
    private TestExecutionSlotAllocatorFactory executionSlotAllocatorFactory;
    private TestExecutionSlotAllocator testExecutionSlotAllocator;

    @Before
    public void setUp() throws Exception {
        this.executor = Executors.newSingleThreadExecutor();
        this.scheduledExecutorService = new DirectScheduledExecutorService();
        this.configuration = new Configuration();
        this.testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 0L);
        this.testExecutionVertexOperations = new TestExecutionVertexOperationsDecorator((ExecutionVertexOperations)new DefaultExecutionVertexOperations());
        this.executionVertexVersioner = new ExecutionVertexVersioner();
        this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
        this.testExecutionSlotAllocator = this.executionSlotAllocatorFactory.getTestExecutionSlotAllocator();
    }

    @After
    public void tearDown() throws Exception {
        if (this.scheduledExecutorService != null) {
            ExecutorUtils.gracefulShutdown((long)1000L, (TimeUnit)TimeUnit.MILLISECONDS, (ExecutorService[])new ExecutorService[]{this.scheduledExecutorService});
        }
        if (this.executor != null) {
            ExecutorUtils.gracefulShutdown((long)1000L, (TimeUnit)TimeUnit.MILLISECONDS, (ExecutorService[])new ExecutorService[]{this.executor});
        }
    }

    @Test
    public void startScheduling() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        this.createSchedulerAndStartScheduling(jobGraph);
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        Assert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId}));
    }

    @Test
    public void scheduledVertexOrderFromSchedulingStrategyIsRespected() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(10);
        JobVertexID onlyJobVertexId = DefaultSchedulerTest.getOnlyJobVertex(jobGraph).getID();
        List<ExecutionVertexID> desiredScheduleOrder = Arrays.asList(new ExecutionVertexID(onlyJobVertexId, 4), new ExecutionVertexID(onlyJobVertexId, 0), new ExecutionVertexID(onlyJobVertexId, 3), new ExecutionVertexID(onlyJobVertexId, 1), new ExecutionVertexID(onlyJobVertexId, 2));
        TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory();
        this.createScheduler(jobGraph, schedulingStrategyFactory);
        TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
        schedulingStrategy.schedule(desiredScheduleOrder);
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        Assert.assertEquals(desiredScheduleOrder, deployedExecutionVertices);
    }

    @Test
    public void restartAfterDeploymentFails() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        this.testExecutionVertexOperations.enableFailDeploy();
        this.createSchedulerAndStartScheduling(jobGraph);
        this.testExecutionVertexOperations.disableFailDeploy();
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        Assert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId, executionVertexId}));
    }

    @Test
    public void scheduleWithLazyStrategy() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        this.createSchedulerAndStartScheduling(jobGraph);
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        Assert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId}));
    }

    @Test
    public void restartFailedTask() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex archivedExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getAllExecutionVertices());
        ExecutionAttemptID attemptId = archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        Assert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId, executionVertexId}));
    }

    @Test
    public void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        TaskExecutionState taskExecutionState = new TaskExecutionState(jobGraph.getJobID(), new ExecutionAttemptID(), ExecutionState.FAILED);
        Assert.assertFalse((boolean)scheduler.updateTaskExecutionState(taskExecutionState));
    }

    @Test
    public void failJobIfCannotRestart() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();
        this.waitForTermination(scheduler);
        JobStatus jobStatus = scheduler.requestJobStatus();
        Assert.assertThat((Object)jobStatus, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.FAILED)));
    }

    @Test
    public void failJobIfNotEnoughResources() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        this.testExecutionSlotAllocator.timeoutPendingRequests();
        this.waitForTermination(scheduler);
        JobStatus jobStatus = scheduler.requestJobStatus();
        Assert.assertThat((Object)jobStatus, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.FAILED)));
        Throwable failureCause = scheduler.requestJob().getFailureInfo().getException().deserializeError(DefaultSchedulerTest.class.getClassLoader());
        Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)failureCause, NoResourceAvailableException.class).isPresent());
        Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)failureCause, (String)"Could not allocate the required slot within slot request timeout.").isPresent());
        Assert.assertThat((Object)jobStatus, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.FAILED)));
    }

    @Test
    public void skipDeploymentIfVertexVersionOutdated() {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        JobGraph jobGraph = DefaultSchedulerTest.nonParallelSourceSinkJobGraph();
        List sortedJobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        ExecutionVertexID sourceExecutionVertexId = new ExecutionVertexID(((JobVertex)sortedJobVertices.get(0)).getID(), 0);
        ExecutionVertexID sinkExecutionVertexId = new ExecutionVertexID(((JobVertex)sortedJobVertices.get(1)).getID(), 0);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        this.testExecutionSlotAllocator.completePendingRequest(sourceExecutionVertexId);
        ArchivedExecutionVertex sourceExecutionVertex = (ArchivedExecutionVertex)scheduler.requestJob().getAllExecutionVertices().iterator().next();
        ExecutionAttemptID attemptId = sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        this.testExecutionSlotAllocator.enableAutoCompletePendingRequests();
        this.taskRestartExecutor.triggerScheduledTasks();
        Assert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), (Matcher)Matchers.containsInAnyOrder((Object[])new ExecutionVertexID[]{sourceExecutionVertexId, sinkExecutionVertexId}));
        Assert.assertThat((Object)scheduler.requestJob().getState(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.RUNNING)));
    }

    @Test
    public void releaseSlotIfVertexVersionOutdated() {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(DefaultSchedulerTest.getOnlyJobVertex(jobGraph).getID(), 0);
        this.createSchedulerAndStartScheduling(jobGraph);
        this.executionVertexVersioner.recordModification(onlyExecutionVertexId);
        this.testExecutionSlotAllocator.completePendingRequests();
        Assert.assertThat(this.testExecutionSlotAllocator.getReturnedSlots(), (Matcher)Matchers.hasSize((int)1));
    }

    @Test
    public void vertexIsResetBeforeRestarted() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, schedulingStrategyFactory);
        TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
        SchedulingTopology topology = schedulingStrategy.getSchedulingTopology();
        this.startScheduling((SchedulerNG)scheduler);
        SchedulingExecutionVertex onlySchedulingVertex = (SchedulingExecutionVertex)Iterables.getOnlyElement((Iterable)topology.getVertices());
        schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertex.getId()));
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();
        Assert.assertThat(schedulingStrategy.getReceivedVerticesToRestart(), (Matcher)Matchers.hasSize((int)1));
        Assert.assertThat((Object)onlySchedulingVertex.getState(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)ExecutionState.CREATED)));
    }

    @Test
    public void scheduleOnlyIfVertexIsCreated() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory();
        DefaultScheduler scheduler = this.createScheduler(jobGraph, schedulingStrategyFactory);
        TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
        SchedulingTopology topology = schedulingStrategy.getSchedulingTopology();
        this.startScheduling((SchedulerNG)scheduler);
        ExecutionVertexID onlySchedulingVertexId = (ExecutionVertexID)((SchedulingExecutionVertex)Iterables.getOnlyElement((Iterable)topology.getVertices())).getId();
        schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertexId));
        try {
            schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertexId));
            Assert.fail((String)"IllegalStateException should happen");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void handleGlobalFailure() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        scheduler.handleGlobalFailure((Throwable)new Exception("forced failure"));
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.CANCELED));
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        Assert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId, executionVertexId}));
    }

    @Test
    public void handleGlobalFailureWithLocalFailure() {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        List attemptIds = StreamSupport.stream(scheduler.requestJob().getAllExecutionVertices().spliterator(), false).map(ArchivedExecutionVertex::getCurrentExecutionAttempt).map(ArchivedExecution::getAttemptId).collect(Collectors.toList());
        ExecutionAttemptID localFailureAttemptId = (ExecutionAttemptID)attemptIds.get(0);
        scheduler.handleGlobalFailure((Throwable)new Exception("global failure"));
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), localFailureAttemptId, ExecutionState.FAILED, (Throwable)new Exception("local failure")));
        for (ExecutionAttemptID attemptId : attemptIds) {
            scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.CANCELED));
        }
        this.taskRestartExecutor.triggerScheduledTasks();
        ExecutionVertexID executionVertexId0 = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        ExecutionVertexID executionVertexId1 = new ExecutionVertexID(onlyJobVertex.getID(), 1);
        Assert.assertThat((String)"The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.", this.testExecutionVertexOperations.getDeployedVertices(), (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId0, executionVertexId1, executionVertexId0, executionVertexId1}));
    }

    @Test
    public void testStartingCheckpointSchedulerAfterExecutionGraphFinished() {
        this.assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(SchedulerBase::startCheckpointScheduler);
    }

    @Test
    public void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() {
        this.assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(SchedulerBase::stopCheckpointScheduler);
    }

    private void assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(Consumer<DefaultScheduler> callSchedulingOperation) {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        Assert.assertThat((Object)scheduler.getCheckpointCoordinator(), (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), ((ExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.getExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FINISHED));
        Assert.assertThat((Object)scheduler.getCheckpointCoordinator(), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
        callSchedulingOperation.accept(scheduler);
        Assert.assertThat((Object)scheduler.getCheckpointCoordinator(), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
    }

    @Test
    public void vertexIsNotAffectedByOutdatedDeployment() {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        Iterator vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
        ArchivedExecutionVertex v1 = (ArchivedExecutionVertex)vertexIterator.next();
        ArchivedExecutionVertex v2 = (ArchivedExecutionVertex)vertexIterator.next();
        SchedulingExecutionVertex sv1 = (SchedulingExecutionVertex)scheduler.getSchedulingTopology().getVertices().iterator().next();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), v1.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), v2.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED));
        Assert.assertThat((Object)sv1.getState(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)ExecutionState.SCHEDULED)));
    }

    @Test
    public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        CountDownLatch checkpointTriggeredLatch = this.getCheckpointTriggeredLatch();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING));
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator((SchedulerBase)scheduler);
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        Assert.assertThat((Object)checkpointCoordinator.getNumberOfPendingCheckpoints(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)1)));
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();
        Assert.assertThat((Object)checkpointCoordinator.getNumberOfPendingCheckpoints(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)0)));
    }

    @Test
    public void restoreStateWhenRestartingTasks() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        CountDownLatch checkpointTriggeredLatch = this.getCheckpointTriggeredLatch();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING));
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator((SchedulerBase)scheduler);
        TestMasterHook masterHook = TestMasterHook.fromId("testHook");
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)masterHook);
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        long checkpointId = (Long)checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(scheduler, checkpointId);
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();
        Assert.assertThat((Object)masterHook.getRestoreCount(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)1)));
    }

    @Test
    public void failGlobalWhenRestoringStateFails() throws Exception {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        CountDownLatch checkpointTriggeredLatch = this.getCheckpointTriggeredLatch();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING));
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator((SchedulerBase)scheduler);
        TestMasterHook masterHook = TestMasterHook.fromId("testHook");
        masterHook.enableFailOnRestore();
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)masterHook);
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        long checkpointId = (Long)checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(scheduler, checkpointId);
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedExecutionVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        Assert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId}));
        masterHook.disableFailOnRestore();
        this.taskRestartExecutor.triggerScheduledTasks();
        Assert.assertThat(deployedExecutionVertices, (Matcher)Matchers.contains((Object[])new ExecutionVertexID[]{executionVertexId, executionVertexId}));
    }

    @Test
    public void testInputConstraintALLPerf() throws Exception {
        int parallelism = 1000;
        JobVertex v1 = DefaultSchedulerTest.createVertexWithAllInputConstraints("vertex1", 1000);
        JobVertex v2 = DefaultSchedulerTest.createVertexWithAllInputConstraints("vertex2", 1000);
        JobVertex v3 = DefaultSchedulerTest.createVertexWithAllInputConstraints("vertex3", 1000);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v2.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        JobGraph jobGraph = new JobGraph(new JobVertex[]{v1, v2, v3});
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        AccessExecutionJobVertex ejv1 = (AccessExecutionJobVertex)scheduler.requestJob().getAllVertices().get(v1.getID());
        for (int i = 0; i < 999; ++i) {
            DefaultSchedulerTest.finishSubtask(scheduler, ejv1, i);
        }
        long startTime = System.nanoTime();
        DefaultSchedulerTest.finishSubtask(scheduler, ejv1, 999);
        Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
        Duration timeout = Duration.ofSeconds(5L);
        Assert.assertThat((Object)duration, (Matcher)Matchers.lessThan((Comparable)timeout));
    }

    @Test
    public void failJobWillIncrementVertexVersions() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId);
        scheduler.failJob((Throwable)new FlinkException("Test failure."));
        Assert.assertTrue((boolean)this.executionVertexVersioner.isModified(executionVertexVersion));
    }

    @Test
    public void cancelJobWillIncrementVertexVersions() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId);
        scheduler.cancel();
        Assert.assertTrue((boolean)this.executionVertexVersioner.isModified(executionVertexVersion));
    }

    @Test
    public void suspendJobWillIncrementVertexVersions() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = DefaultSchedulerTest.getOnlyJobVertex(jobGraph);
        ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId);
        scheduler.suspend((Throwable)new Exception("forced suspend"));
        Assert.assertTrue((boolean)this.executionVertexVersioner.isModified(executionVertexVersion));
    }

    @Test
    public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        JobID jobId = jobGraph.getJobID();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        Iterator vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
        ExecutionAttemptID attemptId1 = ((ArchivedExecutionVertex)vertexIterator.next()).getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = ((ArchivedExecutionVertex)vertexIterator.next()).getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId1, ExecutionState.FAILED, (Throwable)new RuntimeException("expected")));
        JobStatus jobStatusAfterFirstFailure = scheduler.requestJobStatus();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId2, ExecutionState.FAILED, (Throwable)new RuntimeException("expected")));
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        JobStatus jobStatusWithPendingRestarts = scheduler.requestJobStatus();
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        JobStatus jobStatusAfterRestarts = scheduler.requestJobStatus();
        Assert.assertThat((Object)jobStatusAfterFirstFailure, (Matcher)Matchers.equalTo((Object)JobStatus.RESTARTING));
        Assert.assertThat((Object)jobStatusWithPendingRestarts, (Matcher)Matchers.equalTo((Object)JobStatus.RESTARTING));
        Assert.assertThat((Object)jobStatusAfterRestarts, (Matcher)Matchers.equalTo((Object)JobStatus.RUNNING));
    }

    @Test
    public void cancelWhileRestartingShouldWaitForRunningTasks() {
        JobGraph jobGraph = DefaultSchedulerTest.singleJobVertexJobGraph(2);
        JobID jobid = jobGraph.getJobID();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        SchedulingTopology topology = scheduler.getSchedulingTopology();
        Iterator vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
        ExecutionAttemptID attemptId1 = ((ArchivedExecutionVertex)vertexIterator.next()).getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = ((ArchivedExecutionVertex)vertexIterator.next()).getCurrentExecutionAttempt().getAttemptId();
        ExecutionVertexID executionVertex2 = scheduler.getExecutionVertexIdOrThrow(attemptId2);
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId1, ExecutionState.FAILED, (Throwable)new RuntimeException("expected")));
        scheduler.cancel();
        ExecutionState vertex2StateAfterCancel = topology.getVertex(executionVertex2).getState();
        JobStatus statusAfterCancelWhileRestarting = scheduler.requestJobStatus();
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId2, ExecutionState.CANCELED, (Throwable)new RuntimeException("expected")));
        Assert.assertThat((Object)vertex2StateAfterCancel, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)ExecutionState.CANCELING)));
        Assert.assertThat((Object)statusAfterCancelWhileRestarting, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.CANCELLING)));
        Assert.assertThat((Object)scheduler.requestJobStatus(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)JobStatus.CANCELED)));
    }

    @Test
    public void failureInfoIsSetAfterTaskFailure() {
        JobGraph jobGraph = DefaultSchedulerTest.singleNonParallelJobVertexJobGraph();
        JobID jobId = jobGraph.getJobID();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ArchivedExecutionVertex onlyExecutionVertex = (ArchivedExecutionVertex)Iterables.getOnlyElement((Iterable)scheduler.requestJob().getAllExecutionVertices());
        ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        String exceptionMessage = "expected exception";
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, (Throwable)new RuntimeException("expected exception")));
        ErrorInfo failureInfo = scheduler.requestJob().getFailureInfo();
        Assert.assertThat((Object)failureInfo, (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
        Assert.assertThat((Object)failureInfo.getExceptionAsString(), (Matcher)Matchers.containsString((String)"expected exception"));
    }

    @Test
    public void coLocationConstraintIsResetOnTaskRecovery() {
        JobGraph jobGraph = DefaultSchedulerTest.nonParallelSourceSinkJobGraph();
        JobVertex source = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex sink = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        SlotSharingGroup ssg = new SlotSharingGroup();
        source.setSlotSharingGroup(ssg);
        sink.setSlotSharingGroup(ssg);
        sink.setStrictlyCoLocatedWith(source);
        JobID jobId = jobGraph.getJobID();
        DefaultScheduler scheduler = this.createSchedulerAndStartScheduling(jobGraph);
        ExecutionVertex sourceVertex = scheduler.getExecutionVertex(new ExecutionVertexID(source.getID(), 0));
        ExecutionAttemptID sourceAttemptId = sourceVertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionVertex sinkVertex = scheduler.getExecutionVertex(new ExecutionVertexID(sink.getID(), 0));
        ExecutionAttemptID sinkAttemptId = sinkVertex.getCurrentExecutionAttempt().getAttemptId();
        sourceVertex.getLocationConstraint().setSlotRequestId(new SlotRequestId());
        Assert.assertThat((Object)sourceVertex.getLocationConstraint().getSlotRequestId(), (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
        String exceptionMessage = "expected exception";
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, sourceAttemptId, ExecutionState.FAILED, (Throwable)new RuntimeException("expected exception")));
        scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, sinkAttemptId, ExecutionState.CANCELED));
        this.taskRestartExecutor.triggerScheduledTasks();
        Assert.assertThat((Object)sourceVertex.getLocationConstraint().getSlotRequestId(), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
    }

    private static JobVertex createVertexWithAllInputConstraints(String name, int parallelism) {
        JobVertex v = new JobVertex(name);
        v.setParallelism(parallelism);
        v.setInvokableClass(AbstractInvokable.class);
        v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
        return v;
    }

    private static void finishSubtask(DefaultScheduler scheduler, AccessExecutionJobVertex vertex, int subtask) {
        ExecutionAttemptID attemptId = vertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(scheduler.getJobGraph().getJobID(), attemptId, ExecutionState.FINISHED));
    }

    private void waitForTermination(DefaultScheduler scheduler) throws Exception {
        scheduler.getTerminationFuture().get(1000L, TimeUnit.MILLISECONDS);
    }

    private static JobGraph singleNonParallelJobVertexJobGraph() {
        return DefaultSchedulerTest.singleJobVertexJobGraph(1);
    }

    private static JobGraph singleJobVertexJobGraph(int parallelism) {
        JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Testjob");
        jobGraph.setScheduleMode(ScheduleMode.EAGER);
        JobVertex vertex = new JobVertex("source");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);
        jobGraph.addVertex(vertex);
        return jobGraph;
    }

    private static JobGraph nonParallelSourceSinkJobGraph() {
        JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Testjob");
        jobGraph.setScheduleMode(ScheduleMode.EAGER);
        JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);
        jobGraph.addVertex(source);
        JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);
        jobGraph.addVertex(sink);
        sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        return jobGraph;
    }

    private static JobVertex getOnlyJobVertex(JobGraph jobGraph) {
        List sortedVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Preconditions.checkState((sortedVertices.size() == 1 ? 1 : 0) != 0);
        return (JobVertex)sortedVertices.get(0);
    }

    private DefaultScheduler createSchedulerAndStartScheduling(JobGraph jobGraph) {
        LazyFromSourcesSchedulingStrategy.Factory schedulingStrategyFactory = jobGraph.getScheduleMode() == ScheduleMode.LAZY_FROM_SOURCES ? new LazyFromSourcesSchedulingStrategy.Factory() : new EagerSchedulingStrategy.Factory();
        try {
            DefaultScheduler scheduler = this.createScheduler(jobGraph, (SchedulingStrategyFactory)schedulingStrategyFactory);
            this.startScheduling((SchedulerNG)scheduler);
            return scheduler;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private DefaultScheduler createScheduler(JobGraph jobGraph, SchedulingStrategyFactory schedulingStrategyFactory) throws Exception {
        return SchedulerTestingUtils.newSchedulerBuilder(jobGraph).setLogger(this.log).setIoExecutor(this.executor).setJobMasterConfiguration(this.configuration).setFutureExecutor(this.scheduledExecutorService).setDelayExecutor(this.taskRestartExecutor).setSchedulingStrategyFactory(schedulingStrategyFactory).setRestartBackoffTimeStrategy(this.testRestartBackoffTimeStrategy).setExecutionVertexOperations(this.testExecutionVertexOperations).setExecutionVertexVersioner(this.executionVertexVersioner).setExecutionSlotAllocatorFactory(this.executionSlotAllocatorFactory).build();
    }

    private void startScheduling(SchedulerNG scheduler) {
        scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        scheduler.startScheduling();
    }

    private CountDownLatch getCheckpointTriggeredLatch() {
        CountDownLatch checkpointTriggeredLatch = new CountDownLatch(1);
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        this.testExecutionSlotAllocator.getLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway);
        taskManagerGateway.setCheckpointConsumer((executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions) -> checkpointTriggeredLatch.countDown());
        return checkpointTriggeredLatch;
    }
}

