/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess;
import org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess;
import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherGatewayService;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.testutils.TestingExecutionPlanStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={TestLoggerExtension.class})
class SessionDispatcherLeaderProcessTest {
    private static final JobGraph JOB_GRAPH = JobGraphTestUtils.emptyJobGraph();
    private static ExecutorService ioExecutor;
    private final UUID leaderSessionId = UUID.randomUUID();
    private TestingFatalErrorHandler fatalErrorHandler;
    private ExecutionPlanStore executionPlanStore;
    private JobResultStore jobResultStore;
    private AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherServiceFactory;

    SessionDispatcherLeaderProcessTest() {
    }

    @BeforeAll
    static void setupClass() {
        ioExecutor = Executors.newSingleThreadExecutor();
    }

    @BeforeEach
    void setup() {
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().build();
        this.jobResultStore = TestingJobResultStore.builder().build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().build());
    }

    @AfterEach
    void teardown() throws Exception {
        if (this.fatalErrorHandler != null) {
            this.fatalErrorHandler.rethrowError();
            this.fatalErrorHandler = null;
        }
    }

    @AfterAll
    static void teardownClass() {
        if (ioExecutor != null) {
            ExecutorUtils.gracefulShutdown((long)5L, (TimeUnit)TimeUnit.SECONDS, (ExecutorService[])new ExecutorService[]{ioExecutor});
        }
    }

    @Test
    void start_afterClose_doesNotHaveAnEffect() throws Exception {
        SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();
        dispatcherLeaderProcess.close();
        dispatcherLeaderProcess.start();
        Assertions.assertThat((Comparable)dispatcherLeaderProcess.getState()).isEqualTo((Object)AbstractDispatcherLeaderProcess.State.STOPPED);
    }

    @Test
    void testStartTriggeringDispatcherServiceCreation() throws Exception {
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().build());
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            Assertions.assertThat((Comparable)dispatcherLeaderProcess.getState()).isEqualTo((Object)AbstractDispatcherLeaderProcess.State.RUNNING);
        }
    }

    @Test
    void testRecoveryWithExecutionPlanButNoDirtyJobResult() throws Exception {
        this.testJobRecovery(Collections.singleton(JOB_GRAPH), Collections.emptySet(), actualRecoveredExecutionPlans -> ((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredExecutionPlans).singleElement()).isEqualTo((Object)JOB_GRAPH), actualRecoveredDirtyJobResults -> Assertions.assertThat((Collection)actualRecoveredDirtyJobResults).isEmpty());
    }

    @Test
    void testRecoveryWithExecutionPlanAndMatchingDirtyJobResult() throws Exception {
        JobResult matchingDirtyJobResult = TestingJobResultStore.createSuccessfulJobResult(JOB_GRAPH.getJobID());
        this.testJobRecovery(Collections.singleton(JOB_GRAPH), Collections.singleton(matchingDirtyJobResult), actualRecoveredExecutionPlans -> Assertions.assertThat((Collection)actualRecoveredExecutionPlans).isEmpty(), actualRecoveredDirtyJobResults -> ((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredDirtyJobResults).singleElement()).isEqualTo((Object)matchingDirtyJobResult));
    }

    @Test
    void testRecoveryWithMultipleExecutionPlansAndOneMatchingDirtyJobResult() throws Exception {
        JobResult matchingDirtyJobResult = TestingJobResultStore.createSuccessfulJobResult(JOB_GRAPH.getJobID());
        JobGraph otherExecutionPlan = JobGraphTestUtils.emptyJobGraph();
        this.testJobRecovery(Arrays.asList(otherExecutionPlan, JOB_GRAPH), Collections.singleton(matchingDirtyJobResult), arg_0 -> SessionDispatcherLeaderProcessTest.lambda$testRecoveryWithMultipleExecutionPlansAndOneMatchingDirtyJobResult$6((ExecutionPlan)otherExecutionPlan, arg_0), actualRecoveredDirtyJobResults -> ((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredDirtyJobResults).singleElement()).isEqualTo((Object)matchingDirtyJobResult));
    }

    @Test
    void testRecoveryWithoutExecutionPlanButDirtyJobResult() throws Exception {
        JobResult dirtyJobResult = TestingJobResultStore.createSuccessfulJobResult(new JobID());
        this.testJobRecovery(Collections.emptyList(), Collections.singleton(dirtyJobResult), actualRecoveredExecutionPlans -> Assertions.assertThat((Collection)actualRecoveredExecutionPlans).isEmpty(), actualRecoveredDirtyJobResults -> ((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredDirtyJobResults).singleElement()).isEqualTo((Object)dirtyJobResult));
    }

    private void testJobRecovery(Collection<ExecutionPlan> executionPlansToRecover, Set<JobResult> dirtyJobResults, Consumer<Collection<ExecutionPlan>> recoveredExecutionPlanAssertion, Consumer<Collection<JobResult>> recoveredDirtyJobResultAssertion) throws Exception {
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setInitialExecutionPlans(executionPlansToRecover).build();
        this.jobResultStore = TestingJobResultStore.builder().withGetDirtyResultsSupplier((SupplierWithException<Set<JobResult>, ? extends IOException>)((SupplierWithException)() -> dirtyJobResults)).build();
        CompletableFuture recoveredExecutionPlansFuture = new CompletableFuture();
        CompletableFuture recoveredDirtyJobResultsFuture = new CompletableFuture();
        this.dispatcherServiceFactory = (ignoredDispatcherId, recoveredJobs, recoveredDirtyJobResults, ignoredExecutionPlanWriter, ignoredJobResultStore) -> {
            recoveredExecutionPlansFuture.complete(recoveredJobs);
            recoveredDirtyJobResultsFuture.complete(recoveredDirtyJobResults);
            return TestingDispatcherGatewayService.newBuilder().build();
        };
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            recoveredExecutionPlanAssertion.accept((Collection)recoveredExecutionPlansFuture.get());
            recoveredDirtyJobResultAssertion.accept((Collection)recoveredDirtyJobResultsFuture.get());
        }
    }

    @Test
    void testRecoveryWhileExecutionPlanRecoveryIsScheduledConcurrently() throws Exception {
        JobResult dirtyJobResult = TestingJobResultStore.createSuccessfulJobResult(new JobID());
        OneShotLatch recoveryInitiatedLatch = new OneShotLatch();
        OneShotLatch jobGraphAddedLatch = new OneShotLatch();
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setRecoverExecutionPlanFunction((BiFunctionWithException<JobID, Map<JobID, ExecutionPlan>, ExecutionPlan, ? extends Exception>)((BiFunctionWithException)(jobId, jobs) -> null)).build();
        this.jobResultStore = TestingJobResultStore.builder().withGetDirtyResultsSupplier((SupplierWithException<Set<JobResult>, ? extends IOException>)((SupplierWithException)() -> {
            recoveryInitiatedLatch.trigger();
            try {
                jobGraphAddedLatch.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Collections.singleton(dirtyJobResult);
        })).build();
        CompletableFuture recoveredExecutionPlansFuture = new CompletableFuture();
        CompletableFuture recoveredDirtyJobResultsFuture = new CompletableFuture();
        this.dispatcherServiceFactory = (ignoredDispatcherId, recoveredJobs, recoveredDirtyJobResults, ignoredExecutionPlanWriter, ignoredJobResultStore) -> {
            recoveredExecutionPlansFuture.complete(recoveredJobs);
            recoveredDirtyJobResultsFuture.complete(recoveredDirtyJobResults);
            return TestingDispatcherGatewayService.newBuilder().build();
        };
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            recoveryInitiatedLatch.await();
            dispatcherLeaderProcess.onAddedExecutionPlan(dirtyJobResult.getJobId());
            jobGraphAddedLatch.trigger();
            FlinkAssertions.assertThatFuture(recoveredExecutionPlansFuture).eventuallySucceeds().satisfies(new ThrowingConsumer[]{recoverExecutionPlans -> Assertions.assertThat((Collection)recoverExecutionPlans).isEmpty()});
            FlinkAssertions.assertThatFuture(recoveredDirtyJobResultsFuture).eventuallySucceeds().satisfies(new ThrowingConsumer[]{recoveredDirtyJobResults -> Assertions.assertThat((Collection)recoveredDirtyJobResults).containsExactly((Object[])new JobResult[]{dirtyJobResult})});
        }
    }

    @Test
    void closeAsync_stopsExecutionPlanStoreAndDispatcher() throws Exception {
        CompletableFuture jobGraphStopFuture = new CompletableFuture();
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setStopRunnable((ThrowingRunnable<? extends Exception>)((ThrowingRunnable)() -> jobGraphStopFuture.complete(null))).build();
        CompletableFuture<Object> dispatcherServiceTerminationFuture = new CompletableFuture<Object>();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().setTerminationFuture(dispatcherServiceTerminationFuture).withManualTerminationFutureCompletion().build());
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            CompletableFuture terminationFuture = dispatcherLeaderProcess.closeAsync();
            Assertions.assertThat(jobGraphStopFuture).isNotDone();
            Assertions.assertThat((CompletableFuture)terminationFuture).isNotDone();
            dispatcherServiceTerminationFuture.complete(null);
            jobGraphStopFuture.get();
            terminationFuture.get();
        }
    }

    @Test
    void unexpectedDispatcherServiceTerminationWhileRunning_callsFatalErrorHandler() {
        CompletableFuture terminationFuture = new CompletableFuture();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().setTerminationFuture(terminationFuture).build());
        SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();
        dispatcherLeaderProcess.start();
        FlinkException expectedFailure = new FlinkException("Expected test failure.");
        terminationFuture.completeExceptionally((Throwable)expectedFailure);
        Throwable error = this.fatalErrorHandler.getErrorFuture().join();
        Assertions.assertThat((Throwable)error).rootCause().isEqualTo((Object)expectedFailure);
        this.fatalErrorHandler.clearError();
    }

    @Test
    void unexpectedDispatcherServiceTerminationWhileNotRunning_doesNotCallFatalErrorHandler() {
        CompletableFuture terminationFuture = new CompletableFuture();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().setTerminationFuture(terminationFuture).withManualTerminationFutureCompletion().build());
        SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();
        dispatcherLeaderProcess.start();
        dispatcherLeaderProcess.closeAsync();
        FlinkException expectedFailure = new FlinkException("Expected test failure.");
        terminationFuture.completeExceptionally((Throwable)expectedFailure);
        Assertions.assertThatThrownBy(() -> this.fatalErrorHandler.getErrorFuture().get(10L, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
    }

    @Test
    void confirmLeaderSessionFuture_completesAfterDispatcherServiceHasBeenStarted() throws Exception {
        OneShotLatch createDispatcherServiceLatch = new OneShotLatch();
        String dispatcherAddress = "myAddress";
        TestingDispatcherGateway dispatcherGateway = ((TestingDispatcherGateway.Builder)TestingDispatcherGateway.newBuilder().setAddress("myAddress")).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> {
            try {
                createDispatcherServiceLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return TestingDispatcherGatewayService.newBuilder().setDispatcherGateway(dispatcherGateway).build();
        });
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            CompletableFuture confirmLeaderSessionFuture = dispatcherLeaderProcess.getLeaderAddressFuture();
            dispatcherLeaderProcess.start();
            Assertions.assertThat((CompletableFuture)confirmLeaderSessionFuture).isNotDone();
            createDispatcherServiceLatch.trigger();
            FlinkAssertions.assertThatFuture((CompletableFuture)confirmLeaderSessionFuture).eventuallySucceeds().isEqualTo((Object)"myAddress");
        }
    }

    @Test
    void closeAsync_duringJobRecovery_preventsDispatcherServiceCreation() throws Exception {
        OneShotLatch jobRecoveryStartedLatch = new OneShotLatch();
        OneShotLatch completeJobRecoveryLatch = new OneShotLatch();
        OneShotLatch createDispatcherServiceLatch = new OneShotLatch();
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setJobIdsFunction((FunctionWithException<Collection<JobID>, Collection<JobID>, ? extends Exception>)((FunctionWithException)storedJobs -> {
            jobRecoveryStartedLatch.trigger();
            completeJobRecoveryLatch.await();
            return storedJobs;
        })).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> {
            createDispatcherServiceLatch.trigger();
            return TestingDispatcherGatewayService.newBuilder().build();
        });
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            jobRecoveryStartedLatch.await();
            dispatcherLeaderProcess.closeAsync();
            completeJobRecoveryLatch.trigger();
            Assertions.assertThatThrownBy(() -> createDispatcherServiceLatch.await(10L, TimeUnit.MILLISECONDS), (String)"No dispatcher service should be created after the process has been stopped.", (Object[])new Object[0]).isInstanceOf(TimeoutException.class);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void onRemovedExecutionPlan_terminatesRunningJob() throws Exception {
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setInitialExecutionPlans(Collections.singleton(JOB_GRAPH)).build();
        CompletableFuture terminateJobFuture = new CompletableFuture();
        TestingDispatcherGatewayService testingDispatcherService = TestingDispatcherGatewayService.newBuilder().setOnRemovedJobGraphFunction(jobID -> {
            terminateJobFuture.complete(jobID);
            return FutureUtils.completedVoidFuture();
        }).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> testingDispatcherService);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            this.executionPlanStore.globalCleanupAsync(JOB_GRAPH.getJobID(), (Executor)executorService).join();
            dispatcherLeaderProcess.onRemovedExecutionPlan(JOB_GRAPH.getJobID());
            Assertions.assertThat((Comparable)((JobID)terminateJobFuture.get())).isEqualTo((Object)JOB_GRAPH.getJobID());
        }
        finally {
            Assertions.assertThat(executorService.shutdownNow()).isEmpty();
        }
    }

    @Test
    void onRemovedExecutionPlan_failingRemovalCall_failsFatally() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        TestingDispatcherGatewayService testingDispatcherService = TestingDispatcherGatewayService.newBuilder().setOnRemovedJobGraphFunction(jobID -> FutureUtils.completedExceptionally((Throwable)testException)).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> testingDispatcherService);
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            dispatcherLeaderProcess.onRemovedExecutionPlan(JOB_GRAPH.getJobID());
            Throwable fatalError = this.fatalErrorHandler.getErrorFuture().join();
            Assertions.assertThat((Throwable)fatalError).hasCause((Throwable)testException);
            this.fatalErrorHandler.clearError();
        }
    }

    @Test
    void onAddedExecutionPlan_submitsRecoveredJob() throws Exception {
        CompletableFuture submittedJobFuture = new CompletableFuture();
        TestingDispatcherGateway testingDispatcherGateway = TestingDispatcherGateway.newBuilder().setSubmitFunction(submittedJob -> {
            submittedJobFuture.complete(submittedJob);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().setDispatcherGateway(testingDispatcherGateway).build());
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            this.executionPlanStore.putExecutionPlan((ExecutionPlan)JOB_GRAPH);
            dispatcherLeaderProcess.onAddedExecutionPlan(JOB_GRAPH.getJobID());
            ExecutionPlan submittedExecutionPlan = (ExecutionPlan)submittedJobFuture.get();
            Assertions.assertThat((Comparable)submittedExecutionPlan.getJobID()).isEqualTo((Object)JOB_GRAPH.getJobID());
        }
    }

    @Test
    void onAddedExecutionPlan_ifNotRunning_isBeingIgnored() throws Exception {
        CompletableFuture recoveredJobFuture = new CompletableFuture();
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setRecoverExecutionPlanFunction((BiFunctionWithException<JobID, Map<JobID, ExecutionPlan>, ExecutionPlan, ? extends Exception>)((BiFunctionWithException)(jobId, jobGraphs) -> {
            recoveredJobFuture.complete(jobId);
            return (ExecutionPlan)jobGraphs.get(jobId);
        })).build();
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            this.executionPlanStore.putExecutionPlan((ExecutionPlan)JOB_GRAPH);
            dispatcherLeaderProcess.closeAsync();
            dispatcherLeaderProcess.onAddedExecutionPlan(JOB_GRAPH.getJobID());
            Assertions.assertThatThrownBy(() -> recoveredJobFuture.get(10L, TimeUnit.MILLISECONDS), (String)"onAddedExecutionPlan should be ignored if the leader process is not running.", (Object[])new Object[0]).isInstanceOf(TimeoutException.class);
        }
    }

    @Test
    void onAddedExecutionPlan_failingRecovery_propagatesTheFailure() throws Exception {
        FlinkException expectedFailure = new FlinkException("Expected failure");
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setRecoverExecutionPlanFunction((BiFunctionWithException<JobID, Map<JobID, ExecutionPlan>, ExecutionPlan, ? extends Exception>)((BiFunctionWithException)(ignoredA, ignoredB) -> {
            throw expectedFailure;
        })).build();
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            this.executionPlanStore.putExecutionPlan((ExecutionPlan)JOB_GRAPH);
            dispatcherLeaderProcess.onAddedExecutionPlan(JOB_GRAPH.getJobID());
            ((ListAssert)FlinkAssertions.assertThatFuture(this.fatalErrorHandler.getErrorFuture()).eventuallySucceeds().extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)).contains((Object[])new Throwable[]{expectedFailure});
            Assertions.assertThat((Comparable)dispatcherLeaderProcess.getState()).isEqualTo((Object)AbstractDispatcherLeaderProcess.State.STOPPED);
            this.fatalErrorHandler.clearError();
        }
    }

    @Test
    void recoverJobs_withRecoveryFailure_failsFatally() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setRecoverExecutionPlanFunction((BiFunctionWithException<JobID, Map<JobID, ExecutionPlan>, ExecutionPlan, ? extends Exception>)((BiFunctionWithException)(ignoredA, ignoredB) -> {
            throw testException;
        })).setInitialExecutionPlans(Collections.singleton(JOB_GRAPH)).build();
        this.runJobRecoveryFailureTest(testException);
    }

    @Test
    void recoverJobs_withJobIdRecoveryFailure_failsFatally() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setJobIdsFunction((FunctionWithException<Collection<JobID>, Collection<JobID>, ? extends Exception>)((FunctionWithException)ignored -> {
            throw testException;
        })).build();
        this.runJobRecoveryFailureTest(testException);
    }

    private void runJobRecoveryFailureTest(FlinkException testException) throws Exception {
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            FlinkAssertions.assertThatFuture(this.fatalErrorHandler.getErrorFuture()).eventuallySucceeds().satisfies(new ThrowingConsumer[]{error -> Assertions.assertThat((Throwable)error).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(testException.getClass(), (String)testException.getMessage())})});
            this.fatalErrorHandler.clearError();
        }
    }

    @Test
    void onAddedExecutionPlan_failingRecoveredJobSubmission_failsFatally() throws Exception {
        TestingDispatcherGateway dispatcherGateway = TestingDispatcherGateway.newBuilder().setSubmitFunction(jobGraph -> FutureUtils.completedExceptionally((Throwable)new JobSubmissionException(jobGraph.getJobID(), "test exception"))).build();
        this.runOnAddedExecutionPlanTest(dispatcherGateway, (org.apache.flink.util.function.ThrowingConsumer<TestingFatalErrorHandler, Exception>)((org.apache.flink.util.function.ThrowingConsumer)this::verifyOnAddedExecutionPlanResultFailsFatally));
    }

    private void verifyOnAddedExecutionPlanResultFailsFatally(TestingFatalErrorHandler fatalErrorHandler) {
        Throwable actualCause = fatalErrorHandler.getErrorFuture().join();
        ((ListAssert)Assertions.assertThat((Throwable)actualCause).extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)).hasAtLeastOneElementOfType(JobSubmissionException.class);
        fatalErrorHandler.clearError();
    }

    @Test
    void onAddedExecutionPlan_duplicateJobSubmissionDueToFalsePositive_willBeIgnored() throws Exception {
        TestingDispatcherGateway dispatcherGateway = TestingDispatcherGateway.newBuilder().setSubmitFunction(jobGraph -> FutureUtils.completedExceptionally((Throwable)DuplicateJobSubmissionException.of((JobID)jobGraph.getJobID()))).build();
        this.runOnAddedExecutionPlanTest(dispatcherGateway, (org.apache.flink.util.function.ThrowingConsumer<TestingFatalErrorHandler, Exception>)((org.apache.flink.util.function.ThrowingConsumer)this::verifyOnAddedExecutionPlanResultDidNotFail));
    }

    private void runOnAddedExecutionPlanTest(TestingDispatcherGateway dispatcherGateway, org.apache.flink.util.function.ThrowingConsumer<TestingFatalErrorHandler, Exception> verificationLogic) throws Exception {
        this.executionPlanStore = TestingExecutionPlanStore.newBuilder().setInitialExecutionPlans(Collections.singleton(JOB_GRAPH)).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnExecutionPlans(jobGraphs -> {
            Assertions.assertThat((Collection)jobGraphs).containsExactlyInAnyOrder((Object[])new ExecutionPlan[]{JOB_GRAPH});
            return TestingDispatcherGatewayService.newBuilder().setDispatcherGateway(dispatcherGateway).build();
        });
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            dispatcherLeaderProcess.onAddedExecutionPlan(JOB_GRAPH.getJobID());
            verificationLogic.accept((Object)this.fatalErrorHandler);
        }
    }

    private AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory createFactoryBasedOnExecutionPlans(Function<Collection<ExecutionPlan>, AbstractDispatcherLeaderProcess.DispatcherGatewayService> createFunction) {
        return (ignoredDispatcherId, recoveredJobs, ignoredRecoveredDirtyJobResults, ignoredExecutionPlanWriter, ignoredJobResultStore) -> (AbstractDispatcherLeaderProcess.DispatcherGatewayService)createFunction.apply(recoveredJobs);
    }

    private AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory createFactoryBasedOnGenericSupplier(Supplier<AbstractDispatcherLeaderProcess.DispatcherGatewayService> supplier) {
        return (ignoredDispatcherId, ignoredRecoveredJobs, ignoredRecoveredDirtyJobResults, ignoredExecutionPlanWriter, ignoredJobResultStore) -> (AbstractDispatcherLeaderProcess.DispatcherGatewayService)supplier.get();
    }

    private void verifyOnAddedExecutionPlanResultDidNotFail(TestingFatalErrorHandler fatalErrorHandler) {
        Assertions.assertThatThrownBy(() -> fatalErrorHandler.getErrorFuture().get(10L, TimeUnit.MILLISECONDS), (String)"Expected that duplicate job submissions due to false job recoveries are ignored.", (Object[])new Object[0]).isInstanceOf(TimeoutException.class);
    }

    private SessionDispatcherLeaderProcess createDispatcherLeaderProcess() {
        return SessionDispatcherLeaderProcess.create((UUID)this.leaderSessionId, (AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory)this.dispatcherServiceFactory, (ExecutionPlanStore)this.executionPlanStore, (JobResultStore)this.jobResultStore, (Executor)ioExecutor, (FatalErrorHandler)this.fatalErrorHandler);
    }

    private static /* synthetic */ void lambda$testRecoveryWithMultipleExecutionPlansAndOneMatchingDirtyJobResult$6(ExecutionPlan otherExecutionPlan, Collection actualRecoveredExecutionPlans) {
        ((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredExecutionPlans).singleElement()).isEqualTo((Object)otherExecutionPlan);
    }
}

