/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.MockRestartingContext;
import org.apache.flink.runtime.scheduler.adaptive.Restarting;
import org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.TestingOperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the {@link Restarting} state of the adaptive scheduler.
 *
 * <p>Every test builds a {@link Restarting} instance through one of the {@code
 * createRestartingState} helpers and drives it against a {@link MockRestartingContext}. The
 * context is always opened in a try-with-resources block.
 *
 * <p>NOTE(review): the {@code setExpect...} calls presumably register the state transition the
 * mock context must observe before it is closed; confirm against {@link MockRestartingContext}.
 */
class RestartingTest {
    private static final Logger log = LoggerFactory.getLogger(RestartingTest.class);

    /** After the state has been created, the execution graph must be in {@code CANCELLING}. */
    @Test
    void testExecutionGraphCancellationOnEnter() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            StateTrackingMockExecutionGraph mockExecutionGraph =
                    new StateTrackingMockExecutionGraph();
            createRestartingState(ctx, mockExecutionGraph);
            Assertions.assertThat(mockExecutionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
        }
    }

    /**
     * Signals {@code CANCELED} to the state and expects a transition to creating the execution
     * graph when a restart parallelism is given, or to waiting for resources otherwise.
     *
     * @param restartWithParallelism parallelism to restart with; when present it is also set as
     *     the available parallelism of the context
     */
    @ParameterizedTest
    @MethodSource(value = {"provideRestartWithParallelism"})
    public void testTransitionToSubsequentStateWhenCancellationComplete(
            Optional<VertexParallelism> restartWithParallelism) throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            restartWithParallelism.ifPresent(ctx::setAvailableVertexParallelism);
            Restarting restarting =
                    createRestartingState(ctx, restartWithParallelism.orElse(null));
            if (restartWithParallelism.isPresent()) {
                ctx.setExpectCreatingExecutionGraph();
            } else {
                ctx.setExpectWaitingForResources();
            }
            restarting.onGloballyTerminalState(JobStatus.CANCELED);
        }
    }

    /**
     * The state is created with a restart parallelism of 2 for a vertex while the context only
     * offers a parallelism of 1 for it; the expected follow-up state is waiting for resources.
     */
    @Test
    public void testTransitionToSubsequentStateWhenResourceChanged() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            JobVertexID jobVertexId = new JobVertexID();
            VertexParallelism availableParallelism =
                    new VertexParallelism(Collections.singletonMap(jobVertexId, 1));
            VertexParallelism requiredParallelismForForcedRestart =
                    new VertexParallelism(Collections.singletonMap(jobVertexId, 2));
            ctx.setAvailableVertexParallelism(availableParallelism);
            Restarting restarting = createRestartingState(ctx, requiredParallelismForForcedRestart);
            ctx.setExpectWaitingForResources();
            restarting.onGloballyTerminalState(JobStatus.CANCELED);
        }
    }

    /** Cancelling the state is expected to lead to the cancelling state. */
    @Test
    void testCancel() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            Restarting restarting = createRestartingState(ctx);
            ctx.setExpectCancelling(WaitingForResourcesTest.assertNonNull());
            restarting.cancel();
        }
    }

    /** Suspends while the termination future of the execution graph is still pending. */
    @Test
    void testSuspendWithJobInCancellingState() throws Exception {
        testSuspend(false);
    }

    /** Suspends after the execution graph has already terminated as {@code CANCELED}. */
    @Test
    void testSuspendWithJobInCancelledState() throws Exception {
        testSuspend(true);
    }

    /**
     * Suspends the state and expects the context to finish with an archived execution graph in
     * state {@code SUSPENDED}.
     *
     * @param cancellationCompleted whether the termination future of the execution graph is
     *     completed with {@code CANCELED} before suspending
     */
    private void testSuspend(boolean cancellationCompleted) throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
            Restarting restarting = createRestartingState(ctx, executionGraph);
            if (cancellationCompleted) {
                executionGraph.completeTerminationFuture(JobStatus.CANCELED);
            }
            ctx.setExpectFinished(
                    archivedExecutionGraph ->
                            Assertions.assertThat(archivedExecutionGraph.getState())
                                    .isEqualTo(JobStatus.SUSPENDED));
            Throwable cause = new RuntimeException("suspend");
            restarting.suspend(cause);
        }
    }

    /** A global failure reported to the state must not cause any state transition. */
    @Test
    void testGlobalFailuresAreIgnored() throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            Restarting restarting = createRestartingState(ctx);
            Throwable failure = new RuntimeException();
            restarting.handleGlobalFailure(failure, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            ctx.assertNoStateTransition();
        }
    }

    /**
     * Once the underlying execution graph has terminated as {@code CANCELED}, the state must
     * still report {@code RESTARTING} through {@code getJobStatus()} and {@code getJob()}, and
     * the job must carry no timestamp for {@code CANCELED}.
     *
     * @param restartWithParallelism parallelism to restart with; when present it is also set as
     *     the available parallelism of the context
     */
    @ParameterizedTest
    @MethodSource(value = {"provideRestartWithParallelism"})
    public void testStateDoesNotExposeGloballyTerminalExecutionGraph(
            Optional<VertexParallelism> restartWithParallelism) throws Exception {
        try (MockRestartingContext ctx = new MockRestartingContext()) {
            restartWithParallelism.ifPresent(ctx::setAvailableVertexParallelism);
            StateTrackingMockExecutionGraph mockExecutionGraph =
                    new StateTrackingMockExecutionGraph();
            Restarting restarting =
                    createRestartingState(
                            ctx, mockExecutionGraph, restartWithParallelism.orElse(null));
            if (restartWithParallelism.isPresent()) {
                ctx.setExpectCreatingExecutionGraph();
            } else {
                ctx.setExpectWaitingForResources();
            }
            mockExecutionGraph.completeTerminationFuture(JobStatus.CANCELED);

            // Sanity check of the test setup: the raw graph itself really is CANCELED.
            Assertions.assertThat(restarting.getExecutionGraph().getState())
                    .isEqualTo(JobStatus.CANCELED);

            Assertions.assertThat(restarting.getJobStatus()).isEqualTo(JobStatus.RESTARTING);
            Assertions.assertThat(restarting.getJob().getState()).isEqualTo(JobStatus.RESTARTING);
            Assertions.assertThat((long) restarting.getJob().getStatusTimestamp(JobStatus.CANCELED))
                    .isZero();
        }
    }

    /**
     * Creates a {@link Restarting} state on a fresh {@link StateTrackingMockExecutionGraph}.
     *
     * @param ctx context the state operates on
     * @param restartWithParallelism parallelism handed to the state, may be {@code null}
     * @return the created state
     */
    public Restarting createRestartingState(
            MockRestartingContext ctx, @Nullable VertexParallelism restartWithParallelism) {
        return createRestartingState(
                ctx, new StateTrackingMockExecutionGraph(), restartWithParallelism);
    }

    /**
     * Creates a {@link Restarting} state on the given execution graph without a restart
     * parallelism.
     *
     * @param ctx context the state operates on
     * @param executionGraph execution graph handed to the state
     * @return the created state
     */
    public Restarting createRestartingState(
            MockRestartingContext ctx, ExecutionGraph executionGraph) {
        return createRestartingState(ctx, executionGraph, null);
    }

    /**
     * Creates a {@link Restarting} state; all other {@code createRestartingState} overloads
     * delegate to this one.
     *
     * <p>The execution graph is transitioned to running before the state is constructed.
     *
     * @param ctx context the state operates on; its main thread executor is used for the {@link
     *     ExecutionGraphHandler}
     * @param executionGraph execution graph handed to the state
     * @param restartWithParallelism parallelism handed to the state, may be {@code null}
     * @return the created state
     */
    public Restarting createRestartingState(
            MockRestartingContext ctx,
            ExecutionGraph executionGraph,
            @Nullable VertexParallelism restartWithParallelism) {
        // Locals typed to the target parameter types keep overload resolution explicit
        // without sprinkling casts over the constructor calls.
        Executor ioExecutor = ctx.getMainThreadExecutor();
        ExecutionGraphHandler executionGraphHandler =
                new ExecutionGraphHandler(
                        executionGraph, log, ioExecutor, ctx.getMainThreadExecutor());
        OperatorCoordinatorHandler operatorCoordinatorHandler =
                new TestingOperatorCoordinatorHandler();
        executionGraph.transitionToRunning();

        Restarting.Context context = ctx;
        // NOTE(review): Duration.ZERO is presumably the restart back-off and the empty list the
        // failure collection - confirm against the Restarting constructor.
        return new Restarting(
                context,
                executionGraph,
                executionGraphHandler,
                operatorCoordinatorHandler,
                log,
                Duration.ZERO,
                restartWithParallelism,
                ClassLoader.getSystemClassLoader(),
                new ArrayList<>());
    }

    /**
     * Creates a {@link Restarting} state on a fresh {@link StateTrackingMockExecutionGraph}
     * without a restart parallelism.
     *
     * @param ctx context the state operates on
     * @return the created state
     */
    public Restarting createRestartingState(MockRestartingContext ctx) {
        return createRestartingState(ctx, new StateTrackingMockExecutionGraph());
    }

    /**
     * Argument source for the parameterized tests: one case without a restart parallelism and
     * one with a parallelism of 1 for a single, newly created vertex.
     */
    private static Stream<Arguments> provideRestartWithParallelism() {
        return Stream.of(
                Arguments.of(Optional.empty()),
                Arguments.of(
                        Optional.of(
                                new VertexParallelism(
                                        Collections.singletonMap(new JobVertexID(), 1)))));
    }
}

