package org.apache.flink.runtime.dispatcher.runner;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.TriFunctionWithException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.class */
public class SessionDispatcherLeaderProcessTest extends TestLogger {
    // Job graph used by the tests whenever a concrete job has to be stored, added or removed.
    private static final JobGraph JOB_GRAPH = new JobGraph("JobGraph");
    // Executor passed to every process under test; assigned in setupClass() and shut down in teardownClass().
    private static ExecutorService ioExecutor;
    // Leader session id passed to SessionDispatcherLeaderProcess.create(...); random per instance of this class.
    private final UUID leaderSessionId = UUID.randomUUID();
    // Receives fatal errors of the process under test; teardown() calls rethrowError() on it.
    private TestingFatalErrorHandler fatalErrorHandler;
    // Default instance is built in setup(); several tests replace it before creating the process.
    private JobGraphStore jobGraphStore;
    // Default instance is built in setup(); several tests replace it before creating the process.
    private TestingDispatcherServiceFactory dispatcherServiceFactory;

    /** Creates the single-threaded executor that is shared by all tests of this class. */
    @BeforeClass
    public static void setupClass() {
        final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        ioExecutor = singleThreadExecutor;
    }

    /** Installs a fresh error handler plus default job graph store and dispatcher service factory. */
    @Before
    public void setup() {
        fatalErrorHandler = new TestingFatalErrorHandler();

        final TestingJobGraphStore defaultJobGraphStore = TestingJobGraphStore.newBuilder().build();
        jobGraphStore = defaultJobGraphStore;

        final TestingDispatcherServiceFactory defaultFactory =
                TestingDispatcherServiceFactory.newBuilder().build();
        dispatcherServiceFactory = defaultFactory;
    }

    /**
     * Asks the fatal error handler to rethrow its error and then drops the handler.
     *
     * @throws Exception whatever {@code rethrowError()} throws
     */
    @After
    public void teardown() throws Exception {
        if (fatalErrorHandler == null) {
            return;
        }

        fatalErrorHandler.rethrowError();
        fatalErrorHandler = null;
    }

