package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest.class */
public class TaskTest extends TestLogger {
    /** Exception message used by the restore-related failure tests in this class. */
    private static final String RESTORE_EXCEPTION_MSG = "TestExceptionInRestore";
    /** Shuffle environment; created in {@link #setup()} and closed in {@link #teardown()}. */
    private ShuffleEnvironment<?, ?> shuffleEnvironment;

    /** Class-level rule providing a scheduled executor for tests that need one. */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    /** Class-level temporary folder rule. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    /**
     * Set to {@code true} by the {@code cleanUp} methods of several test invokables; reset to
     * {@code false} in {@link #setup()}.
     */
    private static boolean wasCleanedUp = false;

    /**
     * Base class for test invokables that own a latch ({@link #awaitLatch}) which subclasses
     * trigger from within their task methods.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public static abstract class AwaitLatchInvokable extends AbstractInvokable {
        /** Latch that {@link #await()} waits on. */
        final OneShotLatch awaitLatch = new OneShotLatch();

        public AwaitLatchInvokable(Environment environment) {
            super(environment);
        }

        /**
         * Waits on {@link #awaitLatch}.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        void await() throws InterruptedException {
            awaitLatch.await();
        }
    }

    /**
     * Invokable whose {@code invoke()} always throws a {@link TestWrappedException} that wraps
     * an {@link IOException} with the message {@code "test"}.
     */
    private static final class FailingInvokableWithChainedException extends AbstractInvokable {
        public FailingInvokableWithChainedException(Environment environment) {
            super(environment);
        }

        public void invoke() {
            final IOException wrappedCause = new IOException("test");
            throw new TestWrappedException(wrappedCause);
        }

        public void cancel() {
            // Nothing to cancel.
        }
    }

    /**
     * Invokable whose {@code cancel()} blocks on this object's monitor.
     *
     * <p>{@code invoke()} triggers {@code awaitLatch}, waits for {@code triggerLatch} and then
     * waits on this object's monitor. {@code cancel()} triggers {@code triggerLatch} and then waits
     * on the same monitor. The only {@code notifyAll()} in this class is issued by {@code invoke()}
     * when it is interrupted.
     */
    public static final class InvokableBlockingInCancel extends TriggerLatchInvokable {
        public InvokableBlockingInCancel(Environment environment) {
            super(environment);
        }

        /** Waits for the trigger latch, then parks on this monitor until interrupted. */
        public void invoke() {
            this.awaitLatch.trigger();
            try {
                // Wait until the trigger latch is released (cancel() does this below).
                this.triggerLatch.await();
                synchronized (this) {
                    wait();
                }
            } catch (InterruptedException e) {
                // On interruption, wake up every thread waiting on this monitor.
                synchronized (this) {
                    notifyAll();
                }
            }
        }

        /** Releases the trigger latch and then waits on this monitor. */
        public void cancel() throws Exception {
            synchronized (this) {
                this.triggerLatch.trigger();
                // wait() releases the monitor while blocked, so invoke() can enter its own
                // synchronized sections.
                wait();
            }
        }
    }

    /**
     * Invokable whose {@code invoke()} triggers {@code awaitLatch} and then waits on its own
     * monitor in an endless loop; it only leaves via an exception thrown by {@code wait()}.
     */
    private static final class InvokableBlockingInInvoke extends AwaitLatchInvokable {
        public InvokableBlockingInInvoke(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            awaitLatch.trigger();
            synchronized (this) {
                // Re-enter wait() after every wake-up; there is no regular exit.
                for (;;) {
                    this.wait();
                }
            }
        }
    }

    /**
     * Invokable whose {@code restore()} triggers {@code awaitLatch} and then waits on its own
     * monitor in an endless loop. {@code cleanUp} records its invocation in
     * {@code TaskTest.wasCleanedUp}.
     */
    private static final class InvokableBlockingInRestore extends AwaitLatchInvokable {
        public InvokableBlockingInRestore(Environment environment) {
            super(environment);
        }

        public void restore() throws Exception {
            awaitLatch.trigger();
            synchronized (this) {
                // Re-enter wait() after every wake-up; there is no regular exit.
                for (;;) {
                    this.wait();
                }
            }
        }

        public void invoke() {
            // Intentionally empty.
        }

        public void cleanUp(Throwable th) throws Exception {
            // Record that cleanUp ran before delegating to the base implementation.
            TaskTest.wasCleanedUp = true;
            super.cleanUp(th);
        }
    }

    /**
     * Invokable whose {@code invoke()} triggers {@code awaitLatch} and then waits on
     * {@code triggerLatch}.
     */
    private static class InvokableBlockingWithTrigger extends TriggerLatchInvokable {
        public InvokableBlockingWithTrigger(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            awaitLatch.trigger();
            triggerLatch.await();
        }
    }

    /**
     * Invokable whose {@code triggerCheckpointAsync} reacts differently depending on the
     * checkpoint id; see the constants below for the supported ids.
     */
    private static class InvokableDecliningCheckpoints extends InvokableBlockingWithTrigger {
        /** Checkpoint id for which triggering throws a {@link RejectedExecutionException}. */
        public static final int REJECTED_EXECUTION_CHECKPOINT_ID = 2;
        /** Checkpoint id for which the returned future is completed exceptionally. */
        public static final int THROWING_CHECKPOINT_ID = 3;
        /** Checkpoint id for which the returned future is completed with {@code false}. */
        public static final int TRIGGERING_FAILED_CHECKPOINT_ID = 4;

        public InvokableDecliningCheckpoints(Environment environment) {
            super(environment);
        }

