package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.DefaultSubtaskAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.EdgeManager;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.failover.partitionrelease.PartitionGroupReleaseStrategy;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitionManager;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskTest;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicIntegerAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.class */
public class ExecutingTest {
    private static final Logger log = LoggerFactory.getLogger(ExecutingTest.class);

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    /* renamed from: org.apache.flink.runtime.scheduler.adaptive.ExecutingTest$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$3.class */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$common$JobStatus = new int[JobStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.INITIALIZING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.CREATED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.FINISHED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$CancellingArguments.class */
    public static class CancellingArguments {
        private final ExecutionGraph executionGraph;
        private final ExecutionGraphHandler executionGraphHandler;
        private final OperatorCoordinatorHandler operatorCoordinatorHandle;

        public CancellingArguments(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler) {
            this.executionGraph = executionGraph;
            this.executionGraphHandler = executionGraphHandler;
            this.operatorCoordinatorHandle = operatorCoordinatorHandler;
        }

        public ExecutionGraph getExecutionGraph() {
            return this.executionGraph;
        }

        public ExecutionGraphHandler getExecutionGraphHandler() {
            return this.executionGraphHandler;
        }

        public OperatorCoordinatorHandler getOperatorCoordinatorHandle() {
            return this.operatorCoordinatorHandle;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$ExecutingStateBuilder.class */
    private final class ExecutingStateBuilder {
        private ExecutionGraph executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().build((ScheduledExecutorService) ExecutingTest.EXECUTOR_EXTENSION.getExecutor());
        private Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = context -> {
            return TestingStateTransitionManager.withNoOp();
        };
        private int rescaleOnFailedCheckpointCount = 1;
        private OperatorCoordinatorHandler operatorCoordinatorHandler = new TestingOperatorCoordinatorHandler();

        private ExecutingStateBuilder() throws JobException, JobExecutionException {
        }

        public ExecutingStateBuilder setExecutionGraph(ExecutionGraph executionGraph) {
            this.executionGraph = executionGraph;
            return this;
        }

        public ExecutingStateBuilder setOperatorCoordinatorHandler(OperatorCoordinatorHandler operatorCoordinatorHandler) {
            this.operatorCoordinatorHandler = operatorCoordinatorHandler;
            return this;
        }

        public ExecutingStateBuilder setStateTransitionManagerFactory(Function<StateTransitionManager.Context, StateTransitionManager> function) {
            this.stateTransitionManagerFactory = function;
            return this;
        }

        public ExecutingStateBuilder setRescaleOnFailedCheckpointCount(int i) {
            this.rescaleOnFailedCheckpointCount = i;
            return this;
        }

        private Executing build(MockExecutingContext mockExecutingContext) {
            this.executionGraph.transitionToRunning();
            try {
                ExecutionGraph executionGraph = this.executionGraph;
                ExecutionGraphHandler executionGraphHandler = ExecutingTest.this.getExecutionGraphHandler(this.executionGraph, mockExecutingContext.m538getMainThreadExecutor());
                OperatorCoordinatorHandler operatorCoordinatorHandler = this.operatorCoordinatorHandler;
                Logger logger = ExecutingTest.log;
                ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
                ArrayList arrayList = new ArrayList();
                Function<StateTransitionManager.Context, StateTransitionManager> function = this.stateTransitionManagerFactory;
                Objects.requireNonNull(function);
                return new Executing(executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger, mockExecutingContext, systemClassLoader, arrayList, (v1) -> {
                    return r9.apply(v1);
                }, this.rescaleOnFailedCheckpointCount);
            } finally {
                Preconditions.checkState(!mockExecutingContext.hadStateTransition, "State construction is an on-going state transition, during which no further transitions are allowed.");
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$FailOnDeployMockExecutionVertex.class */
    static class FailOnDeployMockExecutionVertex extends ExecutionVertex {

        @Nullable
        private Throwable markFailed;

        public FailOnDeployMockExecutionVertex(ExecutionJobVertex executionJobVertex) {
            super(executionJobVertex, 1, new IntermediateResult[0], Duration.ofMillis(1L), 1L, 1, 0);
            this.markFailed = null;
        }

        public void deploy() throws JobException {
            throw new JobException("Intentional Test exception");
        }

        public void markFailed(Throwable th) {
            this.markFailed = th;
        }

        @Nullable
        public Throwable getMarkedFailure() {
            return this.markFailed;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$FailingArguments.class */
    public static class FailingArguments extends CancellingArguments {
        private final Throwable failureCause;

        public FailingArguments(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable th) {
            super(executionGraph, executionGraphHandler, operatorCoordinatorHandler);
            this.failureCause = th;
        }

        public Throwable getFailureCause() {
            return this.failureCause;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$FinishingMockExecutionGraph.class */
    private static class FinishingMockExecutionGraph extends StateTrackingMockExecutionGraph {
        private FinishingMockExecutionGraph() {
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph
        public long getStatusTimestamp(JobStatus jobStatus) {
            switch (AnonymousClass3.$SwitchMap$org$apache$flink$api$common$JobStatus[jobStatus.ordinal()]) {
                case 1:
                    return 1L;
                case TaskTest.InvokableDecliningCheckpoints.REJECTED_EXECUTION_CHECKPOINT_ID /* 2 */:
                    return 2L;
                case TaskTest.InvokableDecliningCheckpoints.THROWING_CHECKPOINT_ID /* 3 */:
                    return 3L;
                case 4:
                    return 4L;
                default:
                    return 0L;
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$MockExecutingContext.class */
    private static class MockExecutingContext extends MockStateWithExecutionGraphContext implements Executing.Context {
        private Function<Throwable, FailureResult> howToHandleFailure;
        private final StateValidator<FailingArguments> failingStateValidator = new StateValidator<>("failing");
        private final StateValidator<RestartingArguments> restartingStateValidator = new StateValidator<>("restarting");
        private final StateValidator<CancellingArguments> cancellingStateValidator = new StateValidator<>("cancelling");
        private StateValidator<StopWithSavepointArguments> stopWithSavepointValidator = new StateValidator<>("stopWithSavepoint");
        private CompletableFuture<String> mockedStopWithSavepointOperationFuture = new CompletableFuture<>();
        private VertexParallelism vertexParallelism = new VertexParallelism(Collections.emptyMap());
        private Supplier<Boolean> hasDesiredResourcesSupplier = () -> {
            return false;
        };
        private Supplier<Boolean> hasSufficientResourcesSupplier = () -> {
            return false;
        };

        private MockExecutingContext() {
        }

        public void setExpectFailing(Consumer<FailingArguments> consumer) {
            this.failingStateValidator.expectInput(consumer);
        }

        public void setExpectRestarting(Consumer<RestartingArguments> consumer) {
            this.restartingStateValidator.expectInput(consumer);
        }

        public void setExpectCancelling(Consumer<CancellingArguments> consumer) {
            this.cancellingStateValidator.expectInput(consumer);
        }

        public void setExpectStopWithSavepoint(Consumer<StopWithSavepointArguments> consumer) {
            this.stopWithSavepointValidator.expectInput(consumer);
        }

        public void setHowToHandleFailure(Function<Throwable, FailureResult> function) {
            this.howToHandleFailure = function;
        }

        public void setVertexParallelism(VertexParallelism vertexParallelism) {
            this.vertexParallelism = vertexParallelism;
        }

        public void setHasDesiredResources(Supplier<Boolean> supplier) {
            this.hasDesiredResourcesSupplier = supplier;
        }

        public void setHasSufficientResources(Supplier<Boolean> supplier) {
            this.hasSufficientResourcesSupplier = supplier;
        }

        public void goToCanceling(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> list) {
            this.cancellingStateValidator.validateInput(new CancellingArguments(executionGraph, executionGraphHandler, operatorCoordinatorHandler));
            this.hadStateTransition = true;
        }

        public FailureResult howToHandleFailure(Throwable th, CompletableFuture<Map<String, String>> completableFuture) {
            return this.howToHandleFailure.apply(th);
        }

        public Optional<VertexParallelism> getAvailableVertexParallelism() {
            return Optional.ofNullable(this.vertexParallelism);
        }

        public void goToRestarting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration duration, @Nullable VertexParallelism vertexParallelism, List<ExceptionHistoryEntry> list) {
            this.restartingStateValidator.validateInput(new RestartingArguments(executionGraph, executionGraphHandler, operatorCoordinatorHandler, duration, vertexParallelism));
            this.hadStateTransition = true;
        }

        public void goToFailing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable th, List<ExceptionHistoryEntry> list) {
            this.failingStateValidator.validateInput(new FailingArguments(executionGraph, executionGraphHandler, operatorCoordinatorHandler, th));
            this.hadStateTransition = true;
        }

        public CompletableFuture<String> goToStopWithSavepoint(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, CompletableFuture<String> completableFuture, List<ExceptionHistoryEntry> list) {
            this.stopWithSavepointValidator.validateInput(new StopWithSavepointArguments(executionGraph, executionGraphHandler, operatorCoordinatorHandler, checkpointScheduling, completableFuture));
            this.hadStateTransition = true;
            return this.mockedStopWithSavepointOperationFuture;
        }

        public ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration duration) {
            return m538getMainThreadExecutor().schedule(() -> {
                runIfState(state, runnable);
            }, duration.toMillis(), TimeUnit.MILLISECONDS);
        }

        public boolean hasDesiredResources() {
            return this.hasDesiredResourcesSupplier.get().booleanValue();
        }

        public boolean hasSufficientResources() {
            return this.hasSufficientResourcesSupplier.get().booleanValue();
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.MockStateWithExecutionGraphContext, java.lang.AutoCloseable
        public void close() throws Exception {
            super.close();
            this.failingStateValidator.close();
            this.restartingStateValidator.close();
            this.cancellingStateValidator.close();
            this.stopWithSavepointValidator.close();
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.MockStateWithExecutionGraphContext
        public void archiveFailure(RootExceptionHistoryEntry rootExceptionHistoryEntry) {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$MockExecutionGraph.class */
    static class MockExecutionGraph extends StateTrackingMockExecutionGraph {
        private final boolean updateStateReturnValue;
        private final Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier;

        MockExecutionGraph(Supplier<Iterable<ExecutionJobVertex>> supplier) {
            this(false, supplier);
        }

        private MockExecutionGraph(boolean z, Supplier<Iterable<ExecutionJobVertex>> supplier) {
            this.updateStateReturnValue = z;
            this.getVerticesTopologicallySupplier = supplier;
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph
        public boolean updateState(TaskExecutionStateTransition taskExecutionStateTransition) {
            return this.updateStateReturnValue;
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph
        public Iterable<ExecutionJobVertex> getVerticesTopologically() {
            return this.getVerticesTopologicallySupplier.get();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$MockExecutionJobVertex.class */
    static class MockExecutionJobVertex extends ExecutionJobVertex {
        private final ExecutionVertex mockExecutionVertex;

        MockExecutionJobVertex(Function<ExecutionJobVertex, ExecutionVertex> function) throws JobException {
            super(new MockInternalExecutionGraphAccessor(), new JobVertex("test"), new DefaultVertexParallelismInfo(1, 1, num -> {
                return Optional.empty();
            }), new CoordinatorStoreImpl(), UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
            initialize(1, Duration.ofMillis(1L), 1L, new DefaultSubtaskAttemptNumberStore(Collections.emptyList()));
            this.mockExecutionVertex = function.apply(this);
        }

        /* renamed from: getTaskVertices, reason: merged with bridge method [inline-methods] */
        public ExecutionVertex[] m535getTaskVertices() {
            return new ExecutionVertex[]{this.mockExecutionVertex};
        }

        public ExecutionVertex getMockExecutionVertex() {
            return this.mockExecutionVertex;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$MockExecutionVertex.class */
    static class MockExecutionVertex extends ExecutionVertex {
        private boolean deployCalled;
        private ExecutionState mockedExecutionState;

        MockExecutionVertex(ExecutionJobVertex executionJobVertex) {
            super(executionJobVertex, 1, new IntermediateResult[0], Duration.ofMillis(1L), 1L, 1, 0);
            this.deployCalled = false;
            this.mockedExecutionState = ExecutionState.RUNNING;
        }

        public void deploy() throws JobException {
            this.deployCalled = true;
        }

        public boolean isDeployCalled() {
            return this.deployCalled;
        }

        public ExecutionState getExecutionState() {
            return this.mockedExecutionState;
        }

        public void setMockedExecutionState(ExecutionState executionState) {
            this.mockedExecutionState = executionState;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$MockInternalExecutionGraphAccessor.class */
    private static class MockInternalExecutionGraphAccessor implements InternalExecutionGraphAccessor {
        private MockInternalExecutionGraphAccessor() {
        }

        public Executor getFutureExecutor() {
            return ForkJoinPool.commonPool();
        }

        public ClassLoader getUserClassLoader() {
            return null;
        }

        public JobID getJobID() {
            return null;
        }

        public BlobWriter getBlobWriter() {
            return null;
        }

        @Nonnull
        public ComponentMainThreadExecutor getJobMasterMainThreadExecutor() {
            return null;
        }

        public ShuffleMaster<?> getShuffleMaster() {
            return null;
        }

        public JobMasterPartitionTracker getPartitionTracker() {
            return null;
        }

        public void registerExecution(Execution execution) {
        }

        public void deregisterExecution(Execution execution) {
        }

        public PartitionGroupReleaseStrategy getPartitionGroupReleaseStrategy() {
            return null;
        }

        public void failGlobal(Throwable th) {
        }

        public void notifySchedulerNgAboutInternalTaskFailure(ExecutionAttemptID executionAttemptID, Throwable th, boolean z, boolean z2) {
        }

        public void jobVertexFinished() {
        }

        public void jobVertexUnFinished() {
        }

        public ExecutionDeploymentListener getExecutionDeploymentListener() {
            return null;
        }

        public void notifyExecutionChange(Execution execution, ExecutionState executionState, ExecutionState executionState2) {
        }

        public EdgeManager getEdgeManager() {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID executionVertexID) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public IntermediateResultPartition getResultPartitionOrThrow(IntermediateResultPartitionID intermediateResultPartitionID) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public void deleteBlobs(List<PermanentBlobKey> list) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public ExecutionJobVertex getJobVertex(JobVertexID jobVertexID) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public boolean isDynamic() {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public ExecutionGraphID getExecutionGraphID() {
            return new ExecutionGraphID();
        }

        public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(IntermediateDataSetID intermediateDataSetID) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public JobVertexInputInfo getJobVertexInputInfo(JobVertexID jobVertexID, IntermediateDataSetID intermediateDataSetID) {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }

        public TaskDeploymentDescriptorFactory getTaskDeploymentDescriptorFactory() {
            throw new UnsupportedOperationException("This method is not supported by the MockInternalExecutionGraphAccessor.");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$MockState.class */
    private static class MockState implements State {
        private MockState() {
        }

        public void cancel() {
        }

        public void suspend(Throwable th) {
        }

        public JobID getJobId() {
            return null;
        }

        public JobStatus getJobStatus() {
            return null;
        }

        public ArchivedExecutionGraph getJob() {
            return null;
        }

        public void handleGlobalFailure(Throwable th, CompletableFuture<Map<String, String>> completableFuture) {
        }

        public Logger getLogger() {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$RestartingArguments.class */
    public static class RestartingArguments extends CancellingArguments {
        private final Duration backoffTime;

        @Nullable
        private final VertexParallelism restartWithParallelism;

        public RestartingArguments(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration duration, @Nullable VertexParallelism vertexParallelism) {
            super(executionGraph, executionGraphHandler, operatorCoordinatorHandler);
            this.backoffTime = duration;
            this.restartWithParallelism = vertexParallelism;
        }

        public Duration getBackoffTime() {
            return this.backoffTime;
        }

        public Optional<VertexParallelism> getRestartWithParallelism() {
            return Optional.ofNullable(this.restartWithParallelism);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/ExecutingTest$StopWithSavepointArguments.class */
    public static class StopWithSavepointArguments extends CancellingArguments {
        private final CheckpointScheduling checkpointScheduling;
        private final CompletableFuture<String> savepointFuture;

        public StopWithSavepointArguments(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, CompletableFuture<String> completableFuture) {
            super(executionGraph, executionGraphHandler, operatorCoordinatorHandler);
            this.checkpointScheduling = checkpointScheduling;
            this.savepointFuture = completableFuture;
        }
    }

    ExecutingTest() {
    }

    @Test
    void testExecutionGraphDeploymentOnEnter() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex(MockExecutionVertex::new);
            MockExecutionVertex mockExecutionVertex = (MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex();
            mockExecutionVertex.setMockedExecutionState(ExecutionState.CREATED);
            MockExecutionGraph mockExecutionGraph = new MockExecutionGraph(() -> {
                return Collections.singletonList(mockExecutionJobVertex);
            });
            new ExecutingStateBuilder().setExecutionGraph(mockExecutionGraph).build(mockExecutingContext);
            Assertions.assertThat(mockExecutionVertex.isDeployCalled()).isTrue();
            Assertions.assertThat(mockExecutionGraph.getState()).isEqualTo(JobStatus.RUNNING);
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex(MockExecutionVertex::new);
            MockExecutionGraph mockExecutionGraph = new MockExecutionGraph(() -> {
                return Collections.singletonList(mockExecutionJobVertex);
            });
            mockExecutionGraph.transitionToRunning();
            MockExecutionVertex mockExecutionVertex = (MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex();
            mockExecutionVertex.setMockedExecutionState(ExecutionState.RUNNING);
            new Executing(mockExecutionGraph, getExecutionGraphHandler(mockExecutionGraph, mockExecutingContext.m538getMainThreadExecutor()), new TestingOperatorCoordinatorHandler(), log, mockExecutingContext, ClassLoader.getSystemClassLoader(), new ArrayList(), context -> {
                return TestingStateTransitionManager.withNoOp();
            }, 1);
            Assertions.assertThat(mockExecutionVertex.isDeployCalled()).isFalse();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testIllegalStateExceptionOnNotRunningExecutionGraph() {
        Assertions.assertThatThrownBy(() -> {
            MockExecutingContext mockExecutingContext = new MockExecutingContext();
            try {
                StateTrackingMockExecutionGraph stateTrackingMockExecutionGraph = new StateTrackingMockExecutionGraph();
                Assertions.assertThat(stateTrackingMockExecutionGraph.getState()).isNotEqualTo(JobStatus.RUNNING);
                new Executing(stateTrackingMockExecutionGraph, getExecutionGraphHandler(stateTrackingMockExecutionGraph, mockExecutingContext.m538getMainThreadExecutor()), new TestingOperatorCoordinatorHandler(), log, mockExecutingContext, ClassLoader.getSystemClassLoader(), new ArrayList(), context -> {
                    return TestingStateTransitionManager.withNoOp();
                }, 1);
                mockExecutingContext.close();
            } catch (Throwable th) {
                try {
                    mockExecutingContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }).isInstanceOf(IllegalStateException.class);
    }

    @Test
    public void testTriggerRescaleOnCompletedCheckpoint() throws Exception {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Function<StateTransitionManager.Context, StateTransitionManager> function = context -> {
            return TestingStateTransitionManager.withOnTriggerEventOnly(() -> {
                atomicBoolean.set(true);
            });
        };
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().setStateTransitionManagerFactory(function).build(mockExecutingContext);
            Assertions.assertThat(atomicBoolean).isFalse();
            build.onCompletedCheckpoint();
            Assertions.assertThat(atomicBoolean).isTrue();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testTriggerRescaleOnFailedCheckpoint() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        Function<StateTransitionManager.Context, StateTransitionManager> function = context -> {
            Objects.requireNonNull(atomicInteger);
            return TestingStateTransitionManager.withOnTriggerEventOnly(atomicInteger::incrementAndGet);
        };
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().setStateTransitionManagerFactory(function).setRescaleOnFailedCheckpointCount(3).build(mockExecutingContext);
            for (int i = 1; i <= 3; i++) {
                build.onFailedCheckpoint();
                build.onNewResourceRequirements();
                for (int i2 = 0; i2 < 3; i2++) {
                    ((AtomicIntegerAssert) Assertions.assertThat(atomicInteger).as("No rescale operation should have been triggered for iteration #%d, yet.", new Object[]{Integer.valueOf(i)})).hasValue(i - 1);
                    build.onFailedCheckpoint();
                }
                ((AtomicIntegerAssert) Assertions.assertThat(atomicInteger).as("The rescale operation for iteration #%d should have been properly triggered.", new Object[]{Integer.valueOf(i)})).hasValue(i);
            }
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testOnCompletedCheckpointResetsFailedCheckpointCount() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        Function<StateTransitionManager.Context, StateTransitionManager> function = context -> {
            Objects.requireNonNull(atomicInteger);
            return TestingStateTransitionManager.withOnTriggerEventOnly(atomicInteger::incrementAndGet);
        };
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().setStateTransitionManagerFactory(function).setRescaleOnFailedCheckpointCount(3).build(mockExecutingContext);
            build.onFailedCheckpoint();
            build.onNewResourcesAvailable();
            IntStream.range(0, 2).forEach(i -> {
                build.onFailedCheckpoint();
            });
            ((AtomicIntegerAssert) Assertions.assertThat(atomicInteger).as("No rescaling should have been trigger, yet.", new Object[0])).hasValue(0);
            build.onCompletedCheckpoint();
            build.onNewResourceRequirements();
            ((AtomicIntegerAssert) Assertions.assertThat(atomicInteger).as("The completed checkpoint should have triggered a rescale.", new Object[0])).hasValue(1);
            IntStream.range(0, 2).forEach(i2 -> {
                build.onFailedCheckpoint();
            });
            ((AtomicIntegerAssert) Assertions.assertThat(atomicInteger).as("No additional rescaling should have been trigger by any subsequent failed checkpoint, yet.", new Object[0])).hasValue(1);
            build.onFailedCheckpoint();
            ((AtomicIntegerAssert) Assertions.assertThat(atomicInteger).as("The previous failed checkpoint should have triggered the rescale.", new Object[0])).hasValue(2);
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testDisposalOfOperatorCoordinatorsOnLeaveOfStateWithExecutionGraph() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            TestingOperatorCoordinatorHandler testingOperatorCoordinatorHandler = new TestingOperatorCoordinatorHandler();
            new ExecutingStateBuilder().setOperatorCoordinatorHandler(testingOperatorCoordinatorHandler).build(mockExecutingContext).onLeave(MockState.class);
            Assertions.assertThat(testingOperatorCoordinatorHandler.isDisposed()).isTrue();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testUnrecoverableGlobalFailureTransitionsToFailingState() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().build(mockExecutingContext);
            mockExecutingContext.setExpectFailing(failingArguments -> {
                Assertions.assertThat(failingArguments.getExecutionGraph()).isNotNull();
                Assertions.assertThat(failingArguments.getFailureCause().getMessage()).isEqualTo("test exception");
            });
            mockExecutingContext.setHowToHandleFailure(FailureResult::canNotRestart);
            build.handleGlobalFailure(new RuntimeException("test exception"), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testRecoverableGlobalFailureTransitionsToRestarting() throws Exception {
        Duration duration = Duration.ZERO;
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().build(mockExecutingContext);
            mockExecutingContext.setExpectRestarting(restartingArguments -> {
                Assertions.assertThat(restartingArguments.getBackoffTime()).isEqualTo(duration);
                Assertions.assertThat(restartingArguments.getRestartWithParallelism()).isEmpty();
            });
            mockExecutingContext.setHowToHandleFailure(th -> {
                return FailureResult.canRestart(th, duration);
            });
            build.handleGlobalFailure(new RuntimeException("Recoverable error"), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            mockExecutingContext.close();
        } catch (Throwable th2) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th3) {
                th2.addSuppressed(th3);
            }
            throw th2;
        }
    }

    @Test
    void testCancelTransitionsToCancellingState() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().build(mockExecutingContext);
            mockExecutingContext.setExpectCancelling(WaitingForResourcesTest.assertNonNull());
            build.cancel();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testTransitionToFinishedOnFailedExecutionGraph() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().build(mockExecutingContext);
            mockExecutingContext.setExpectFinished(archivedExecutionGraph -> {
                Assertions.assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
            });
            build.getExecutionGraph().failJob(new RuntimeException("test failure"), System.currentTimeMillis());
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testTransitionToFinishedOnSuspend() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().build(mockExecutingContext);
            mockExecutingContext.setExpectFinished(archivedExecutionGraph -> {
                Assertions.assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
            });
            build.suspend(new RuntimeException("suspend"));
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testFailureReportedViaUpdateTaskExecutionStateCausesFailingOnNoRestart() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            StateTrackingMockExecutionGraph stateTrackingMockExecutionGraph = new StateTrackingMockExecutionGraph();
            Executing build = new ExecutingStateBuilder().setExecutionGraph(stateTrackingMockExecutionGraph).build(mockExecutingContext);
            mockExecutingContext.setHowToHandleFailure(FailureResult::canNotRestart);
            mockExecutingContext.setExpectFailing(WaitingForResourcesTest.assertNonNull());
            RuntimeException runtimeException = new RuntimeException();
            TestingAccessExecution build2 = TestingAccessExecution.newBuilder().withExecutionState(ExecutionState.FAILED).withErrorInfo(new ErrorInfo(runtimeException, System.currentTimeMillis())).build();
            stateTrackingMockExecutionGraph.registerExecution(build2);
            build.updateTaskExecutionState(createFailingStateTransition(build2.getAttemptId(), runtimeException), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testFailureReportedViaUpdateTaskExecutionStateCausesRestart() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            StateTrackingMockExecutionGraph stateTrackingMockExecutionGraph = new StateTrackingMockExecutionGraph();
            Executing build = new ExecutingStateBuilder().setExecutionGraph(stateTrackingMockExecutionGraph).build(mockExecutingContext);
            mockExecutingContext.setHowToHandleFailure(th -> {
                return FailureResult.canRestart(th, Duration.ZERO);
            });
            mockExecutingContext.setExpectRestarting(restartingArguments -> {
                Assertions.assertThat(restartingArguments).isNotNull();
                Assertions.assertThat(restartingArguments.getRestartWithParallelism()).isEmpty();
            });
            RuntimeException runtimeException = new RuntimeException();
            TestingAccessExecution build2 = TestingAccessExecution.newBuilder().withExecutionState(ExecutionState.FAILED).withErrorInfo(new ErrorInfo(runtimeException, System.currentTimeMillis())).build();
            stateTrackingMockExecutionGraph.registerExecution(build2);
            build.updateTaskExecutionState(createFailingStateTransition(build2.getAttemptId(), runtimeException), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            mockExecutingContext.close();
        } catch (Throwable th2) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th3) {
                th2.addSuppressed(th3);
            }
            throw th2;
        }
    }

    @Test
    void testFalseReportsViaUpdateTaskExecutionStateAreIgnored() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            MockExecutionGraph mockExecutionGraph = new MockExecutionGraph(false, Collections::emptyList);
            Executing build = new ExecutingStateBuilder().setExecutionGraph(mockExecutionGraph).build(mockExecutingContext);
            RuntimeException runtimeException = new RuntimeException();
            TestingAccessExecution build2 = TestingAccessExecution.newBuilder().withExecutionState(ExecutionState.FAILED).withErrorInfo(new ErrorInfo(runtimeException, System.currentTimeMillis())).build();
            mockExecutionGraph.registerExecution(build2);
            build.updateTaskExecutionState(createFailingStateTransition(build2.getAttemptId(), runtimeException), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            mockExecutingContext.assertNoStateTransition();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testExecutionVertexMarkedAsFailedOnDeploymentFailure() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex(FailOnDeployMockExecutionVertex::new);
            new ExecutingStateBuilder().setExecutionGraph(new MockExecutionGraph(() -> {
                return Collections.singletonList(mockExecutionJobVertex);
            })).build(mockExecutingContext);
            Assertions.assertThat(((FailOnDeployMockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex()).getMarkedFailure()).isInstanceOf(JobException.class);
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testTransitionToStopWithSavepointState() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            final CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().build((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());
            Executing build2 = new ExecutingStateBuilder().setExecutionGraph(new StateTrackingMockExecutionGraph() { // from class: org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.1
                @Override // org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph
                @Nullable
                public CheckpointCoordinator getCheckpointCoordinator() {
                    return build;
                }
            }).build(mockExecutingContext);
            mockExecutingContext.setExpectStopWithSavepoint(WaitingForResourcesTest.assertNonNull());
            build2.stopWithSavepoint("file:///tmp/target", true, SavepointFormatType.CANONICAL);
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testCheckpointSchedulerIsStoppedOnStopWithSavepoint() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            final CheckpointCoordinator build = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().build((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());
            Executing build2 = new ExecutingStateBuilder().setExecutionGraph(new StateTrackingMockExecutionGraph() { // from class: org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.2
                @Override // org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph
                @Nullable
                public CheckpointCoordinator getCheckpointCoordinator() {
                    return build;
                }
            }).build(mockExecutingContext);
            build.startCheckpointScheduler();
            Assertions.assertThat(build.isPeriodicCheckpointingStarted()).isTrue();
            mockExecutingContext.setExpectStopWithSavepoint(WaitingForResourcesTest.assertNonNull());
            build2.stopWithSavepoint("file:///tmp/target", true, SavepointFormatType.CANONICAL);
            Assertions.assertThat(build.isPeriodicCheckpointingStarted()).isFalse();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testJobInformationMethods() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().build(mockExecutingContext);
            JobID jobID = build.getExecutionGraph().getJobID();
            Assertions.assertThat(build.getJob()).isInstanceOf(ArchivedExecutionGraph.class);
            Assertions.assertThat(build.getJob().getJobID()).isEqualTo(jobID);
            Assertions.assertThat(build.getJobStatus()).isEqualTo(JobStatus.RUNNING);
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testStateDoesNotExposeGloballyTerminalExecutionGraph() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            FinishingMockExecutionGraph finishingMockExecutionGraph = new FinishingMockExecutionGraph();
            Executing build = new ExecutingStateBuilder().setExecutionGraph(finishingMockExecutionGraph).build(mockExecutingContext);
            mockExecutingContext.setExpectFinished(archivedExecutionGraph -> {
            });
            finishingMockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED);
            Assertions.assertThat(build.getExecutionGraph().getState()).isEqualTo(JobStatus.FINISHED);
            Assertions.assertThat(build.getJobStatus()).isEqualTo(JobStatus.RUNNING);
            Assertions.assertThat(build.getJob().getState()).isEqualTo(JobStatus.RUNNING);
            Assertions.assertThat(build.getJob().getStatusTimestamp(JobStatus.FINISHED)).isZero();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testExecutingChecksForNewResourcesWhenBeingCreated() throws Exception {
        ArrayDeque arrayDeque = new ArrayDeque();
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            new ExecutingStateBuilder().setStateTransitionManagerFactory(context -> {
                return new TestingStateTransitionManager(() -> {
                    arrayDeque.add("onChange");
                }, () -> {
                    arrayDeque.add("onTrigger");
                });
            }).build(mockExecutingContext);
            mockExecutingContext.triggerExecutors();
            Assertions.assertThat((String) arrayDeque.poll()).isEqualTo("onChange");
            Assertions.assertThat((String) arrayDeque.poll()).isEqualTo("onTrigger");
            Assertions.assertThat(arrayDeque.isEmpty()).isTrue();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testOmitsWaitingForResourcesStateWhenRestarting() throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            Executing build = new ExecutingStateBuilder().build(mockExecutingContext);
            VertexParallelism vertexParallelism = new VertexParallelism(Collections.singletonMap(new JobVertexID(), 2));
            mockExecutingContext.setVertexParallelism(vertexParallelism);
            mockExecutingContext.setExpectRestarting(restartingArguments -> {
                Assertions.assertThat(restartingArguments.getRestartWithParallelism()).hasValue(vertexParallelism);
            });
            build.transitionToSubsequentState();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInternalParallelismChangeBehavior(boolean z) throws Exception {
        MockExecutingContext mockExecutingContext = new MockExecutingContext();
        try {
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            Function<StateTransitionManager.Context, StateTransitionManager> function = context -> {
                return TestingStateTransitionManager.withOnChangeEventOnly(() -> {
                    Assertions.assertThat(context.hasDesiredResources()).isEqualTo(z);
                    Assertions.assertThat(context.hasSufficientResources()).isEqualTo(z);
                    atomicBoolean.set(true);
                });
            };
            MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex(MockExecutionVertex::new);
            MockExecutionGraph mockExecutionGraph = new MockExecutionGraph(() -> {
                return Collections.singletonList(mockExecutionJobVertex);
            });
            mockExecutingContext.setHasDesiredResources(() -> {
                return true;
            });
            mockExecutingContext.setHasSufficientResources(() -> {
                return true;
            });
            mockExecutingContext.setVertexParallelism(new VertexParallelism((Map) mockExecutionGraph.getAllVertices().values().stream().collect(Collectors.toMap((v0) -> {
                return v0.getJobVertexId();
            }, executionJobVertex -> {
                return Integer.valueOf(z ? 1 + executionJobVertex.getParallelism() : executionJobVertex.getParallelism());
            }))));
            new ExecutingStateBuilder().setStateTransitionManagerFactory(function).setExecutionGraph(mockExecutionGraph).build(mockExecutingContext).onNewResourcesAvailable();
            Assertions.assertThat(atomicBoolean.get()).isTrue();
            mockExecutingContext.close();
        } catch (Throwable th) {
            try {
                mockExecutingContext.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    public static TaskExecutionStateTransition createFailingStateTransition(ExecutionAttemptID executionAttemptID, Exception exc) throws JobException {
        return new TaskExecutionStateTransition(new TaskExecutionState(executionAttemptID, ExecutionState.FAILED, exc));
    }

    private ExecutionGraphHandler getExecutionGraphHandler(ExecutionGraph executionGraph, ComponentMainThreadExecutor componentMainThreadExecutor) {
        return new ExecutionGraphHandler(executionGraph, log, componentMainThreadExecutor, componentMainThreadExecutor);
    }
}