    /** Shuts down the shared executor, passing a 5 second timeout to the shutdown helper. */
    @AfterClass
    public static void teardownClass() {
        if (ioExecutor == null) {
            return;
        }

        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor);
    }

    /** Calling {@code start()} on an already closed process must leave it in the STOPPED state. */
    @Test
    public void start_afterClose_doesNotHaveAnEffect() throws Exception {
        final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess();

        dispatcherLeaderProcess.close();
        dispatcherLeaderProcess.start();

        MatcherAssert.assertThat(
                dispatcherLeaderProcess.getState(),
                CoreMatchers.is(AbstractDispatcherLeaderProcess.State.STOPPED));
    }

    /**
     * Verifies that {@code start()} puts the process into the RUNNING state and that the
     * dispatcher service factory is invoked with the job graph held by the job graph store.
     */
    @Test
    public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws Exception {
        jobGraphStore = TestingJobGraphStore.newBuilder()
                .setInitialJobGraphs(Collections.singleton(JOB_GRAPH))
                .build();

        // Completed by the factory with the job graphs it is asked to create the service for.
        final CompletableFuture<Collection<JobGraph>> recoveredJobGraphsFuture = new CompletableFuture<>();
        dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder()
                .setCreateFunction((dispatcherId, recoveredJobGraphs, jobGraphWriter) -> {
                    recoveredJobGraphsFuture.complete(recoveredJobGraphs);
                    return TestingDispatcherGatewayService.newBuilder().build();
                })
                .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            MatcherAssert.assertThat(
                    dispatcherLeaderProcess.getState(),
                    CoreMatchers.is(AbstractDispatcherLeaderProcess.State.RUNNING));
            MatcherAssert.assertThat(
                    recoveredJobGraphsFuture.get(),
                    Matchers.containsInAnyOrder(JOB_GRAPH));
        }
    }

    /**
     * Verifies that {@code closeAsync()} neither stops the job graph store nor completes while
     * the dispatcher service's termination future is still pending, and that both happen once
     * that future has been completed.
     */
    @Test
    public void closeAsync_stopsJobGraphStoreAndDispatcher() throws Exception {
        // Completed by the job graph store's stop callback.
        final CompletableFuture<Void> jobGraphStopFuture = new CompletableFuture<>();
        jobGraphStore = TestingJobGraphStore.newBuilder()
                .setStopRunnable(() -> jobGraphStopFuture.complete(null))
                .build();

        // Completed manually by the test to simulate the dispatcher service terminating.
        final CompletableFuture<Void> dispatcherServiceTerminationFuture = new CompletableFuture<>();
        dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder()
                .setCreateFunction((dispatcherId, recoveredJobGraphs, jobGraphWriter) ->
                        TestingDispatcherGatewayService.newBuilder()
                                .setTerminationFuture(dispatcherServiceTerminationFuture)
                                .withManualTerminationFutureCompletion()
                                .build())
                .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait for the dispatcher service to be available before closing.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            final CompletableFuture<Void> terminationFuture = dispatcherLeaderProcess.closeAsync();

            MatcherAssert.assertThat(jobGraphStopFuture.isDone(), CoreMatchers.is(false));
            MatcherAssert.assertThat(terminationFuture.isDone(), CoreMatchers.is(false));

            dispatcherServiceTerminationFuture.complete(null);

            // Both must complete now; get() blocks until they do.
            jobGraphStopFuture.get();
            terminationFuture.get();
        }
    }

    /**
     * Verifies that an exceptional completion of the dispatcher service's termination future
     * after {@code start()} is reported to the fatal error handler with the failure as cause.
     */
    @Test
    public void unexpectedDispatcherServiceTerminationWhileRunning_callsFatalErrorHandler() {
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder()
                .setCreateFunction((dispatcherId, recoveredJobGraphs, jobGraphWriter) ->
                        TestingDispatcherGatewayService.newBuilder()
                                .setTerminationFuture(terminationFuture)
                                .build())
                .build();

        createDispatcherLeaderProcess().start();

        final FlinkException expectedFailure = new FlinkException("Expected test failure.");
        terminationFuture.completeExceptionally(expectedFailure);

        final Throwable error = fatalErrorHandler.getErrorFuture().join();
        MatcherAssert.assertThat(error, org.apache.flink.util.CoreMatchers.containsCause(expectedFailure));

        // The error is expected, so clear it before teardown() asks the handler to rethrow.
        fatalErrorHandler.clearError();
    }

    /**
     * Verifies that an exceptional completion of the dispatcher service's termination future
     * after {@code closeAsync()} has been called does not reach the fatal error handler
     * (checked with a 10 ms wait).
     */
    @Test
    public void unexpectedDispatcherServiceTerminationWhileNotRunning_doesNotCallFatalErrorHandler() {
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder()
                .setCreateFunction((dispatcherId, recoveredJobGraphs, jobGraphWriter) ->
                        TestingDispatcherGatewayService.newBuilder()
                                .setTerminationFuture(terminationFuture)
                                .withManualTerminationFutureCompletion()
                                .build())
                .build();

        final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess();
        dispatcherLeaderProcess.start();
        dispatcherLeaderProcess.closeAsync();

        terminationFuture.completeExceptionally(new FlinkException("Expected test failure."));

        MatcherAssert.assertThat(
                fatalErrorHandler.getErrorFuture(),
                FlinkMatchers.willNotComplete(Duration.ofMillis(10L)));
    }

    /**
     * Verifies that the leader address future is not done while the dispatcher service creation
     * is blocked, and that it yields the gateway's address once the creation is released.
     */
    @Test
    public void confirmLeaderSessionFuture_completesAfterDispatcherServiceHasBeenStarted() throws Exception {
        // Blocks the create function until the test triggers it.
        final OneShotLatch createDispatcherServiceLatch = new OneShotLatch();
        final String dispatcherAddress = "myAddress";
        final TestingDispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder()
                .setAddress(dispatcherAddress)
                .build();

        dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder()
                .setCreateFunction(TriFunctionWithException.unchecked((dispatcherId, recoveredJobGraphs, jobGraphWriter) -> {
                    createDispatcherServiceLatch.await();
                    return TestingDispatcherGatewayService.newBuilder()
                            .setDispatcherGateway(dispatcherGateway)
                            .build();
                }))
                .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            final CompletableFuture<String> leaderAddressFuture = dispatcherLeaderProcess.getLeaderAddressFuture();

            dispatcherLeaderProcess.start();
            MatcherAssert.assertThat(leaderAddressFuture.isDone(), CoreMatchers.is(false));

            createDispatcherServiceLatch.trigger();
            MatcherAssert.assertThat(leaderAddressFuture.get(), CoreMatchers.is(dispatcherAddress));
        }
    }

    /**
     * Verifies that no dispatcher service is created when {@code closeAsync()} is called while
     * the job graph store's job-id lookup is still blocked (checked with a 10 ms wait).
     */
    @Test
    public void closeAsync_duringJobRecovery_preventsDispatcherServiceCreation() throws Exception {
        // Triggered by the job-id function when it is entered.
        final OneShotLatch jobRecoveryStartedLatch = new OneShotLatch();
        // Holds the job-id function until the test releases it.
        final OneShotLatch completeJobRecoveryLatch = new OneShotLatch();
        // Triggered only if the factory's create function is invoked.
        final OneShotLatch createDispatcherServiceLatch = new OneShotLatch();

        jobGraphStore = TestingJobGraphStore.newBuilder()
                .setJobIdsFunction(storedJobIds -> {
                    jobRecoveryStartedLatch.trigger();
                    completeJobRecoveryLatch.await();
                    return storedJobIds;
                })
                .build();

        dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder()
                .setCreateFunction((dispatcherId, recoveredJobGraphs, jobGraphWriter) -> {
                    createDispatcherServiceLatch.trigger();
                    return TestingDispatcherGatewayService.newBuilder().build();
                })
                .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            jobRecoveryStartedLatch.await();

            dispatcherLeaderProcess.closeAsync();

            completeJobRecoveryLatch.trigger();

            try {
                createDispatcherServiceLatch.await(10L, TimeUnit.MILLISECONDS);
                Assert.fail("No dispatcher service should be created after the process has been stopped.");
            } catch (TimeoutException expected) {
                // Timing out is the success case: the create function was never called.
            }
        }
    }

    /**
     * Verifies that {@code onRemovedJobGraph(jobId)} is forwarded to the dispatcher service's
     * removed-job-graph callback with the same job id.
     */
    @Test
    public void onRemovedJobGraph_terminatesRunningJob() throws Exception {
        jobGraphStore = TestingJobGraphStore.newBuilder()
                .setInitialJobGraphs(Collections.singleton(JOB_GRAPH))
                .build();

        // Receives the job id handed to the dispatcher service's removed-job-graph callback.
        final CompletableFuture<Object> terminateJobFuture = new CompletableFuture<>();
        final TestingDispatcherGatewayService dispatcherGatewayService = TestingDispatcherGatewayService.newBuilder()
                .setOnRemovedJobGraphFunction(jobID -> {
                    terminateJobFuture.complete(jobID);
                    return FutureUtils.completedVoidFuture();
                })
                .build();

        dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder()
                .setCreateFunction((dispatcherId, recoveredJobGraphs, jobGraphWriter) -> dispatcherGatewayService)
                .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait for the dispatcher service to be available.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            // Remove the job graph from the store, then notify the process about the removal.
            jobGraphStore.removeJobGraph(JOB_GRAPH.getJobID());
            dispatcherLeaderProcess.onRemovedJobGraph(JOB_GRAPH.getJobID());

            MatcherAssert.assertThat(terminateJobFuture.get(), CoreMatchers.is(JOB_GRAPH.getJobID()));
        }
    }

    /**
     * Verifies that a failed future returned by the dispatcher service's removed-job-graph
     * callback is reported to the fatal error handler.
     */
    @Test
    public void onRemovedJobGraph_failingRemovalCall_failsFatally() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");

        final TestingDispatcherGatewayService dispatcherGatewayService = TestingDispatcherGatewayService.newBuilder()
                .setOnRemovedJobGraphFunction(jobID -> FutureUtils.completedExceptionally(testException))
                .build();

        dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder()
                .setCreateFunction((dispatcherId, recoveredJobGraphs, jobGraphWriter) -> dispatcherGatewayService)
                .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait for the dispatcher service to be available.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            dispatcherLeaderProcess.onRemovedJobGraph(JOB_GRAPH.getJobID());

            final Throwable fatalError = fatalErrorHandler.getErrorFuture().join();
            Assert.assertTrue(
                    ExceptionUtils.findThrowable(fatalError, candidate -> candidate.equals(testException))
                            .isPresent());

            // The error is expected, so clear it before teardown() asks the handler to rethrow.
            fatalErrorHandler.clearError();
        }
    }

    /**
     * Verifies that {@code onAddedJobGraph(jobId)} leads to a submission of the job graph with
     * that id to the dispatcher gateway.
     */
    @Test
    public void onAddedJobGraph_submitsRecoveredJob() throws Exception {
        // Receives the job graph passed to the gateway's submit function.
        final CompletableFuture<JobGraph> submittedJobFuture = new CompletableFuture<>();
        final TestingDispatcherGateway testingDispatcherGateway = new TestingDispatcherGateway.Builder()
                .setSubmitFunction(submittedJob -> {
                    submittedJobFuture.complete(submittedJob);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .build();

        dispatcherServiceFactory = createDispatcherServiceFactoryFor(testingDispatcherGateway);

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait for the dispatcher service to be available.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            // Put the job graph into the store, then notify the process about the addition.
            jobGraphStore.putJobGraph(JOB_GRAPH);
            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());

            final JobGraph submittedJobGraph = submittedJobFuture.get();
            MatcherAssert.assertThat(submittedJobGraph.getJobID(), CoreMatchers.is(JOB_GRAPH.getJobID()));
        }
    }

    /**
     * Verifies that {@code onAddedJobGraph} does not trigger a job graph recovery from the store
     * once {@code closeAsync()} has been called (checked with a 10 ms wait).
     */
    @Test
    public void onAddedJobGraph_ifNotRunning_isBeingIgnored() throws Exception {
        // Completed with the job id if the store's recover function is ever invoked.
        final CompletableFuture<Object> recoveredJobFuture = new CompletableFuture<>();
        jobGraphStore = TestingJobGraphStore.newBuilder()
                .setRecoverJobGraphFunction((jobID, storedJobGraphs) -> {
                    recoveredJobFuture.complete(jobID);
                    return (JobGraph) storedJobGraphs.get(jobID);
                })
                .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait for the dispatcher service to be available.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            // Put the job graph into the store before closing the process.
            jobGraphStore.putJobGraph(JOB_GRAPH);

            dispatcherLeaderProcess.closeAsync();

            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());

            try {
                recoveredJobFuture.get(10L, TimeUnit.MILLISECONDS);
                Assert.fail("onAddedJobGraph should be ignored if the leader process is not running.");
            } catch (TimeoutException expected) {
                // Timing out is the success case: the recover function was never called.
            }
        }
    }

    /**
     * Verifies that a failure thrown by the job graph store's recover function during
     * {@code onAddedJobGraph} is reported to the fatal error handler and that the process is in
     * the STOPPED state afterwards.
     */
    @Test
    public void onAddedJobGraph_failingRecovery_propagatesTheFailure() throws Exception {
        final FlinkException expectedFailure = new FlinkException("Expected failure");

        jobGraphStore = TestingJobGraphStore.newBuilder()
                .setRecoverJobGraphFunction((jobID, storedJobGraphs) -> {
                    throw expectedFailure;
                })
                .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait for the dispatcher service to be available.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            jobGraphStore.putJobGraph(JOB_GRAPH);
            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());

            final Throwable fatalError = fatalErrorHandler.getErrorFuture().get();
            // Bound method reference: matches any throwable in the chain equal to expectedFailure.
            final boolean containsExpectedFailure =
                    ExceptionUtils.findThrowable(fatalError, expectedFailure::equals).isPresent();

            MatcherAssert.assertThat(containsExpectedFailure, CoreMatchers.is(true));
            MatcherAssert.assertThat(
                    dispatcherLeaderProcess.getState(),
                    CoreMatchers.is(AbstractDispatcherLeaderProcess.State.STOPPED));

            // The error is expected, so clear it before teardown() asks the handler to rethrow.
            fatalErrorHandler.clearError();
        }
    }

    /**
     * Runs the job recovery failure scenario with a job graph store that holds one job graph and
     * whose recover function always throws.
     */
    @Test
    public void recoverJobs_withRecoveryFailure_failsFatally() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");

        jobGraphStore = TestingJobGraphStore.newBuilder()
                .setRecoverJobGraphFunction((jobID, storedJobGraphs) -> {
                    throw testException;
                })
                .setInitialJobGraphs(Collections.singleton(JOB_GRAPH))
                .build();

        runJobRecoveryFailureTest(testException);
    }

    /**
     * Runs the job recovery failure scenario with a job graph store whose job-id function
     * always throws.
     */
    @Test
    public void recoverJobs_withJobIdRecoveryFailure_failsFatally() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");

        jobGraphStore = TestingJobGraphStore.newBuilder()
                .setJobIdsFunction(storedJobIds -> {
                    throw testException;
                })
                .build();

        runJobRecoveryFailureTest(testException);
    }

    /**
     * Starts a process built from the current fixtures and asserts that the fatal error handler
     * receives an error whose cause chain contains a throwable with the given exception's message.
     *
     * @param flinkException failure the configured job graph store is expected to raise
     * @throws Exception if waiting for the error or closing the process fails
     */
    private void runJobRecoveryFailureTest(FlinkException flinkException) throws Exception {
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Blocks until the fatal error handler has been called.
            final Throwable fatalError = fatalErrorHandler.getErrorFuture().get();
            final boolean containsExpectedMessage =
                    ExceptionUtils.findThrowableWithMessage(fatalError, flinkException.getMessage()).isPresent();

            MatcherAssert.assertThat(containsExpectedMessage, CoreMatchers.is(true));

            // The error is expected, so clear it before teardown() asks the handler to rethrow.
            fatalErrorHandler.clearError();
        }
    }

    /**
     * Runs the added-job-graph scenario with a gateway whose submit function fails with a
     * {@link JobSubmissionException} and expects the fatal error handler to be called.
     */
    @Test
    public void onAddedJobGraph_failingRecoveredJobSubmission_failsFatally() throws Exception {
        final TestingDispatcherGateway failingGateway = new TestingDispatcherGateway.Builder()
                .setSubmitFunction(submittedJob ->
                        FutureUtils.completedExceptionally(
                                new JobSubmissionException(submittedJob.getJobID(), "test exception")))
                .build();

        runOnAddedJobGraphTest(failingGateway, this::verifyOnAddedJobGraphResultFailsFatally);
    }

    /**
     * Asserts that the handler's error contains a {@link JobSubmissionException} in its cause
     * chain and then clears the error.
     *
     * @param testingFatalErrorHandler handler whose error future is inspected
     */
    private void verifyOnAddedJobGraphResultFailsFatally(TestingFatalErrorHandler testingFatalErrorHandler) {
        final Throwable fatalError = testingFatalErrorHandler.getErrorFuture().join();
        final boolean hasSubmissionFailure =
                ExceptionUtils.findThrowable(fatalError, JobSubmissionException.class).isPresent();

        Assert.assertTrue(hasSubmissionFailure);
        testingFatalErrorHandler.clearError();
    }

    /**
     * Runs the added-job-graph scenario with a gateway whose submit function fails with a
     * {@link DuplicateJobSubmissionException} and expects no fatal error.
     */
    @Test
    public void onAddedJobGraph_duplicateJobSubmissionDueToFalsePositive_willBeIgnored() throws Exception {
        final TestingDispatcherGateway duplicateRejectingGateway = new TestingDispatcherGateway.Builder()
                .setSubmitFunction(submittedJob ->
                        FutureUtils.completedExceptionally(
                                new DuplicateJobSubmissionException(submittedJob.getJobID())))
                .build();

        runOnAddedJobGraphTest(duplicateRejectingGateway, this::verifyOnAddedJobGraphResultDidNotFail);
    }

    /**
     * Starts a process whose store initially holds {@code JOB_GRAPH}, waits for the dispatcher
     * service, signals {@code onAddedJobGraph} for that job and hands the fatal error handler to
     * the given verifier.
     *
     * @param testingDispatcherGateway gateway exposed by the created dispatcher service
     * @param throwingConsumer verification to run against the fatal error handler
     * @throws Exception if the verifier fails or the process cannot be started or closed
     */
    private void runOnAddedJobGraphTest(TestingDispatcherGateway testingDispatcherGateway, ThrowingConsumer<TestingFatalErrorHandler, Exception> throwingConsumer) throws Exception {
        jobGraphStore = TestingJobGraphStore.newBuilder()
                .setInitialJobGraphs(Collections.singleton(JOB_GRAPH))
                .build();

        dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder()
                .setCreateFunction((dispatcherId, recoveredJobGraphs, jobGraphWriter) -> {
                    // The job must already have been handed over when the service is created.
                    MatcherAssert.assertThat(recoveredJobGraphs, Matchers.containsInAnyOrder(JOB_GRAPH));
                    return TestingDispatcherGatewayService.newBuilder()
                            .setDispatcherGateway(testingDispatcherGateway)
                            .build();
                })
                .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait for the dispatcher service to be available.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());

            throwingConsumer.accept(fatalErrorHandler);
        }
    }

    /**
     * Asserts that the handler's error future does not complete within 10 ms.
     *
     * @param testingFatalErrorHandler handler whose error future is inspected
     * @throws Exception if waiting on the error future fails for a reason other than a timeout
     */
    private void verifyOnAddedJobGraphResultDidNotFail(TestingFatalErrorHandler testingFatalErrorHandler) throws Exception {
        boolean timedOut = false;
        try {
            testingFatalErrorHandler.getErrorFuture().get(10L, TimeUnit.MILLISECONDS);
        } catch (TimeoutException expected) {
            // No fatal error arrived in time, which is the success case.
            timedOut = true;
        }

        if (!timedOut) {
            Assert.fail("Expected that duplicate job submissions due to false job recoveries are ignored.");
        }
    }

    /**
     * Builds a factory whose create function returns a new dispatcher gateway service exposing
     * the given gateway on each call.
     *
     * @param testingDispatcherGateway gateway to expose
     * @return the configured factory
     */
    private TestingDispatcherServiceFactory createDispatcherServiceFactoryFor(TestingDispatcherGateway testingDispatcherGateway) {
        final TestingDispatcherServiceFactory.Builder factoryBuilder = TestingDispatcherServiceFactory.newBuilder();
        return factoryBuilder
                .setCreateFunction((dispatcherId, recoveredJobGraphs, jobGraphWriter) ->
                        TestingDispatcherGatewayService.newBuilder()
                                .setDispatcherGateway(testingDispatcherGateway)
                                .build())
                .build();
    }

    /**
     * Creates the process under test from the current fixture fields.
     *
     * @return a new {@link SessionDispatcherLeaderProcess}
     */
    private SessionDispatcherLeaderProcess createDispatcherLeaderProcess() {
        return SessionDispatcherLeaderProcess.create(
                leaderSessionId,
                dispatcherServiceFactory,
                jobGraphStore,
                ioExecutor,
                fatalErrorHandler);
    }
}