        /**
         * Dispatches on the checkpoint id of {@code checkpointMetaData}.
         *
         * @throws RejectedExecutionException for {@link #REJECTED_EXECUTION_CHECKPOINT_ID}
         * @throws UnsupportedOperationException for any id not covered by the constants
         */
        public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            final long id = checkpointMetaData.getCheckpointId();
            switch (Math.toIntExact(id)) {
                case REJECTED_EXECUTION_CHECKPOINT_ID:
                    throw new RejectedExecutionException();
                case THROWING_CHECKPOINT_ID: {
                    final CompletableFuture<Boolean> failedFuture = new CompletableFuture<>();
                    failedFuture.completeExceptionally(new ExpectedTestException());
                    return failedFuture;
                }
                case TRIGGERING_FAILED_CHECKPOINT_ID:
                    return CompletableFuture.completedFuture(false);
                default:
                    throw new UnsupportedOperationException("Unsupported checkpointId: " + id);
            }
        }
    }

    /**
     * Invokable in which {@code invoke()} and {@code cancel()} synchronize on the same private
     * lock object.
     *
     * <p>{@code invoke()} waits on the lock until {@code triggerLatch} is triggered and lets an
     * {@link InterruptedException} from {@code wait()} propagate to the caller. {@code cancel()}
     * triggers the latch while holding the lock but does not notify waiters.
     */
    public static final class InvokableInterruptibleSharedLockInInvokeAndCancel extends TriggerLatchInvokable {
        /** Monitor shared by {@code invoke()} and {@code cancel()}. */
        private final Object lock;

        public InvokableInterruptibleSharedLockInInvokeAndCancel(Environment environment) {
            super(environment);
            this.lock = new Object();
        }

        public void invoke() throws Exception {
            // The latch is re-checked after every wake-up from wait().
            while (!this.triggerLatch.isTriggered()) {
                synchronized (this.lock) {
                    this.awaitLatch.trigger();
                    // wait() releases the lock, which lets cancel() acquire it.
                    this.lock.wait();
                }
            }
        }

        public void cancel() {
            // Needs the shared lock, so it can only proceed while invoke() is not holding it.
            synchronized (this.lock) {
                this.triggerLatch.trigger();
            }
        }
    }

    /**
     * Abstract invokable; being abstract, it cannot be instantiated directly. Used by
     * {@code testInvokableInstantiationFailed}.
     */
    private static abstract class InvokableNonInstantiable extends AbstractInvokable {

        public InvokableNonInstantiable(Environment environment) {
            super(environment);
        }
    }

    /**
     * Invokable whose {@code invoke()} keeps waiting on its own monitor until
     * {@code triggerLatch} is triggered, ignoring interrupts. {@code cancel()} does nothing.
     */
    public static final class InvokableUnInterruptibleBlockingInvoke extends TriggerLatchInvokable {
        public InvokableUnInterruptibleBlockingInvoke(Environment environment) {
            super(environment);
        }

        public void invoke() {
            // Only a triggered latch ends the loop; the latch is re-checked after each wake-up.
            while (!this.triggerLatch.isTriggered()) {
                try {
                    synchronized (this) {
                        this.awaitLatch.trigger();
                        wait();
                    }
                } catch (InterruptedException e) {
                    // Deliberately swallowed: an interrupt just causes another loop iteration.
                }
            }
        }

        public void cancel() {
            // Intentionally empty.
        }
    }

    /**
     * Invokable whose {@code invoke()} waits for the trigger latch via
     * {@code awaitTriggerLatch()} and then throws a {@link CancelTaskException}.
     */
    public static final class InvokableWithCancelTaskExceptionInInvoke extends TriggerLatchInvokable {

        public InvokableWithCancelTaskExceptionInInvoke(Environment environment) {
            super(environment);
        }

        public void invoke() {
            this.awaitTriggerLatch();
            final CancelTaskException cancellation = new CancelTaskException();
            throw cancellation;
        }
    }

    /**
     * Invokable whose {@code invoke()} always throws an {@link Exception} with the message
     * {@code "test"}. {@code cleanUp} records its invocation in {@code TaskTest.wasCleanedUp}.
     */
    private static final class InvokableWithExceptionInInvoke extends AbstractInvokable {
        public InvokableWithExceptionInInvoke(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            final Exception failure = new Exception("test");
            throw failure;
        }

        public void cleanUp(Throwable th) throws Exception {
            // Record that cleanUp ran before delegating to the base implementation.
            TaskTest.wasCleanedUp = true;
            super.cleanUp(th);
        }
    }

    /**
     * Invokable whose {@code restore()} always throws an {@link Exception} carrying
     * {@code RESTORE_EXCEPTION_MSG}. {@code cleanUp} records its invocation in
     * {@code TaskTest.wasCleanedUp}.
     */
    static final class InvokableWithExceptionInRestore extends AbstractInvokable {
        public InvokableWithExceptionInRestore(Environment environment) {
            super(environment);
        }

        public void restore() throws Exception {
            final Exception failure = new Exception(RESTORE_EXCEPTION_MSG);
            throw failure;
        }

        public void invoke() {
            // Intentionally empty.
        }

        public void cleanUp(Throwable th) throws Exception {
            // Record that cleanUp ran before delegating to the base implementation.
            TaskTest.wasCleanedUp = true;
            super.cleanUp(th);
        }
    }

    /**
     * Invokable whose {@code invoke()} waits for the trigger latch via
     * {@code awaitTriggerLatch()} and then throws a {@link RuntimeException} with the message
     * {@code "test"}.
     */
    public static final class InvokableWithExceptionOnTrigger extends TriggerLatchInvokable {

        public InvokableWithExceptionOnTrigger(Environment environment) {
            super(environment);
        }

        public void invoke() {
            this.awaitTriggerLatch();
            final RuntimeException failure = new RuntimeException("test");
            throw failure;
        }
    }

    /**
     * {@link NoOpTaskManagerActions} variant that turns any fatal-error notification into a
     * {@link RuntimeException}.
     */
    private static class ProhibitFatalErrorTaskManagerActions extends NoOpTaskManagerActions {

        private ProhibitFatalErrorTaskManagerActions() {}

        /** Always throws; neither argument is inspected. */
        @Override
        public void notifyFatalError(String str, Throwable th) {
            final RuntimeException unexpected = new RuntimeException("Unexpected FatalError notification");
            throw unexpected;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$QueuedNoOpTaskManagerActions.class */
    public static class QueuedNoOpTaskManagerActions extends NoOpTaskManagerActions {
        private final BlockingQueue<TaskExecutionState> queue;

        private QueuedNoOpTaskManagerActions() {
            this.queue = new LinkedBlockingDeque();
        }

        @Override // org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            Assert.assertTrue(this.queue.offer(taskExecutionState));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void validateListenerMessage(ExecutionState executionState, Task task, Throwable th) {
            try {
                TaskExecutionState take = this.queue.take();
                Assert.assertNotNull("There is no additional listener message", executionState);
                Assert.assertEquals(task.getExecutionId(), take.getID());
                Assert.assertEquals(executionState, take.getExecutionState());
                Throwable error = take.getError(getClass().getClassLoader());
                if (th == null) {
                    Assert.assertNull(error);
                } else {
                    Assert.assertEquals(th.toString(), error.toString());
                }
            } catch (InterruptedException e) {
                Assert.fail("interrupted");
            }
        }
    }

    /**
     * Invokable with empty {@code invoke()} and {@code cancel()} methods. {@code cleanUp}
     * records its invocation in {@code TaskTest.wasCleanedUp}.
     */
    private static final class TestInvokableCorrect extends AbstractInvokable {
        public TestInvokableCorrect(Environment environment) {
            super(environment);
        }

        public void invoke() {
            // Intentionally empty.
        }

        public void cancel() {
            // Intentionally empty.
        }

        public void cleanUp(Throwable th) throws Exception {
            // Record that cleanUp ran before delegating to the base implementation.
            TaskTest.wasCleanedUp = true;
            super.cleanUp(th);
        }
    }

    /** {@link WrappingRuntimeException} subclass used by the tests to wrap a cause. */
    private static class TestWrappedException extends WrappingRuntimeException {
        private static final long serialVersionUID = 1L;

        /**
         * @param th the throwable to wrap; passed straight to the superclass constructor
         */
        TestWrappedException(@Nonnull Throwable th) {
            super(th);
        }
    }

    /**
     * {@link AwaitLatchInvokable} with a second latch ({@link #triggerLatch}) that can be released
     * through {@link #trigger()}.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public static abstract class TriggerLatchInvokable extends AwaitLatchInvokable {
        /** Latch released by {@link #trigger()} and awaited by {@link #awaitTriggerLatch()}. */
        final OneShotLatch triggerLatch = new OneShotLatch();

        public TriggerLatchInvokable(Environment environment) {
            super(environment);
        }

        /** Triggers {@link #triggerLatch}. */
        void trigger() {
            triggerLatch.trigger();
        }

        /**
         * Triggers {@code awaitLatch} and then waits on {@link #triggerLatch}, retrying the wait
         * whenever it is interrupted.
         */
        void awaitTriggerLatch() {
            awaitLatch.trigger();
            boolean released = false;
            while (!released) {
                try {
                    triggerLatch.await();
                    released = true;
                } catch (InterruptedException ignored) {
                    // Interrupts are ignored on purpose; keep waiting for the latch.
                }
            }
        }
    }

    /** Creates a fresh shuffle environment and resets the clean-up flag before each test. */
    @Before
    public void setup() {
        final NettyShuffleEnvironmentBuilder environmentBuilder = new NettyShuffleEnvironmentBuilder();
        shuffleEnvironment = environmentBuilder.build();
        wasCleanedUp = false;
    }

    /** Closes the shuffle environment after each test, if one was created. */
    @After
    public void teardown() throws Exception {
        if (shuffleEnvironment == null) {
            return;
        }
        shuffleEnvironment.close();
    }

    /** Checks that {@code cleanUp} is invoked when the invokable's {@code restore()} throws. */
    @Test
    public void testCleanupWhenRestoreFails() throws Exception {
        Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithExceptionInRestore.class)
                        .build(Executors.directExecutor());

        task.run();

        Assert.assertTrue(wasCleanedUp);
    }

    /** Checks that {@code cleanUp} is invoked when the invokable's {@code invoke()} throws. */
    @Test
    public void testCleanupWhenInvokeFails() throws Exception {
        Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithExceptionInInvoke.class)
                        .build(Executors.directExecutor());

        task.run();

        Assert.assertTrue(wasCleanedUp);
    }

    /**
     * Checks that {@code cleanUp} is invoked when the task is cancelled while the invokable is
     * blocked in {@code restore()}.
     */
    @Test
    public void testCleanupWhenCancelledAfterRestore() throws Exception {
        Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInRestore.class)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        task.cancelExecution();
        // Wait for the task thread to terminate before checking the flag.
        task.getExecutingThread().join();

        Assert.assertTrue(wasCleanedUp);
    }

    /**
     * Checks that {@code cleanUp} is invoked after a successful run and that the task is neither
     * canceled nor failed.
     */
    @Test
    public void testCleanupWhenAfterInvokeSucceeded() throws Exception {
        Task task =
                createTaskBuilder()
                        .setInvokable(TestInvokableCorrect.class)
                        .build(Executors.directExecutor());

        task.run();

        Assert.assertTrue(wasCleanedUp);
        Assert.assertFalse(task.isCanceledOrFailed());
    }

    /**
     * Checks that {@code cleanUp} is invoked and the task ends up canceled-or-failed when
     * reporting the {@code INITIALIZING} state throws.
     */
    @Test
    public void testCleanupWhenSwitchToInitializationFails() throws Exception {
        // Task manager actions that reject the INITIALIZING state update.
        NoOpTaskManagerActions failingOnInitializing =
                new NoOpTaskManagerActions() {
                    @Override
                    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
                        if (taskExecutionState.getExecutionState() == ExecutionState.INITIALIZING) {
                            throw new ExpectedTestException();
                        }
                    }
                };
        Task task =
                createTaskBuilder()
                        .setInvokable(TestInvokableCorrect.class)
                        .setTaskManagerActions(failingOnInitializing)
                        .build(Executors.directExecutor());

        task.run();

        Assert.assertTrue(wasCleanedUp);
        Assert.assertTrue(task.isCanceledOrFailed());
    }

    /**
     * Runs a well-behaved invokable and checks the task state before and after the run as well
     * as the reported state updates (INITIALIZING, RUNNING, FINISHED).
     */
    @Test
    public void testRegularExecution() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                createTaskBuilder()
                        .setInvokable(TestInvokableCorrect.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        // Before the run.
        Assert.assertEquals(ExecutionState.CREATED, task.getExecutionState());
        Assert.assertFalse(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        task.run();

        // After the run.
        Assert.assertEquals(ExecutionState.FINISHED, task.getExecutionState());
        Assert.assertFalse(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());
        Assert.assertNull(task.getInvokable());

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FINISHED, task, null);
    }

    /**
     * Cancels a task before it runs and checks that it is CANCELING before and CANCELED after
     * {@code run()}, without an invokable.
     */
    @Test
    public void testCancelRightAway() throws Exception {
        Task task = createTaskBuilder().build(Executors.directExecutor());

        task.cancelExecution();
        Assert.assertEquals(ExecutionState.CANCELING, task.getExecutionState());

        task.run();
        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertNull(task.getInvokable());
    }

    /**
     * Fails a task externally before it runs and checks that it is FAILED both before and after
     * {@code run()}.
     */
    @Test
    public void testFailExternallyRightAway() throws Exception {
        Task task = createTaskBuilder().build(Executors.directExecutor());

        final Exception externalFailure = new Exception("fail externally");
        task.failExternally(externalFailure);
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        task.run();
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
    }

    /**
     * Uses a class loader lease whose resolve function throws an {@link IOException} and checks
     * that the task fails with exactly that exception and reports a FAILED state update.
     */
    @Test
    public void testLibraryCacheRegistrationFailed() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        IOException expectedFailure = new IOException("Could not load classloader");
        Task task =
                createTaskBuilder()
                        .setTaskManagerActions(taskManagerActions)
                        .setClassLoaderHandle(
                                TestingClassLoaderLease.newBuilder()
                                        .setGetOrResolveClassLoaderFunction(
                                                (ignoredFirst, ignoredSecond) -> {
                                                    throw expectedFailure;
                                                })
                                        .build())
                        .build(Executors.directExecutor());

        // Before the run.
        Assert.assertEquals(ExecutionState.CREATED, task.getExecutionState());
        Assert.assertFalse(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        task.run();

        // After the run.
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        MatcherAssert.assertThat(task.getFailureCause(), CoreMatchers.is(expectedFailure));
        Assert.assertNull(task.getInvokable());

        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, expectedFailure);
    }

    /** Runs the network-registration failure scenario with a single result partition. */
    @Test
    public void testExecutionFailsInNetworkRegistrationForPartitions() throws Exception {
        final ResultPartitionDeploymentDescriptor partitionDescriptor =
                new ResultPartitionDeploymentDescriptor(
                        PartitionDescriptorBuilder.newBuilder().build(),
                        NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
                        1);

        testExecutionFailsInNetworkRegistration(
                Collections.singletonList(partitionDescriptor), Collections.emptyList());
    }

    /** Runs the network-registration failure scenario with a single input gate. */
    @Test
    public void testExecutionFailsInNetworkRegistrationForGates() throws Exception {
        final IntermediateDataSetID consumedResultId = new IntermediateDataSetID();
        final TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] inputChannels =
                new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] {
                    new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex(
                            NettyShuffleDescriptorBuilder.newBuilder().buildRemote(), 0)
                };
        final InputGateDeploymentDescriptor gateDescriptor =
                new InputGateDeploymentDescriptor(
                        consumedResultId, ResultPartitionType.PIPELINED, 0, inputChannels);

        testExecutionFailsInNetworkRegistration(
                Collections.emptyList(), Collections.singletonList(gateDescriptor));
    }

    /**
     * Builds a task with the given partitions and gates, closes the shuffle environment before
     * running it, and checks that the task fails with the "buffer pool destroyed" message.
     *
     * @param list result partition descriptors handed to the task builder
     * @param list2 input gate descriptors handed to the task builder
     */
    private void testExecutionFailsInNetworkRegistration(List<ResultPartitionDeploymentDescriptor> list, List<InputGateDeploymentDescriptor> list2) throws Exception {
        final String expectedMessage = "Network buffer pool has already been destroyed.";

        PartitionProducerStateChecker stateChecker = Mockito.mock(PartitionProducerStateChecker.class);
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                new TestTaskBuilder(this.shuffleEnvironment)
                        .setTaskManagerActions(taskManagerActions)
                        .setPartitionProducerStateChecker(stateChecker)
                        .setResultPartitions(list)
                        .setInputGates(list2)
                        .build(EXECUTOR_RESOURCE.getExecutor());

        // Close the environment first so that the run fails.
        this.shuffleEnvironment.close();
        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains(expectedMessage));

        taskManagerActions.validateListenerMessage(
                ExecutionState.FAILED, task, new IllegalStateException(expectedMessage));
    }

    /**
     * Uses an abstract invokable class and checks that the task fails with an
     * instantiation-related message and reports a FAILED state update.
     */
    @Test
    public void testInvokableInstantiationFailed() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                createTaskBuilder()
                        .setTaskManagerActions(taskManagerActions)
                        .setInvokable(InvokableNonInstantiable.class)
                        .build(Executors.directExecutor());

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("instantiate"));

        taskManagerActions.validateListenerMessage(
                ExecutionState.FAILED,
                task,
                new FlinkException("Could not instantiate the task's invokable class."));
    }

    /**
     * Runs an invokable that throws in {@code restore()} and checks the FAILED state, the failure
     * message and the reported state updates (INITIALIZING, FAILED).
     */
    @Test
    public void testExecutionFailsInRestore() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithExceptionInRestore.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNotNull(task.getFailureCause());
        Assert.assertNotNull(task.getFailureCause().getMessage());
        MatcherAssert.assertThat(
                task.getFailureCause().getMessage(),
                CoreMatchers.containsString(RESTORE_EXCEPTION_MSG));

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(
                ExecutionState.FAILED, task, new Exception(RESTORE_EXCEPTION_MSG));
    }

    /**
     * Runs an invokable that throws in {@code invoke()} and checks the FAILED state, the failure
     * message and the reported state updates (INITIALIZING, RUNNING, FAILED).
     */
    @Test
    public void testExecutionFailsInInvoke() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithExceptionInInvoke.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNotNull(task.getFailureCause());
        Assert.assertNotNull(task.getFailureCause().getMessage());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("test"));

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    /**
     * Runs an invokable that throws a wrapped {@link IOException} and checks that the task's
     * failure cause is the {@link IOException} itself, along with the reported state updates.
     */
    @Test
    public void testFailWithWrappedException() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                createTaskBuilder()
                        .setInvokable(FailingInvokableWithChainedException.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause() instanceof IOException);

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new IOException("test"));
    }

    /**
     * Cancels a task whose invokable blocks in {@code restore()} and checks that it ends up
     * CANCELED without a failure cause, reporting INITIALIZING and CANCELED.
     */
    @Test
    public void testCancelDuringRestore() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInRestore.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        task.cancelExecution();
        boolean cancelingOrCanceled =
                task.getExecutionState() == ExecutionState.CANCELING
                        || task.getExecutionState() == ExecutionState.CANCELED;
        Assert.assertTrue(cancelingOrCanceled);

        // Wait for the task thread to terminate.
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    /**
     * Cancels a task whose invokable blocks in {@code invoke()} and checks that it ends up
     * CANCELED without a failure cause, reporting INITIALIZING, RUNNING and CANCELED.
     */
    @Test
    public void testCancelDuringInvoke() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInInvoke.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        task.cancelExecution();
        boolean cancelingOrCanceled =
                task.getExecutionState() == ExecutionState.CANCELING
                        || task.getExecutionState() == ExecutionState.CANCELED;
        Assert.assertTrue(cancelingOrCanceled);

        // Wait for the task thread to terminate.
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    /**
     * Fails a task externally while its invokable blocks in {@code restore()} and checks the
     * FAILED state, the failure message and the reported state updates (INITIALIZING, FAILED).
     */
    @Test
    public void testFailExternallyDuringRestore() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInRestore.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        task.failExternally(new Exception(RESTORE_EXCEPTION_MSG));
        // Wait for the task thread to terminate.
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        MatcherAssert.assertThat(
                task.getFailureCause().getMessage(),
                CoreMatchers.containsString(RESTORE_EXCEPTION_MSG));

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(
                ExecutionState.FAILED, task, new Exception(RESTORE_EXCEPTION_MSG));
    }

    /**
     * Fails a task externally while its invokable blocks in {@code invoke()} and checks the
     * FAILED state, the failure message and the reported state updates (INITIALIZING, RUNNING,
     * FAILED).
     */
    @Test
    public void testFailExternallyDuringInvoke() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInInvoke.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        task.failExternally(new Exception("test"));
        // Wait for the task thread to terminate.
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("test"));

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    /**
     * Runs a task with {@code InvokableWithExceptionInInvoke} to completion on the calling thread,
     * cancels it afterwards, and verifies that the task is still {@code FAILED} with a failure
     * message containing {@code "test"}.
     */
    @Test
    public void testCanceledAfterExecutionFailedInInvoke() throws Exception {
        final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithExceptionInInvoke.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        // Synchronous run: the task has already terminated when cancelExecution() is called.
        task.run();
        task.cancelExecution();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        final String failureMessage = task.getFailureCause().getMessage();
        Assert.assertTrue(failureMessage.contains("test"));

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(
                ExecutionState.FAILED, task, new Exception("test"));
    }

    /**
     * Cancels a task running {@code InvokableWithExceptionOnTrigger}, then triggers the
     * invokable's latch, and verifies that the task finishes as {@code CANCELED} without a
     * failure cause.
     */
    @Test
    public void testExecutionFailsAfterCanceling() throws Exception {
        final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithExceptionOnTrigger.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        // While the task thread is still alive, cancellation leaves the task in CANCELING.
        task.cancelExecution();
        Assert.assertEquals(ExecutionState.CANCELING, task.getExecutionState());

        // Release the invokable and let the task thread terminate.
        triggerInvokableLatch(task);
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    /**
     * Fails a task running {@code InvokableWithExceptionOnTrigger} externally, then triggers the
     * invokable's latch, and verifies that the externally supplied failure ({@code "external"})
     * remains the reported failure cause.
     */
    @Test
    public void testExecutionFailsAfterTaskMarkedFailed() throws Exception {
        final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithExceptionOnTrigger.class)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        // The state switches to FAILED as soon as the external failure is delivered.
        task.failExternally(new Exception("external"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        // Release the invokable and let the task thread terminate.
        triggerInvokableLatch(task);
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        final String failureMessage = task.getFailureCause().getMessage();
        Assert.assertTrue(failureMessage.contains("external"));

        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(
                ExecutionState.FAILED, task, new Exception("external"));
    }

    /**
     * Runs a task with {@code InvokableWithCancelTaskExceptionInInvoke}, triggers the invokable's
     * latch, and verifies that the task terminates as {@code CANCELED}.
     */
    @Test
    public void testCancelTaskException() throws Exception {
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
                        .build(Executors.directExecutor());

        task.startTaskThread();

        // Release the invokable, then wait for the task thread to finish.
        triggerInvokableLatch(task);
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
    }

    /**
     * Fails a task running {@code InvokableWithCancelTaskExceptionInInvoke} externally before
     * triggering the invokable's latch, and verifies that the task stays {@code FAILED} with the
     * external failure message.
     */
    @Test
    public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        // The state switches to FAILED as soon as the external failure is delivered.
        task.failExternally(new Exception("external"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        // Release the invokable and let the task thread terminate.
        triggerInvokableLatch(task);
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        final String failureMessage = task.getFailureCause().getMessage();
        Assert.assertTrue(failureMessage.contains("external"));
    }

    /** Runs the partition-state-update scenario with the task initially {@code RUNNING}. */
    @Test
    public void testOnPartitionStateUpdateWhileRunning() throws Exception {
        final ExecutionState initialTaskState = ExecutionState.RUNNING;
        testOnPartitionStateUpdate(initialTaskState);
    }

    /** Runs the partition-state-update scenario with the task initially {@code DEPLOYING}. */
    @Test
    public void testOnPartitionStateUpdateWhileDeploying() throws Exception {
        final ExecutionState initialTaskState = ExecutionState.DEPLOYING;
        testOnPartitionStateUpdate(initialTaskState);
    }

    /**
     * Feeds every {@link ExecutionState} as a producer state into a
     * {@link RemoteChannelStateChecker} and checks the resulting state of the consuming task.
     *
     * <p>Before each producer state is evaluated the task is reset to {@code executionState}.
     * The expected task state afterwards is:
     *
     * <ul>
     *   <li>unchanged for producer states INITIALIZING, RUNNING, SCHEDULED, DEPLOYING, FINISHED;
     *   <li>{@code CANCELING} for producer states CANCELED, CANCELING, FAILED;
     *   <li>{@code FAILED} for every other producer state.
     * </ul>
     *
     * Exactly five producer states must be reported as ready by the checker.
     *
     * @param executionState the state the consuming task is put into before each check
     * @throws Exception if building the task or evaluating a producer state fails
     */
    public void testOnPartitionStateUpdate(ExecutionState executionState) throws Exception {
        final ResultPartitionID partitionId = new ResultPartitionID();
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInInvoke.class)
                        .build(Executors.directExecutor());
        final RemoteChannelStateChecker checker =
                new RemoteChannelStateChecker(partitionId, "test task");

        // Expected task state, keyed by the producer state handed to the checker.
        final HashMap<ExecutionState, ExecutionState> expectedTaskStates =
                new HashMap<>(ExecutionState.values().length);

        // Default: any producer state not listed below is expected to fail the task.
        for (ExecutionState producerState : ExecutionState.values()) {
            expectedTaskStates.put(producerState, ExecutionState.FAILED);
        }

        // Producer states that leave the task in its initial state.
        expectedTaskStates.put(ExecutionState.INITIALIZING, executionState);
        expectedTaskStates.put(ExecutionState.RUNNING, executionState);
        expectedTaskStates.put(ExecutionState.SCHEDULED, executionState);
        expectedTaskStates.put(ExecutionState.DEPLOYING, executionState);
        expectedTaskStates.put(ExecutionState.FINISHED, executionState);

        // Producer states that are expected to cancel the task.
        expectedTaskStates.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
        expectedTaskStates.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
        expectedTaskStates.put(ExecutionState.FAILED, ExecutionState.CANCELING);

        int producerReadyCount = 0;
        for (ExecutionState producerState : ExecutionState.values()) {
            // Reset the task so that every producer state is judged from the same start state.
            TestTaskBuilder.setTaskState(task, executionState);

            // The response handle is created against the task instance under test.
            // NOTE(review): assumes PartitionProducerStateResponseHandle is a non-static inner
            // class of Task - confirm against Task.
            if (checker.isProducerReadyOrAbortConsumption(
                    task.new PartitionProducerStateResponseHandle(producerState, null))) {
                producerReadyCount++;
            }

            Assert.assertEquals(expectedTaskStates.get(producerState), task.getExecutionState());
        }

        Assert.assertEquals(5, producerReadyCount);
    }

    /**
     * Exercises four outcomes of a partition producer state request issued through
     * {@code Task#requestPartitionProducerState}, using a mocked
     * {@code PartitionProducerStateChecker} whose future is completed by the test:
     *
     * <ol>
     *   <li>{@code PartitionProducerDisposedException}: the task is expected to be
     *       {@code CANCELING};
     *   <li>any other exception: the task is expected to be {@code FAILED};
     *   <li>{@link TimeoutException}: the task stays {@code RUNNING} and the producer is reported
     *       as ready once;
     *   <li>successful completion with {@code RUNNING}: the task stays {@code RUNNING} and the
     *       producer is reported as ready once.
     * </ol>
     *
     * <p>In the last two cases the task really runs on its own thread; that thread is interrupted
     * and joined in a {@code finally} block so it does not outlive a failed assertion.
     */
    @Test
    public void testTriggerPartitionStateUpdate() throws Exception {
        final IntermediateDataSetID resultId = new IntermediateDataSetID();
        final ResultPartitionID partitionId = new ResultPartitionID();
        final PartitionProducerStateChecker partitionChecker =
                Mockito.mock(PartitionProducerStateChecker.class);
        final AtomicInteger callCount = new AtomicInteger(0);
        final RemoteChannelStateChecker remoteChannelStateChecker =
                new RemoteChannelStateChecker(partitionId, "test task");

        // Case 1: the producer was disposed.
        {
            setup();
            final Task task =
                    createTaskBuilder()
                            .setInvokable(InvokableBlockingInInvoke.class)
                            .setPartitionProducerStateChecker(partitionChecker)
                            .build(Executors.directExecutor());
            TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);

            final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
            Mockito.when(
                            partitionChecker.requestPartitionProducerState(
                                    ArgumentMatchers.eq(task.getJobID()),
                                    ArgumentMatchers.eq(resultId),
                                    ArgumentMatchers.eq(partitionId)))
                    .thenReturn(promise);

            task.requestPartitionProducerState(
                    resultId,
                    partitionId,
                    responseHandle -> {
                        MatcherAssert.assertThat(
                                remoteChannelStateChecker.isProducerReadyOrAbortConsumption(
                                        responseHandle),
                                CoreMatchers.is(false));
                    });

            promise.completeExceptionally(new PartitionProducerDisposedException(partitionId));
            Assert.assertEquals(ExecutionState.CANCELING, task.getExecutionState());
        }

        // Case 2: the request failed with an arbitrary exception.
        {
            setup();
            final Task task =
                    createTaskBuilder()
                            .setInvokable(InvokableBlockingInInvoke.class)
                            .setPartitionProducerStateChecker(partitionChecker)
                            .build(Executors.directExecutor());
            TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);

            final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
            Mockito.when(
                            partitionChecker.requestPartitionProducerState(
                                    ArgumentMatchers.eq(task.getJobID()),
                                    ArgumentMatchers.eq(resultId),
                                    ArgumentMatchers.eq(partitionId)))
                    .thenReturn(promise);

            task.requestPartitionProducerState(
                    resultId,
                    partitionId,
                    responseHandle -> {
                        MatcherAssert.assertThat(
                                remoteChannelStateChecker.isProducerReadyOrAbortConsumption(
                                        responseHandle),
                                CoreMatchers.is(false));
                    });

            promise.completeExceptionally(new RuntimeException("Any other exception"));
            Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        }

        // Case 3: the request timed out.
        {
            callCount.set(0);
            setup();
            final Task task =
                    createTaskBuilder()
                            .setInvokable(InvokableBlockingInInvoke.class)
                            .setPartitionProducerStateChecker(partitionChecker)
                            .build(Executors.directExecutor());
            try {
                task.startTaskThread();
                awaitInvokableLatch(task);

                final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
                Mockito.when(
                                partitionChecker.requestPartitionProducerState(
                                        ArgumentMatchers.eq(task.getJobID()),
                                        ArgumentMatchers.eq(resultId),
                                        ArgumentMatchers.eq(partitionId)))
                        .thenReturn(promise);

                task.requestPartitionProducerState(
                        resultId,
                        partitionId,
                        responseHandle -> {
                            if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(
                                    responseHandle)) {
                                callCount.incrementAndGet();
                            }
                        });

                promise.completeExceptionally(new TimeoutException());
                Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
                Assert.assertEquals(1, callCount.get());
            } finally {
                // Always stop the task thread, even when an assertion above failed.
                task.getExecutingThread().interrupt();
                task.getExecutingThread().join();
            }
        }

        // Case 4: the request succeeded and the producer is RUNNING.
        {
            callCount.set(0);
            setup();
            final Task task =
                    createTaskBuilder()
                            .setInvokable(InvokableBlockingInInvoke.class)
                            .setPartitionProducerStateChecker(partitionChecker)
                            .build(Executors.directExecutor());
            try {
                task.startTaskThread();
                awaitInvokableLatch(task);

                final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
                Mockito.when(
                                partitionChecker.requestPartitionProducerState(
                                        ArgumentMatchers.eq(task.getJobID()),
                                        ArgumentMatchers.eq(resultId),
                                        ArgumentMatchers.eq(partitionId)))
                        .thenReturn(promise);

                task.requestPartitionProducerState(
                        resultId,
                        partitionId,
                        responseHandle -> {
                            if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(
                                    responseHandle)) {
                                callCount.incrementAndGet();
                            }
                        });

                promise.complete(ExecutionState.RUNNING);
                Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
                Assert.assertEquals(1, callCount.get());
            } finally {
                // Always stop the task thread, even when an assertion above failed.
                task.getExecutingThread().interrupt();
                task.getExecutingThread().join();
            }
        }
    }

    /**
     * Cancels a task running {@code InvokableBlockingInCancel} with a 5 ms cancellation interval
     * and a 60 s cancellation timeout, and waits for the task thread to terminate. The task
     * manager actions are a {@code ProhibitFatalErrorTaskManagerActions} instance.
     *
     * <p>The cancellation options are set through the typed {@link Duration} setters, the same
     * way the other tests in this class configure them, instead of writing raw longs under the
     * option keys.
     */
    @Test
    public void testWatchDogInterruptsTask() throws Exception {
        final ProhibitFatalErrorTaskManagerActions taskManagerActions =
                new ProhibitFatalErrorTaskManagerActions();

        final Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(5L));
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(60_000L));

        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInCancel.class)
                        .setTaskManagerConfig(config)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        task.cancelExecution();
        task.getExecutingThread().join();
    }

    /**
     * Cancels a task running {@code InvokableInterruptibleSharedLockInInvokeAndCancel} with a
     * 5 ms cancellation interval and a 1000 ms cancellation timeout, and waits for the task thread
     * to terminate. The task manager actions are a {@code ProhibitFatalErrorTaskManagerActions}
     * instance.
     */
    @Test
    public void testInterruptibleSharedLockInInvokeAndCancel() throws Exception {
        final ProhibitFatalErrorTaskManagerActions taskManagerActions =
                new ProhibitFatalErrorTaskManagerActions();

        final Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(5L));
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(1000L));

        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableInterruptibleSharedLockInInvokeAndCancel.class)
                        .setTaskManagerConfig(config)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        task.cancelExecution();
        task.getExecutingThread().join();
    }

    /**
     * Cancels a task running {@code InvokableUnInterruptibleBlockingInvoke} with a 10 ms
     * cancellation timeout and verifies that a fatal error is reported to the task manager
     * actions.
     *
     * <p>Cleanup (release the latch, interrupt and join the task thread) runs exactly once in a
     * {@code finally} block, whether or not the assertions succeed.
     */
    @Test
    public void testFatalErrorAfterUnInterruptibleInvoke() throws Exception {
        final CompletableFuture<Throwable> fatalErrorFuture = new CompletableFuture<>();
        final TestingTaskManagerActions taskManagerActions =
                TestingTaskManagerActions.newBuilder()
                        .setNotifyFatalErrorConsumer(
                                (message, cause) -> {
                                    fatalErrorFuture.complete(cause);
                                })
                        .build();

        final Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(10L));

        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableUnInterruptibleBlockingInvoke.class)
                        .setTaskManagerConfig(config)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());

        try {
            task.startTaskThread();
            awaitInvokableLatch(task);

            task.cancelExecution();

            // Blocks until the fatal-error callback has fired.
            final Throwable fatalError = fatalErrorFuture.join();
            MatcherAssert.assertThat(fatalError, CoreMatchers.is(CoreMatchers.notNullValue()));
        } finally {
            // Release the invokable and interrupt once more so the task thread can terminate.
            triggerInvokableLatch(task);
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    /**
     * Makes {@code cancelOrFailAndCancelInvokableInternal(CANCELING, null)} throw an
     * {@link OutOfMemoryError} on a Mockito spy of the task and verifies that this error is
     * reported as a fatal error to the task manager actions when the spy is cancelled.
     *
     * <p>The task is started and cancelled through the spy, while the latch helpers are invoked on
     * the original, un-spied task instance. The latch is released exactly once in a
     * {@code finally} block, whether or not the assertions succeed.
     */
    @Test
    public void testFatalErrorOnCanceling() throws Exception {
        final CompletableFuture<Throwable> fatalErrorFuture = new CompletableFuture<>();
        final TestingTaskManagerActions taskManagerActions =
                TestingTaskManagerActions.newBuilder()
                        .setNotifyFatalErrorConsumer(
                                (message, cause) -> {
                                    fatalErrorFuture.complete(cause);
                                })
                        .build();

        final Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(5L));
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(50L));

        // Keep a handle on the original task: the latch helpers below operate on it, not the spy.
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingWithTrigger.class)
                        .setTaskManagerConfig(config)
                        .setTaskManagerActions(taskManagerActions)
                        .build(Executors.directExecutor());
        final Task spyTask = Mockito.spy(task);

        Mockito.doThrow(OutOfMemoryError.class)
                .when(spyTask)
                .cancelOrFailAndCancelInvokableInternal(
                        ArgumentMatchers.eq(ExecutionState.CANCELING),
                        ArgumentMatchers.<Throwable>eq(null));

        try {
            spyTask.startTaskThread();
            awaitInvokableLatch(task);

            spyTask.cancelExecution();

            // Blocks until the fatal-error callback has fired.
            final Throwable fatalError = fatalErrorFuture.join();
            MatcherAssert.assertThat(fatalError, CoreMatchers.instanceOf(OutOfMemoryError.class));
        } finally {
            triggerInvokableLatch(task);
        }
    }

    /**
     * Verifies which cancellation settings a task reports: before it is started, the values from
     * the task manager {@link Configuration}; once the invokable's latch has been awaited, the
     * values from the {@link ExecutionConfig}.
     */
    @Test
    public void testTaskConfig() throws Exception {
        final long interval = 28218123L;
        final long timeout = interval + 19292L;

        // Task-manager-level settings.
        final Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(interval));
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(timeout));

        // Job-level settings, deliberately different from the task manager ones.
        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setTaskCancellationInterval(interval + 1337L);
        executionConfig.setTaskCancellationTimeout(timeout - 1337L);

        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInInvoke.class)
                        .setTaskManagerConfig(config)
                        .setExecutionConfig(executionConfig)
                        .build(Executors.directExecutor());

        Assert.assertEquals(interval, task.getTaskCancellationInterval());
        Assert.assertEquals(timeout, task.getTaskCancellationTimeout());

        task.startTaskThread();
        awaitInvokableLatch(task);

        Assert.assertEquals(
                executionConfig.getTaskCancellationInterval(), task.getTaskCancellationInterval());
        Assert.assertEquals(
                executionConfig.getTaskCancellationTimeout(), task.getTaskCancellationTimeout());

        // Stop the blocked task thread.
        task.getExecutingThread().interrupt();
        task.getExecutingThread().join();
    }

    /**
     * Verifies that the termination future is not done while the invokable has not been
     * triggered, and holds {@code FINISHED} once the latch was triggered and the task thread
     * joined.
     */
    @Test
    public void testTerminationFutureCompletesOnNormalExecution() throws Exception {
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingWithTrigger.class)
                        .setTaskManagerActions(new NoOpTaskManagerActions())
                        .build(Executors.directExecutor());

        task.startTaskThread();
        awaitInvokableLatch(task);

        Assert.assertFalse(task.getTerminationFuture().isDone());

        // Release the invokable and wait for the task thread to end.
        triggerInvokableLatch(task);
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FINISHED, task.getTerminationFuture().getNow(null));
    }

    /**
     * Cancels a task before its thread is started and verifies that the termination future is not
     * done at that point, but holds {@code CANCELED} after the task thread has run and been
     * joined.
     */
    @Test
    public void testTerminationFutureCompletesOnImmediateCancellation() throws Exception {
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInInvoke.class)
                        .setTaskManagerActions(new NoOpTaskManagerActions())
                        .build(Executors.directExecutor());

        // Cancel first; the task thread has not been started yet.
        task.cancelExecution();
        Assert.assertFalse(task.getTerminationFuture().isDone());

        task.startTaskThread();
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getTerminationFuture().getNow(null));
    }

    /**
     * Runs a task with {@code InvokableWithExceptionInInvoke} and verifies that the termination
     * future holds {@code FAILED} after the task thread has been joined.
     */
    @Test
    public void testTerminationFutureCompletesOnErrorInInvoke() throws Exception {
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithExceptionInInvoke.class)
                        .setTaskManagerActions(new NoOpTaskManagerActions())
                        .build(Executors.directExecutor());

        task.startTaskThread();
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getTerminationFuture().getNow(null));
    }

    /** Verifies that a freshly built task that was never started is not back-pressured. */
    @Test
    public void testNoBackPressureIfTaskNotStarted() throws Exception {
        final Task task = createTaskBuilder().build(Executors.directExecutor());
        Assert.assertFalse(task.isBackPressured());
    }

    /**
     * Verifies that checkpoint triggers are declined with the expected failure reason, both
     * before the task is started (checkpoint 1) and while it runs
     * {@code InvokableDecliningCheckpoints} (checkpoints 2, 3 and 4), and that the task finishes
     * normally afterwards.
     */
    @Test
    public void testDeclineCheckpoint() throws Exception {
        final TestCheckpointResponder checkpointResponder = new TestCheckpointResponder();
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableDecliningCheckpoints.class)
                        .setCheckpointResponder(checkpointResponder)
                        .build(Executors.directExecutor());

        // Not started yet: the trigger must be declined because the task is not ready.
        assertCheckpointDeclined(
                task,
                checkpointResponder,
                1L,
                CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);

        task.startTaskThread();
        try {
            awaitInvokableLatch(task);
            Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());

            assertCheckpointDeclined(
                    task,
                    checkpointResponder,
                    2L,
                    CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING);
            assertCheckpointDeclined(
                    task, checkpointResponder, 3L, CheckpointFailureReason.TASK_FAILURE);
            assertCheckpointDeclined(
                    task, checkpointResponder, 4L, CheckpointFailureReason.TASK_FAILURE);
        } finally {
            // Release the invokable and wait for the task thread to end.
            triggerInvokableLatch(task);
            task.getExecutingThread().join();
        }

        // Checked only after the latch was released and the thread joined: before that the
        // invokable is still waiting, so the termination future cannot hold FINISHED yet.
        Assert.assertEquals(ExecutionState.FINISHED, task.getTerminationFuture().getNow(null));
    }

    /**
     * Triggers a checkpoint barrier on the task and asserts that the responder recorded exactly
     * one decline report carrying the given checkpoint id and failure reason. The responder is
     * cleared afterwards so the next call starts from an empty report list.
     *
     * @param task the task to trigger the checkpoint on
     * @param testCheckpointResponder the responder that collects decline reports
     * @param j the checkpoint id to trigger and to expect in the decline report
     * @param checkpointFailureReason the failure reason expected in the decline report
     */
    private void assertCheckpointDeclined(Task task, TestCheckpointResponder testCheckpointResponder, long j, CheckpointFailureReason checkpointFailureReason) {
        final CheckpointOptions checkpointOptions =
                CheckpointOptions.alignedNoTimeout(
                        CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        task.triggerCheckpointBarrier(j, 1L, checkpointOptions);

        Assert.assertEquals(1L, testCheckpointResponder.getDeclineReports().size());
        Assert.assertEquals(
                j, testCheckpointResponder.getDeclineReports().get(0).getCheckpointId());
        Assert.assertEquals(
                checkpointFailureReason,
                testCheckpointResponder
                        .getDeclineReports()
                        .get(0)
                        .getCause()
                        .getCheckpointFailureReason());

        testCheckpointResponder.clear();
    }

    /**
     * Waits, via {@code CommonTestUtils.waitUntilCondition}, until the task exposes a non-null
     * invokable and returns it.
     *
     * @param task the task whose invokable is awaited
     * @return the task's invokable
     * @throws Exception if waiting for the condition fails
     */
    private TaskInvokable waitForInvokable(Task task) throws Exception {
        final SupplierWithException<Boolean, Exception> invokableAvailable =
                () -> task.getInvokable() != null;
        CommonTestUtils.waitUntilCondition(invokableAvailable, 10L);
        return task.getInvokable();
    }

    /**
     * Waits until the task has an invokable and then calls {@code await()} on it.
     *
     * @param task the task whose invokable should be awaited
     * @throws Exception if the invokable is not an {@code AwaitLatchInvokable}, or if waiting
     *     fails
     */
    private void awaitInvokableLatch(Task task) throws Exception {
        // Keep the declared type: the instanceof guard below is what makes the cast safe.
        final TaskInvokable invokable = waitForInvokable(task);
        if (!(invokable instanceof AwaitLatchInvokable)) {
            throw new Exception("Invokable doesn't implement class - " + AwaitLatchInvokable.class.getName());
        }
        ((AwaitLatchInvokable) invokable).await();
    }

    /**
     * Waits until the task has an invokable and then calls {@code trigger()} on it.
     *
     * @param task the task whose invokable should be triggered
     * @throws Exception if the invokable is not a {@code TriggerLatchInvokable}, or if waiting
     *     for the invokable fails
     */
    private void triggerInvokableLatch(Task task) throws Exception {
        // Keep the declared type: the instanceof guard below is what makes the cast safe.
        final TaskInvokable invokable = waitForInvokable(task);
        if (!(invokable instanceof TriggerLatchInvokable)) {
            throw new Exception("Invokable doesn't implement class - " + TriggerLatchInvokable.class.getName());
        }
        ((TriggerLatchInvokable) invokable).trigger();
    }

    /**
     * Creates a fresh {@code TestTaskBuilder} backed by this test's shuffle environment.
     *
     * @return a new task builder
     */
    private TestTaskBuilder createTaskBuilder() {
        final TestTaskBuilder taskBuilder = new TestTaskBuilder(shuffleEnvironment);
        return taskBuilder;
    }
}
