package org.apache.flink.runtime.jobmaster;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.class */
public class JobMasterServiceLeadershipRunnerTest {
    private static final Time TESTING_TIMEOUT = Time.seconds(10);
    private static JobGraph jobGraph;
    private TestingLeaderElectionService leaderElectionService;
    private TestingFatalErrorHandler fatalErrorHandler;
    private JobResultStore jobResultStore;

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest$JobMasterServiceLeadershipRunnerBuilder.class */
    public class JobMasterServiceLeadershipRunnerBuilder {
        private JobMasterServiceProcessFactory jobMasterServiceProcessFactory = TestingJobMasterServiceProcessFactory.newBuilder().build();
        private LibraryCacheManager.ClassLoaderLease classLoaderLease = TestingClassLoaderLease.newBuilder().build();
        private LeaderElectionService leaderElectionService;

        public JobMasterServiceLeadershipRunnerBuilder() {
            this.leaderElectionService = JobMasterServiceLeadershipRunnerTest.this.leaderElectionService;
        }

        public JobMasterServiceLeadershipRunnerBuilder setClassLoaderLease(LibraryCacheManager.ClassLoaderLease classLoaderLease) {
            this.classLoaderLease = classLoaderLease;
            return this;
        }

        public JobMasterServiceLeadershipRunnerBuilder setJobMasterServiceProcessFactory(JobMasterServiceProcessFactory jobMasterServiceProcessFactory) {
            this.jobMasterServiceProcessFactory = jobMasterServiceProcessFactory;
            return this;
        }

        public JobMasterServiceLeadershipRunnerBuilder setLeaderElectionService(LeaderElectionService leaderElectionService) {
            this.leaderElectionService = leaderElectionService;
            return this;
        }

        public JobMasterServiceLeadershipRunner build() {
            return new JobMasterServiceLeadershipRunner(this.jobMasterServiceProcessFactory, this.leaderElectionService, JobMasterServiceLeadershipRunnerTest.this.jobResultStore, this.classLoaderLease, JobMasterServiceLeadershipRunnerTest.this.fatalErrorHandler);
        }

        public JobMasterServiceLeadershipRunnerBuilder withSingleJobMasterServiceProcess(JobMasterServiceProcess jobMasterServiceProcess) {
            return withJobMasterServiceProcesses(jobMasterServiceProcess);
        }

        public JobMasterServiceLeadershipRunnerBuilder withJobMasterServiceProcesses(JobMasterServiceProcess... jobMasterServiceProcessArr) {
            ArrayDeque arrayDeque = new ArrayDeque(Arrays.asList(jobMasterServiceProcessArr));
            this.jobMasterServiceProcessFactory = TestingJobMasterServiceProcessFactory.newBuilder().setJobMasterServiceProcessFunction(uuid -> {
                return (JobMasterServiceProcess) Preconditions.checkNotNull(arrayDeque.poll());
            }).build();
            return this;
        }
    }

    JobMasterServiceLeadershipRunnerTest() {
    }

