package org.apache.flink.runtime.scheduler.stopwithsavepoint;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.class */
class StopWithSavepointTerminationHandlerImplTest {
    private static final Logger log = LoggerFactory.getLogger(StopWithSavepointTerminationHandlerImplTest.class);
    private static final JobID JOB_ID = new JobID();
    private final TestingCheckpointScheduling checkpointScheduling = new TestingCheckpointScheduling(false);

    StopWithSavepointTerminationHandlerImplTest() {
    }

    private StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver() {
        return createTestInstance(th -> {
            throw new AssertionError("No global failover should be triggered.");
        });
    }

    private StopWithSavepointTerminationHandlerImpl createTestInstance(Consumer<Throwable> consumer) {
        this.checkpointScheduling.stopCheckpointScheduler();
        return new StopWithSavepointTerminationHandlerImpl(JOB_ID, TestingSchedulerNG.newBuilder().setHandleGlobalFailureConsumer(consumer).build(), this.checkpointScheduling, log);
    }

    @Test
    void testHappyPath() throws ExecutionException, InterruptedException {
        StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver = createTestInstanceFailingOnGlobalFailOver();
        EmptyStreamStateHandle emptyStreamStateHandle = new EmptyStreamStateHandle();
        CompletedCheckpoint createCompletedSavepoint = createCompletedSavepoint(emptyStreamStateHandle);
        createTestInstanceFailingOnGlobalFailOver.handleSavepointCreation(createCompletedSavepoint, (Throwable) null);
        createTestInstanceFailingOnGlobalFailOver.handleExecutionsTermination(Collections.singleton(ExecutionState.FINISHED));
        FlinkAssertions.assertThatFuture(createTestInstanceFailingOnGlobalFailOver.getSavepointPath()).isCompletedWithValue(createCompletedSavepoint.getExternalPointer());
        Assertions.assertThat(emptyStreamStateHandle.isDisposed()).withFailMessage("The savepoint should not have been discarded.", new Object[0]).isFalse();
        Assertions.assertThat(this.checkpointScheduling.isEnabled()).withFailMessage("Checkpoint scheduling should be disabled.", new Object[0]).isFalse();
    }

    @Test
    void testSavepointCreationFailureWithoutExecutionTermination() {
        assertSavepointCreationFailure(stopWithSavepointTerminationHandler -> {
        });
    }

    @Test
    void testSavepointCreationFailureWithFailingExecutions() {
        assertSavepointCreationFailure(stopWithSavepointTerminationHandler -> {
            stopWithSavepointTerminationHandler.handleExecutionsTermination(Collections.singletonList(ExecutionState.FAILED));
        });
    }

    @Test
    void testSavepointCreationFailureWithFinishingExecutions() {
        assertSavepointCreationFailure(stopWithSavepointTerminationHandler -> {
            stopWithSavepointTerminationHandler.handleExecutionsTermination(Collections.singletonList(ExecutionState.FINISHED));
        });
    }

    public void assertSavepointCreationFailure(Consumer<StopWithSavepointTerminationHandler> consumer) {
        StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver = createTestInstanceFailingOnGlobalFailOver();
        createTestInstanceFailingOnGlobalFailOver.handleSavepointCreation((CompletedCheckpoint) null, new Exception("Expected exception during savepoint creation."));
        consumer.accept(createTestInstanceFailingOnGlobalFailOver);
        Assertions.assertThatThrownBy(() -> {
            createTestInstanceFailingOnGlobalFailOver.getSavepointPath().get();
        }).withFailMessage("An ExecutionException is expected.", new Object[0]).isInstanceOf(Throwable.class).hasMessageContaining("Expected exception during savepoint creation.");
        Assertions.assertThat(this.checkpointScheduling.isEnabled()).withFailMessage("Checkpoint scheduling should be enabled.", new Object[0]).isTrue();
    }

    @Test
    void testFailedTerminationHandling() {
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(completableFuture);
        StopWithSavepointTerminationHandlerImpl createTestInstance = createTestInstance((v1) -> {
            r1.complete(v1);
        });
        ExecutionState executionState = ExecutionState.FAILED;
        EmptyStreamStateHandle emptyStreamStateHandle = new EmptyStreamStateHandle();
        createTestInstance.handleSavepointCreation(createCompletedSavepoint(emptyStreamStateHandle), (Throwable) null);
        createTestInstance.handleExecutionsTermination(Collections.singletonList(executionState));
        Assertions.assertThatThrownBy(() -> {
            createTestInstance.getSavepointPath().get();
        }).withFailMessage("An ExecutionException is expected.", new Object[0]).isInstanceOf(Throwable.class).hasCauseInstanceOf(StopWithSavepointStoppingException.class);
        FlinkAssertions.assertThatFuture(completableFuture).withFailMessage("Global fail-over was not triggered.", new Object[0]).isDone();
        Assertions.assertThat(emptyStreamStateHandle.isDisposed()).withFailMessage("Savepoint should not be discarded.", new Object[0]).isFalse();
    }

    @Test
    void testInvalidExecutionTerminationCall() {
        Assertions.assertThatThrownBy(() -> {
            createTestInstanceFailingOnGlobalFailOver().handleExecutionsTermination(Collections.singletonList(ExecutionState.FINISHED));
        }).isInstanceOf(UnsupportedOperationException.class);
    }

    @Test
    void testSavepointCreationParameterBothNull() {
        Assertions.assertThatThrownBy(() -> {
            createTestInstanceFailingOnGlobalFailOver().handleSavepointCreation((CompletedCheckpoint) null, (Throwable) null);
        }).isInstanceOf(NullPointerException.class);
    }

    @Test
    void testSavepointCreationParameterBothSet() {
        Assertions.assertThatThrownBy(() -> {
            createTestInstanceFailingOnGlobalFailOver().handleSavepointCreation(createCompletedSavepoint(new EmptyStreamStateHandle()), new Exception("No exception should be passed if a savepoint is available."));
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testExecutionTerminationWithNull() {
        Assertions.assertThatThrownBy(() -> {
            createTestInstanceFailingOnGlobalFailOver().handleExecutionsTermination((Collection) null);
        }).isInstanceOf(NullPointerException.class);
    }

    private static CompletedCheckpoint createCompletedSavepoint(StreamStateHandle streamStateHandle) {
        return new CompletedCheckpoint(JOB_ID, 0L, 0L, 0L, new HashMap(), (Collection) null, CheckpointProperties.forSavepoint(true, SavepointFormatType.CANONICAL), new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"), (CompletedCheckpointStats) null);
    }
}
