package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/RestartingTest.class */
/**
 * Tests for the {@link Restarting} state of the adaptive scheduler.
 *
 * <p>Each test builds a {@link Restarting} instance on top of a {@code MockRestartingContext},
 * registers the state transition it expects on that context, triggers an action on the state and
 * then closes the context. The context is always managed with try-with-resources so that it is
 * closed exactly once, whether or not the test body throws.
 */
class RestartingTest {
    private static final Logger log = LoggerFactory.getLogger(RestartingTest.class);

    /** Creating the state must leave the execution graph in {@link JobStatus#CANCELLING}. */
    @Test
    void testExecutionGraphCancellationOnEnter() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
            createRestartingState(ctx, executionGraph);
            Assertions.assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
        }
    }

    /**
     * Once the job reports {@link JobStatus#CANCELED}, the state is expected to move on: to
     * "creating execution graph" when a restart parallelism is supplied (and registered as
     * available on the context), otherwise to "waiting for resources".
     *
     * @param optional the restart parallelism, or empty if none is provided
     */
    @MethodSource({"provideRestartWithParallelism"})
    @ParameterizedTest
    public void testTransitionToSubsequentStateWhenCancellationComplete(Optional<VertexParallelism> optional) throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            optional.ifPresent(ctx::setAvailableVertexParallelism);
            Restarting restarting = createRestartingState(ctx, optional.orElse(null));
            if (optional.isPresent()) {
                ctx.setExpectCreatingExecutionGraph();
            } else {
                ctx.setExpectWaitingForResources();
            }
            restarting.onGloballyTerminalState(JobStatus.CANCELED);
        }
    }

    /**
     * The context offers parallelism 1 for a vertex while the state was created with parallelism 2
     * for the same vertex; after cancellation the state is expected to go to "waiting for
     * resources".
     */
    @Test
    public void testTransitionToSubsequentStateWhenResourceChanged() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            JobVertexID vertexId = new JobVertexID();
            VertexParallelism availableParallelism = new VertexParallelism(Collections.singletonMap(vertexId, 1));
            VertexParallelism requestedParallelism = new VertexParallelism(Collections.singletonMap(vertexId, 2));
            ctx.setAvailableVertexParallelism(availableParallelism);
            Restarting restarting = createRestartingState(ctx, requestedParallelism);
            ctx.setExpectWaitingForResources();
            restarting.onGloballyTerminalState(JobStatus.CANCELED);
        }
    }

    /** Calling {@code cancel()} on the state is expected to trigger the "cancelling" transition. */
    @Test
    void testCancel() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            Restarting restarting = createRestartingState(ctx);
            ctx.setExpectCancelling(WaitingForResourcesTest.assertNonNull());
            restarting.cancel();
        }
    }

    /** Suspends the state without completing the graph's termination future first. */
    @Test
    void testSuspendWithJobInCancellingState() throws Exception {
        testSuspend(false);
    }

    /** Suspends the state after the graph's termination future completed with CANCELED. */
    @Test
    void testSuspendWithJobInCancelledState() throws Exception {
        testSuspend(true);
    }

    /**
     * Suspends a freshly created state and expects a "finished" transition whose archived graph is
     * in {@link JobStatus#SUSPENDED}.
     *
     * @param cancellationComplete whether the execution graph's termination future is completed
     *     with {@link JobStatus#CANCELED} before {@code suspend} is called
     */
    private void testSuspend(boolean cancellationComplete) throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
            Restarting restarting = createRestartingState(ctx, executionGraph);
            if (cancellationComplete) {
                executionGraph.completeTerminationFuture(JobStatus.CANCELED);
            }
            ctx.setExpectFinished(
                    archived -> Assertions.assertThat(archived.getState()).isEqualTo(JobStatus.SUSPENDED));
            restarting.suspend(new RuntimeException("suspend"));
        }
    }

    /** A global failure reported to the state must not cause any state transition. */
    @Test
    void testGlobalFailuresAreIgnored() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            Restarting restarting = createRestartingState(ctx);
            restarting.handleGlobalFailure(new RuntimeException(), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            ctx.assertNoStateTransition();
        }
    }

    /**
     * After the underlying graph terminated as CANCELED, the state itself must still report
     * {@link JobStatus#RESTARTING} through {@code getJobStatus()} and {@code getJob()}, and the
     * job returned by {@code getJob()} must carry no CANCELED timestamp.
     *
     * @param optional the restart parallelism, or empty if none is provided
     */
    @MethodSource({"provideRestartWithParallelism"})
    @ParameterizedTest
    public void testStateDoesNotExposeGloballyTerminalExecutionGraph(Optional<VertexParallelism> optional) throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            optional.ifPresent(ctx::setAvailableVertexParallelism);
            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
            Restarting restarting = createRestartingState(ctx, executionGraph, optional.orElse(null));
            if (optional.isPresent()) {
                ctx.setExpectCreatingExecutionGraph();
            } else {
                ctx.setExpectWaitingForResources();
            }
            executionGraph.completeTerminationFuture(JobStatus.CANCELED);

            // The raw graph is terminal, but everything exposed via the state stays RESTARTING.
            Assertions.assertThat(restarting.getExecutionGraph().getState()).isEqualTo(JobStatus.CANCELED);
            Assertions.assertThat(restarting.getJobStatus()).isEqualTo(JobStatus.RESTARTING);
            Assertions.assertThat(restarting.getJob().getState()).isEqualTo(JobStatus.RESTARTING);
            Assertions.assertThat(restarting.getJob().getStatusTimestamp(JobStatus.CANCELED)).isZero();
        }
    }

    /**
     * Creates a {@link Restarting} state backed by a new {@link StateTrackingMockExecutionGraph}.
     *
     * @param mockRestartingContext the context handed to the state
     * @param vertexParallelism the restart parallelism, may be {@code null}
     * @return the newly created state
     */
    public Restarting createRestartingState(MockRestartingContext mockRestartingContext, @Nullable VertexParallelism vertexParallelism) {
        return createRestartingState(mockRestartingContext, new StateTrackingMockExecutionGraph(), vertexParallelism);
    }

    /**
     * Creates a {@link Restarting} state for the given graph without a restart parallelism.
     *
     * @param mockRestartingContext the context handed to the state
     * @param executionGraph the execution graph the state operates on
     * @return the newly created state
     */
    public Restarting createRestartingState(MockRestartingContext mockRestartingContext, ExecutionGraph executionGraph) {
        return createRestartingState(mockRestartingContext, executionGraph, null);
    }

    /**
     * Creates a {@link Restarting} state for the given graph and restart parallelism.
     *
     * <p>The graph is transitioned to running before the state is constructed. The state is given
     * a zero {@link Duration} (presumably the restart back-off; confirm against the
     * {@link Restarting} constructor), the system class loader and an empty, mutable list.
     *
     * @param mockRestartingContext the context handed to the state
     * @param executionGraph the execution graph the state operates on
     * @param vertexParallelism the restart parallelism, may be {@code null}
     * @return the newly created state
     */
    public Restarting createRestartingState(MockRestartingContext mockRestartingContext, ExecutionGraph executionGraph, @Nullable VertexParallelism vertexParallelism) {
        // NOTE(review): m538getMainThreadExecutor looks like a decompiler-mangled accessor name;
        // it is kept as-is because the declaration is not visible from this file.
        ExecutionGraphHandler executionGraphHandler =
                new ExecutionGraphHandler(
                        executionGraph,
                        log,
                        mockRestartingContext.m538getMainThreadExecutor(),
                        mockRestartingContext.m538getMainThreadExecutor());
        TestingOperatorCoordinatorHandler operatorCoordinatorHandler = new TestingOperatorCoordinatorHandler();
        executionGraph.transitionToRunning();
        return new Restarting(
                mockRestartingContext,
                executionGraph,
                executionGraphHandler,
                operatorCoordinatorHandler,
                log,
                Duration.ZERO,
                vertexParallelism,
                ClassLoader.getSystemClassLoader(),
                new ArrayList<>());
    }

    /**
     * Creates a {@link Restarting} state backed by a new {@link StateTrackingMockExecutionGraph}
     * and without a restart parallelism.
     *
     * @param mockRestartingContext the context handed to the state
     * @return the newly created state
     */
    public Restarting createRestartingState(MockRestartingContext mockRestartingContext) {
        return createRestartingState(mockRestartingContext, new StateTrackingMockExecutionGraph());
    }

    /**
     * Argument source for the parameterized tests: one case without a restart parallelism and one
     * case with parallelism 1 for a single, freshly generated vertex.
     */
    private static Stream<Arguments> provideRestartWithParallelism() {
        return Stream.of(
                Arguments.of(Optional.empty()),
                Arguments.of(
                        Optional.of(
                                new VertexParallelism(
                                        Collections.singletonMap(new JobVertexID(), 1)))));
    }
}