    @BeforeAll
    static void setupClass() {
        JobVertex jobVertex = new JobVertex("Test vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
    }

    @BeforeEach
    void setup() {
        this.leaderElectionService = new TestingLeaderElectionService();
        this.jobResultStore = new EmbeddedJobResultStore();
        this.fatalErrorHandler = new TestingFatalErrorHandler();
    }

    @AfterEach
    void tearDown() throws Exception {
        this.fatalErrorHandler.rethrowError();
    }

    @Test
    void testShutDownSignalsJobAsNotFinished() throws Exception {
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().build();
        Throwable th = null;
        try {
            build.start();
            CompletableFuture<JobManagerRunnerResult> resultFuture = build.getResultFuture();
            Assertions.assertThat(resultFuture).isNotDone();
            build.closeAsync();
            assertJobNotFinished(resultFuture);
            Assertions.assertThat(build.getJobMasterGateway()).failsWithin(5L, TimeUnit.MILLISECONDS);
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testCloseReleasesClassLoaderLease() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        TestingClassLoaderLease.Builder newBuilder = TestingClassLoaderLease.newBuilder();
        oneShotLatch.getClass();
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().setClassLoaderLease(newBuilder.setCloseRunnable(oneShotLatch::trigger).build()).build();
        Throwable th = null;
        try {
            try {
                build.start();
                build.close();
                oneShotLatch.await();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testConcurrentLeadershipOperationsBlockingClose() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        newJobMasterServiceLeadershipRunnerBuilder().withJobMasterServiceProcesses(TestingJobMasterServiceProcess.newBuilder().setCloseAsyncSupplier(() -> {
            return completableFuture;
        }).build(), TestingJobMasterServiceProcess.newBuilder().build()).build().start();
        this.leaderElectionService.isLeader(UUID.randomUUID()).get();
        this.leaderElectionService.notLeader();
        CompletableFuture<UUID> isLeader = this.leaderElectionService.isLeader(UUID.randomUUID());
        Assertions.assertThat(isLeader).isNotDone();
        Assertions.assertThat(isLeader).withFailMessage("Granted leadership even though the JobMaster has not been suspended.", new Object[0]).failsWithin(1L, TimeUnit.MILLISECONDS);
        completableFuture.complete(null);
        isLeader.get();
    }

    @Test
    void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarded() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setGetResultFutureSupplier(() -> {
            return completableFuture;
        }).build()).build();
        build.start();
        this.leaderElectionService.isLeader(UUID.randomUUID()).get();
        FlinkException flinkException = new FlinkException("The JobMasterService failed unexpectedly.");
        completableFuture.completeExceptionally(flinkException);
        Assertions.assertThat(build.getResultFuture()).failsWithin(5L, TimeUnit.MILLISECONDS).withThrowableOfType(ExecutionException.class).withCause(flinkException);
    }

    @Test
    void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError() throws Exception {
        FlinkException flinkException = new FlinkException("Test exception");
        CompletableFuture completedFuture = CompletableFuture.completedFuture(JobManagerRunnerResult.forInitializationFailure(createFailedExecutionGraphInfo(flinkException), flinkException));
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setGetResultFutureSupplier(() -> {
            return completedFuture;
        }).build()).build();
        build.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        JobManagerRunnerResult jobManagerRunnerResult = (JobManagerRunnerResult) build.getResultFuture().join();
        Assertions.assertThat(jobManagerRunnerResult.isInitializationFailure()).isTrue();
        Assertions.assertThat(jobManagerRunnerResult.getInitializationFailure()).isEqualTo(flinkException);
    }

    @Nonnull
    private ExecutionGraphInfo createFailedExecutionGraphInfo(FlinkException flinkException) {
        return new ExecutionGraphInfo(ArchivedExecutionGraph.createSparseArchivedExecutionGraph(jobGraph.getJobID(), jobGraph.getName(), JobStatus.FAILED, flinkException, (JobCheckpointingSettings) null, 1L));
    }

    @Test
    void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setCloseAsyncSupplier(() -> {
            completableFuture.complete(null);
            return completableFuture;
        }).build()).build();
        build.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        build.closeAsync().join();
        assertJobNotFinished(build.getResultFuture());
        Assertions.assertThat(completableFuture).isDone();
    }

    @Test
    void testJobMasterServiceProcessShutdownOnLeadershipLoss() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setCloseAsyncSupplier(() -> {
            completableFuture.complete(null);
            return completableFuture;
        }).build()).build().start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        Assertions.assertThat(completableFuture).isDone();
    }

    @Test
    void testCancellationIsForwardedToJobMasterService() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setGetJobMasterGatewayFutureSupplier(() -> {
            return completableFuture;
        }).build()).build();
        build.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        CompletableFuture cancel = build.cancel(TESTING_TIMEOUT);
        Assertions.assertThat(cancel).isNotDone();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        completableFuture.complete(new TestingJobMasterGatewayBuilder().setCancelFunction(() -> {
            atomicBoolean.set(true);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build());
        cancel.get();
        Assertions.assertThat(atomicBoolean).isTrue();
    }

    @Test
    void testJobInformationOperationsDuringInitialization() throws Exception {
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setIsInitializedAndRunningSupplier(() -> {
            return false;
        }).build()).build();
        build.start();
        assertInitializingStates(build);
        this.leaderElectionService.isLeader(UUID.randomUUID());
        assertInitializingStates(build);
    }

    private static void assertInitializingStates(JobManagerRunner jobManagerRunner) throws ExecutionException, InterruptedException {
        Assertions.assertThat((Comparable) jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get()).isEqualTo(JobStatus.INITIALIZING);
        Assertions.assertThat(jobManagerRunner.getResultFuture()).isNotDone();
        Assertions.assertThat(((ExecutionGraphInfo) jobManagerRunner.requestJob(TESTING_TIMEOUT).get()).getArchivedExecutionGraph().getState()).isEqualTo(JobStatus.INITIALIZING);
        Assertions.assertThat(((JobDetails) jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get()).getStatus()).isEqualTo(JobStatus.INITIALIZING);
    }

    @Test
    void testSkippingOfEnqueuedLeadershipOperations() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withJobMasterServiceProcesses(TestingJobMasterServiceProcess.newBuilder().setCloseAsyncSupplier(() -> {
            return completableFuture;
        }).setIsInitializedAndRunningSupplier(() -> {
            return false;
        }).build(), TestingJobMasterServiceProcess.newBuilder().setCloseAsyncSupplier(() -> {
            completableFuture2.complete(null);
            return completableFuture2;
        }).build()).build();
        build.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        Assertions.assertThat((Comparable) build.requestJobStatus(TESTING_TIMEOUT).get()).isEqualTo(JobStatus.INITIALIZING);
        for (int i = 0; i < 10; i++) {
            this.leaderElectionService.notLeader();
            this.leaderElectionService.isLeader(UUID.randomUUID());
        }
        completableFuture.complete(null);
        build.closeAsync();
        Assertions.assertThat(completableFuture2).isDone();
    }

    @Test
    void testCancellationFailsWhenInitializationFails() throws Exception {
        FlinkException flinkException = new FlinkException("test exception");
        runCancellationFailsTest(completableFuture -> {
            completableFuture.complete(JobManagerRunnerResult.forInitializationFailure(createFailedExecutionGraphInfo(flinkException), flinkException));
        });
    }

    @Test
    void testCancellationFailsWhenExceptionOccurs() throws Exception {
        FlinkException flinkException = new FlinkException("test exception");
        runCancellationFailsTest(completableFuture -> {
            completableFuture.completeExceptionally(flinkException);
        });
    }

    void runCancellationFailsTest(Consumer<CompletableFuture<JobManagerRunnerResult>> consumer) throws Exception {
        CompletableFuture<JobManagerRunnerResult> completableFuture = new CompletableFuture<>();
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setGetJobMasterGatewayFutureSupplier(CompletableFuture::new).setGetResultFutureSupplier(() -> {
            return completableFuture;
        }).setIsInitializedAndRunningSupplier(() -> {
            return false;
        }).build()).build();
        build.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        Assertions.assertThat((Comparable) build.requestJobStatus(TESTING_TIMEOUT).get()).isEqualTo(JobStatus.INITIALIZING);
        CompletableFuture cancel = build.cancel(TESTING_TIMEOUT);
        Assertions.assertThat(cancel).isNotDone();
        consumer.accept(completableFuture);
        cancel.getClass();
        Assertions.assertThatThrownBy(cancel::get).hasMessageContaining("Cancellation failed.");
    }

    @Test
    void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setGetResultFutureSupplier(() -> {
            return completableFuture;
        }).build()).build();
        build.start();
        this.leaderElectionService.isLeader(UUID.randomUUID()).join();
        this.leaderElectionService.notLeader();
        completableFuture.complete(JobManagerRunnerResult.forSuccess(createFailedExecutionGraphInfo(new FlinkException("test exception"))));
        Assertions.assertThat(build.getResultFuture()).failsWithin(5L, TimeUnit.MILLISECONDS);
    }

    @Test
    void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception {
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setGetJobMasterGatewayFutureSupplier(CompletableFuture::new).build()).build();
        build.start();
        CompletableFuture jobMasterGateway = build.getJobMasterGateway();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        Assertions.assertThat(jobMasterGateway).failsWithin(5L, TimeUnit.MILLISECONDS);
    }

    @Test
    void testLeaderAddressOfOutdatedLeaderIsIgnored() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setGetLeaderAddressFutureSupplier(() -> {
            return completableFuture;
        }).build()).build().start();
        CompletableFuture<UUID> isLeader = this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        completableFuture.complete("foobar");
        Assertions.assertThat(isLeader).failsWithin(5L, TimeUnit.MILLISECONDS);
    }

    @Test
    void testInitialJobStatusIsInitializing() throws Exception {
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().build();
        build.start();
        Assertions.assertThat((Comparable) build.requestJobStatus(TESTING_TIMEOUT).join()).isEqualTo(JobStatus.INITIALIZING);
    }

    @Test
    void testCancellationChangesJobStatusToCancelling() throws Exception {
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().build();
        build.start();
        build.cancel(TESTING_TIMEOUT);
        Assertions.assertThat((Comparable) build.requestJobStatus(TESTING_TIMEOUT).join()).isEqualTo(JobStatus.CANCELLING);
    }

    @Test
    void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setCloseAsyncSupplier(() -> {
            completableFuture.complete(null);
            return completableFuture;
        }).setIsInitializedAndRunningSupplier(() -> {
            return Boolean.valueOf(!completableFuture.isDone());
        }).build()).build();
        build.start();
        build.cancel(TESTING_TIMEOUT);
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        Assertions.assertThat((Comparable) build.requestJobStatus(TESTING_TIMEOUT).join()).isEqualTo(JobStatus.INITIALIZING);
    }

    @Test
    void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setCloseAsyncSupplier(() -> {
            return completableFuture;
        }).build()).build();
        build.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        FlinkException flinkException = new FlinkException("Test exception");
        completableFuture.completeExceptionally(flinkException);
        Assertions.assertThat(build.getResultFuture()).failsWithin(5L, TimeUnit.MILLISECONDS).withThrowableOfType(ExecutionException.class).satisfies(new ThrowingConsumer[]{th -> {
            Assertions.assertThat(th).hasRootCause(flinkException);
        }});
    }

    @Test
    void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture() throws Exception {
        FlinkRuntimeException flinkRuntimeException = new FlinkRuntimeException("Test exception");
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().setJobMasterServiceProcessFactory(TestingJobMasterServiceProcessFactory.newBuilder().setJobMasterServiceProcessFunction(uuid -> {
            throw flinkRuntimeException;
        }).build()).build();
        build.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        Assertions.assertThat(build.getResultFuture()).failsWithin(5L, TimeUnit.MILLISECONDS).withThrowableOfType(ExecutionException.class).satisfies(new ThrowingConsumer[]{th -> {
            Assertions.assertThat(th).hasRootCause(flinkRuntimeException);
        }});
    }

    @Test
    void testJobAlreadyDone() throws Exception {
        JobID jobID = new JobID();
        this.jobResultStore.createDirtyResult(new JobResultEntry(TestingJobResultStore.createJobResult(jobID, ApplicationStatus.UNKNOWN)));
        JobMasterServiceLeadershipRunner build = newJobMasterServiceLeadershipRunnerBuilder().setJobMasterServiceProcessFactory(TestingJobMasterServiceProcessFactory.newBuilder().setJobId(jobID).build()).build();
        Throwable th = null;
        try {
            build.start();
            this.leaderElectionService.isLeader(UUID.randomUUID());
            Assertions.assertThat(((JobManagerRunnerResult) build.getResultFuture().get()).getExecutionGraphInfo().getArchivedExecutionGraph().getState()).isEqualTo(JobStatus.FAILED);
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip() throws Exception {
        TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory testingLeaderElectionDriverFactory = new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
        DefaultLeaderElectionService defaultLeaderElectionService = new DefaultLeaderElectionService(testingLeaderElectionDriverFactory);
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        TestingJobMasterServiceProcess build = TestingJobMasterServiceProcess.newBuilder().setGetJobMasterGatewayFutureSupplier(CompletableFuture::new).setGetResultFutureSupplier(CompletableFuture::new).setGetLeaderAddressFutureSupplier(() -> {
            return CompletableFuture.completedFuture("unused address");
        }).setCloseAsyncSupplier(() -> {
            oneShotLatch.trigger();
            return CompletableFuture.completedFuture(null);
        }).build();
        JobMasterServiceLeadershipRunner build2 = newJobMasterServiceLeadershipRunnerBuilder().setClassLoaderLease(TestingClassLoaderLease.newBuilder().setCloseRunnable(() -> {
            try {
                oneShotLatch2.await();
                Thread.sleep(5L);
            } catch (InterruptedException e) {
                ExceptionUtils.checkInterrupted(e);
            }
        }).build()).setJobMasterServiceProcessFactory(TestingJobMasterServiceProcessFactory.newBuilder().setJobMasterServiceProcessFunction(uuid -> {
            return build;
        }).build()).setLeaderElectionService(defaultLeaderElectionService).build();
        Throwable th = null;
        try {
            try {
                build2.start();
                TestingLeaderElectionDriver testingLeaderElectionDriver = (TestingLeaderElectionDriver) Preconditions.checkNotNull(testingLeaderElectionDriverFactory.getCurrentLeaderDriver());
                testingLeaderElectionDriver.isLeader();
                while (true) {
                    if (testingLeaderElectionDriver.getLeaderInformation().getLeaderSessionID() != null && defaultLeaderElectionService.hasLeadership(testingLeaderElectionDriver.getLeaderInformation().getLeaderSessionID())) {
                        break;
                    } else {
                        Thread.sleep(100L);
                    }
                }
                build2.getClass();
                CheckedThread createCheckedThread = createCheckedThread(build2::close);
                createCheckedThread.start();
                oneShotLatch.await();
                testingLeaderElectionDriver.getClass();
                CheckedThread createCheckedThread2 = createCheckedThread(testingLeaderElectionDriver::isLeader);
                createCheckedThread2.start();
                oneShotLatch2.trigger();
                createCheckedThread.sync();
                createCheckedThread2.sync();
                if (build2 != null) {
                    if (0 == 0) {
                        build2.close();
                        return;
                    }
                    try {
                        build2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build2 != null) {
                if (th != null) {
                    try {
                        build2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build2.close();
                }
            }
            throw th4;
        }
    }

    private static CheckedThread createCheckedThread(final ThrowingRunnable<? extends Exception> throwingRunnable) {
        return new CheckedThread() { // from class: org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunnerTest.1
            public void go() throws Exception {
                throwingRunnable.run();
            }
        };
    }

    private void assertJobNotFinished(CompletableFuture<JobManagerRunnerResult> completableFuture) throws ExecutionException, InterruptedException {
        Assertions.assertThat(completableFuture.get().getExecutionGraphInfo().getArchivedExecutionGraph().getState()).isEqualTo(JobStatus.SUSPENDED);
    }

    public JobMasterServiceLeadershipRunnerBuilder newJobMasterServiceLeadershipRunnerBuilder() {
        return new JobMasterServiceLeadershipRunnerBuilder();
    }
}
