package org.apache.flink.runtime.jobmaster;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.util.SerializedThrowable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.class */
class DefaultJobMasterServiceProcessTest {
    private static final JobID jobId = new JobID();
    private static final Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory = th -> {
        return ArchivedExecutionGraph.createSparseArchivedExecutionGraph(jobId, "test", JobStatus.FAILED, (JobType) null, th, (JobCheckpointingSettings) null, 1337L);
    };

    DefaultJobMasterServiceProcessTest() {
    }

    @Test
    void testInitializationFailureCompletesResultFuture() {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        RuntimeException runtimeException = new RuntimeException("Init error");
        completableFuture.completeExceptionally(runtimeException);
        JobManagerRunnerResult jobManagerRunnerResult = (JobManagerRunnerResult) createTestInstance.getResultFuture().join();
        Assertions.assertThat(jobManagerRunnerResult.isInitializationFailure()).isTrue();
        Assertions.assertThat(jobManagerRunnerResult.getInitializationFailure()).isInstanceOf(JobInitializationException.class).hasCause(runtimeException);
    }

    @Test
    void testInitializationFailureSetsFailureInfoProperly() throws ExecutionException, InterruptedException {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        RuntimeException runtimeException = new RuntimeException("Expected RuntimeException");
        long currentTimeMillis = System.currentTimeMillis();
        completableFuture.completeExceptionally(runtimeException);
        long currentTimeMillis2 = System.currentTimeMillis();
        ErrorInfo failureInfo = ((JobManagerRunnerResult) createTestInstance.getResultFuture().get()).getExecutionGraphInfo().getArchivedExecutionGraph().getFailureInfo();
        Assertions.assertThat(failureInfo).isNotNull();
        assertInitializationException(failureInfo.getException(), runtimeException, failureInfo.getTimestamp(), currentTimeMillis, currentTimeMillis2);
    }

    @Test
    void testInitializationFailureSetsExceptionHistoryProperly() throws ExecutionException, InterruptedException {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        RuntimeException runtimeException = new RuntimeException("Expected RuntimeException");
        long currentTimeMillis = System.currentTimeMillis();
        completableFuture.completeExceptionally(runtimeException);
        long currentTimeMillis2 = System.currentTimeMillis();
        RootExceptionHistoryEntry rootExceptionHistoryEntry = (RootExceptionHistoryEntry) Iterables.getOnlyElement(((JobManagerRunnerResult) createTestInstance.getResultFuture().get()).getExecutionGraphInfo().getExceptionHistory());
        assertInitializationException(rootExceptionHistoryEntry.getException(), runtimeException, rootExceptionHistoryEntry.getTimestamp(), currentTimeMillis, currentTimeMillis2);
        Assertions.assertThat(rootExceptionHistoryEntry.isGlobal()).isTrue();
    }

    private static void assertInitializationException(SerializedThrowable serializedThrowable, Throwable th, long j, long j2, long j3) {
        Assertions.assertThat(serializedThrowable.deserializeError(Thread.currentThread().getContextClassLoader())).isInstanceOf(JobInitializationException.class).hasCause(th);
        Assertions.assertThat(j).isGreaterThanOrEqualTo(j2).isLessThanOrEqualTo(j3);
    }

    @Test
    void testCloseAfterInitializationFailure() throws Exception {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        completableFuture.completeExceptionally(new RuntimeException("Init error"));
        createTestInstance.closeAsync().get();
        Assertions.assertThat(createTestInstance.getResultFuture()).isCompletedWithValueMatching((v0) -> {
            return v0.isInitializationFailure();
        });
        Assertions.assertThat(createTestInstance.getJobMasterGatewayFuture()).isCompletedExceptionally();
    }

    @Test
    void testCloseAfterInitializationSuccess() throws Exception {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        TestingJobMasterService testingJobMasterService = new TestingJobMasterService();
        completableFuture.complete(testingJobMasterService);
        createTestInstance.closeAsync().get();
        Assertions.assertThat(testingJobMasterService.isClosed()).isTrue();
        FlinkAssertions.assertThatFuture(createTestInstance.getResultFuture()).eventuallyFailsWith(ExecutionException.class).extracting((v0) -> {
            return FlinkAssertions.chainOfCauses(v0);
        }, FlinkAssertions.STREAM_THROWABLE).anySatisfy(th -> {
            Assertions.assertThat(th).isInstanceOf(JobNotFinishedException.class);
        });
    }

    @Test
    void testJobMasterTerminationIsHandled() {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        CompletableFuture completableFuture2 = new CompletableFuture();
        completableFuture.complete(new TestingJobMasterService("localhost", completableFuture2, null));
        RuntimeException runtimeException = new RuntimeException("Fake exception from JobMaster");
        completableFuture2.completeExceptionally(runtimeException);
        FlinkAssertions.assertThatFuture(createTestInstance.getResultFuture()).eventuallyFailsWith(ExecutionException.class).extracting((v0) -> {
            return FlinkAssertions.chainOfCauses(v0);
        }, FlinkAssertions.STREAM_THROWABLE).anySatisfy(th -> {
            Assertions.assertThat(th).isEqualTo(runtimeException);
        });
    }

    @Test
    void testJobMasterGatewayGetsForwarded() {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        TestingJobMasterGateway build = new TestingJobMasterGatewayBuilder().build();
        completableFuture.complete(new TestingJobMasterService("localhost", null, build));
        Assertions.assertThat(createTestInstance.getJobMasterGatewayFuture()).isCompletedWithValue(build);
    }

    @Test
    void testLeaderAddressGetsForwarded() {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        completableFuture.complete(new TestingJobMasterService("yolohost", null, null));
        Assertions.assertThat(createTestInstance.getLeaderAddressFuture()).isCompletedWithValue("yolohost");
    }

    @Test
    void testIsNotInitialized() {
        Assertions.assertThat(createTestInstance(new CompletableFuture<>()).isInitializedAndRunning()).isFalse();
    }

    @Test
    void testIsInitialized() {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        completableFuture.complete(new TestingJobMasterService());
        Assertions.assertThat(createTestInstance.isInitializedAndRunning()).isTrue();
    }

    @Test
    void testIsNotInitializedAfterClosing() {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        completableFuture.complete(new TestingJobMasterService());
        createTestInstance.closeAsync();
        Assertions.assertThat(createTestInstance.isInitializedAndRunning()).isFalse();
    }

    @Test
    void testSuccessOnTerminalState() {
        CompletableFuture<JobMasterService> completableFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess createTestInstance = createTestInstance(completableFuture);
        completableFuture.complete(new TestingJobMasterService());
        createTestInstance.jobReachedGloballyTerminalState(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build()));
        Assertions.assertThat(createTestInstance.getResultFuture()).isCompletedWithValueMatching((v0) -> {
            return v0.isSuccess();
        }).isCompletedWithValueMatching(jobManagerRunnerResult -> {
            return jobManagerRunnerResult.getExecutionGraphInfo().getArchivedExecutionGraph().getState() == JobStatus.FINISHED;
        });
    }

    private DefaultJobMasterServiceProcess createTestInstance(CompletableFuture<JobMasterService> completableFuture) {
        return new DefaultJobMasterServiceProcess(jobId, UUID.randomUUID(), new TestingJobMasterServiceFactory(onCompletionActions -> {
            return completableFuture;
        }), failedArchivedExecutionGraphFactory);
    }
}
