package org.apache.flink.runtime.scheduler.stopwithsavepoint;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.class */
public class StopWithSavepointTerminationHandlerImplTest extends TestLogger {
    private static final JobID JOB_ID = new JobID();
    private final TestingCheckpointScheduling checkpointScheduling = new TestingCheckpointScheduling(false);

    private StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver() {
        return createTestInstance(th -> {
            Assert.fail("No global failover should be triggered.");
        });
    }

    private StopWithSavepointTerminationHandlerImpl createTestInstance(Consumer<Throwable> consumer) {
        this.checkpointScheduling.stopCheckpointScheduler();
        return new StopWithSavepointTerminationHandlerImpl(JOB_ID, TestingSchedulerNG.newBuilder().setHandleGlobalFailureConsumer(consumer).build(), this.checkpointScheduling, this.log);
    }

    @Test
    public void testHappyPath() throws ExecutionException, InterruptedException {
        StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver = createTestInstanceFailingOnGlobalFailOver();
        EmptyStreamStateHandle emptyStreamStateHandle = new EmptyStreamStateHandle();
        CompletedCheckpoint createCompletedSavepoint = createCompletedSavepoint(emptyStreamStateHandle);
        createTestInstanceFailingOnGlobalFailOver.handleSavepointCreation(createCompletedSavepoint, (Throwable) null);
        createTestInstanceFailingOnGlobalFailOver.handleExecutionsTermination(Collections.singleton(ExecutionState.FINISHED));
        Assert.assertThat(createTestInstanceFailingOnGlobalFailOver.getSavepointPath().get(), CoreMatchers.is(createCompletedSavepoint.getExternalPointer()));
        Assert.assertFalse("The savepoint should not have been discarded.", emptyStreamStateHandle.isDisposed());
        Assert.assertFalse("Checkpoint scheduling should be disabled.", this.checkpointScheduling.isEnabled());
    }

    @Test
    public void testSavepointCreationFailureWithoutExecutionTermination() {
        assertSavepointCreationFailure(stopWithSavepointTerminationHandler -> {
        });
    }

    @Test
    public void testSavepointCreationFailureWithFailingExecutions() {
        assertSavepointCreationFailure(stopWithSavepointTerminationHandler -> {
            stopWithSavepointTerminationHandler.handleExecutionsTermination(Collections.singletonList(ExecutionState.FAILED));
        });
    }

    @Test
    public void testSavepointCreationFailureWithFinishingExecutions() {
        assertSavepointCreationFailure(stopWithSavepointTerminationHandler -> {
            stopWithSavepointTerminationHandler.handleExecutionsTermination(Collections.singletonList(ExecutionState.FINISHED));
        });
    }

    public void assertSavepointCreationFailure(Consumer<StopWithSavepointTerminationHandler> consumer) {
        StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver = createTestInstanceFailingOnGlobalFailOver();
        createTestInstanceFailingOnGlobalFailOver.handleSavepointCreation((CompletedCheckpoint) null, new Exception("Expected exception during savepoint creation."));
        consumer.accept(createTestInstanceFailingOnGlobalFailOver);
        try {
            createTestInstanceFailingOnGlobalFailOver.getSavepointPath().get();
            Assert.fail("An ExecutionException is expected.");
        } catch (Throwable th) {
            Assert.assertTrue("An exception with the expected error message should have been thrown.", ExceptionUtils.findThrowableWithMessage(th, "Expected exception during savepoint creation.").isPresent());
        }
        Assert.assertTrue("Checkpoint scheduling should be enabled.", this.checkpointScheduling.isEnabled());
    }

    @Test
    public void testFailedTerminationHandling() throws ExecutionException, InterruptedException {
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.getClass();
        StopWithSavepointTerminationHandlerImpl createTestInstance = createTestInstance((v1) -> {
            r1.complete(v1);
        });
        ExecutionState executionState = ExecutionState.FAILED;
        EmptyStreamStateHandle emptyStreamStateHandle = new EmptyStreamStateHandle();
        createTestInstance.handleSavepointCreation(createCompletedSavepoint(emptyStreamStateHandle), (Throwable) null);
        createTestInstance.handleExecutionsTermination(Collections.singletonList(executionState));
        try {
            createTestInstance.getSavepointPath().get();
            Assert.fail("An ExecutionException is expected.");
        } catch (Throwable th) {
            Assert.assertTrue("A FlinkException should have been thrown.", ExceptionUtils.findThrowable(th, StopWithSavepointStoppingException.class).isPresent());
        }
        Assert.assertTrue("Global fail-over was not triggered.", completableFuture.isDone());
        Assert.assertFalse("Savepoint should not be discarded.", emptyStreamStateHandle.isDisposed());
    }

    @Test(expected = UnsupportedOperationException.class)
    public void testInvalidExecutionTerminationCall() {
        createTestInstanceFailingOnGlobalFailOver().handleExecutionsTermination(Collections.singletonList(ExecutionState.FINISHED));
    }

    @Test(expected = NullPointerException.class)
    public void testSavepointCreationParameterBothNull() {
        createTestInstanceFailingOnGlobalFailOver().handleSavepointCreation((CompletedCheckpoint) null, (Throwable) null);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testSavepointCreationParameterBothSet() {
        createTestInstanceFailingOnGlobalFailOver().handleSavepointCreation(createCompletedSavepoint(new EmptyStreamStateHandle()), new Exception("No exception should be passed if a savepoint is available."));
    }

    @Test(expected = NullPointerException.class)
    public void testExecutionTerminationWithNull() {
        createTestInstanceFailingOnGlobalFailOver().handleExecutionsTermination((Collection) null);
    }

    private static CompletedCheckpoint createCompletedSavepoint(StreamStateHandle streamStateHandle) {
        return new CompletedCheckpoint(JOB_ID, 0L, 0L, 0L, new HashMap(), (Collection) null, CheckpointProperties.forSavepoint(true, SavepointFormatType.CANONICAL), new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"), (CompletedCheckpointStats) null);
    }
}
