package org.apache.flink.runtime.leaderelection;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.leaderelection.TestingGenericLeaderContender;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.class */
class DefaultLeaderElectionServiceTest {
    /**
     * Address string that the tests pass to {@link TestingContender} and expect to find in the
     * {@link LeaderInformation} written to the testing driver.
     */
    private static final String TEST_URL = "akka//user/jobmanager";

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest$Context.class */
    /**
     * Test fixture that wires a {@link DefaultLeaderElectionService} to a {@link
     * TestingLeaderElectionDriver} and a {@link TestingContender}, runs a test body against them
     * and stops the service afterwards.
     *
     * <p>Tests subclass it anonymously and call one of the {@code runTestWith...} methods from an
     * instance initializer, so the test body can access the fields below directly.
     */
    private static class Context {
        /** Service under test; assigned by {@link #runTest} before the test body runs. */
        DefaultLeaderElectionService leaderElectionService;

        /** Contender registered with the service; assigned by {@link #runTest}. */
        TestingContender testingContender;

        /** Driver created by the testing factory when the service is started. */
        TestingLeaderElectionDriver testingLeaderElectionDriver;

        private Context() {
        }

        /**
         * Runs the test body with the executor returned by {@code
         * Executors.newDirectExecutorService()} handling the leader events.
         *
         * @param runnableWithException the test body
         * @throws Exception if setup, the test body or stopping the service fails
         */
        void runTestWithSynchronousEventHandling(RunnableWithException runnableWithException) throws Exception {
            runTest(runnableWithException, Executors.newDirectExecutorService());
        }

        /**
         * Runs the test body with a fresh {@link ManuallyTriggeredScheduledExecutorService} that is
         * both given to the service and passed to the test body.
         *
         * @param throwingConsumer the test body, receiving the manually triggered executor
         * @throws Exception if setup, the test body or stopping the service fails
         */
        void runTestWithManuallyTriggeredEvents(ThrowingConsumer<ManuallyTriggeredScheduledExecutorService, Exception> throwingConsumer) throws Exception {
            final ManuallyTriggeredScheduledExecutorService manualExecutor = new ManuallyTriggeredScheduledExecutorService();
            runTest(() -> throwingConsumer.accept(manualExecutor), manualExecutor);
        }

        /**
         * Creates and starts the service with the given executor, runs the test body and stops the
         * service.
         *
         * @param runnableWithException the test body
         * @param executorService executor handed to the {@link DefaultLeaderElectionService}
         * @throws Exception if setup, the test body or stopping the service fails
         */
        void runTest(RunnableWithException runnableWithException, ExecutorService executorService) throws Exception {
            final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
                    new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
            this.leaderElectionService = new DefaultLeaderElectionService(driverFactory, executorService);
            this.testingContender = new TestingContender(TEST_URL, this.leaderElectionService);
            this.leaderElectionService.start(this.testingContender);
            try {
                this.testingLeaderElectionDriver = driverFactory.getCurrentLeaderDriver();
                Assertions.assertThat(this.testingLeaderElectionDriver).isNotNull();
                runnableWithException.run();
            } finally {
                // Stop the started service even when an assertion in the test body fails, so a
                // failing test does not leave it running. Several tests already call stop() inside
                // their body before this second call.
                this.leaderElectionService.stop();
            }
        }
    }

    // Explicit package-private no-argument constructor; it performs no initialization.
    DefaultLeaderElectionServiceTest() {
    }

    /**
     * Grants leadership through the driver and then revokes it, checking the contender, the
     * service and the driver's stored {@link LeaderInformation} after each step.
     */
    @Test
    void testOnGrantAndRevokeLeadership() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    // grant leadership
                    testingLeaderElectionDriver.isLeader();
                    testingContender.waitForLeader();

                    Assertions.assertThat(testingContender.getDescription()).isEqualTo(TEST_URL);
                    Assertions.assertThat(testingContender.getLeaderSessionID())
                            .isEqualTo(leaderElectionService.getLeaderSessionID());

                    final LeaderInformation expectedLeaderInformation =
                            LeaderInformation.known(leaderElectionService.getLeaderSessionID(), TEST_URL);
                    Assertions.assertThat(testingLeaderElectionDriver.getLeaderInformation())
                            .as("The HA backend should have its leader information updated.")
                            .isEqualTo(expectedLeaderInformation);

                    // revoke leadership
                    testingLeaderElectionDriver.notLeader();
                    testingContender.waitForRevokeLeader();

                    Assertions.assertThat(testingContender.getLeaderSessionID()).isNull();
                    Assertions.assertThat(leaderElectionService.getLeaderSessionID()).isNull();
                    // the driver still reports the information written while being the leader
                    Assertions.assertThat(testingLeaderElectionDriver.getLeaderInformation())
                            .as("External storage is not touched by the leader session because the leadership is already lost.")
                            .isEqualTo(expectedLeaderInformation);
                });
            }
        };
    }

    /**
     * Stops the service while it holds the leadership and checks that the contender, the service
     * and the driver's stored {@link LeaderInformation} are all reset.
     */
    @Test
    void testProperCleanupOnStopWhenHoldingTheLeadership() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    testingLeaderElectionDriver.isLeader();
                    testingContender.waitForLeader();

                    // precondition: leadership is held and was written to the driver
                    Assertions.assertThat(testingContender.getLeaderSessionID()).isNotNull();
                    Assertions.assertThat(leaderElectionService.getLeaderSessionID()).isNotNull();
                    Assertions.assertThat(testingLeaderElectionDriver.getLeaderInformation().isEmpty()).isFalse();

                    leaderElectionService.stop();

                    Assertions.assertThat(testingContender.getLeaderSessionID())
                            .as("The LeaderContender should have been informed about the leadership loss.")
                            .isNull();
                    Assertions.assertThat(leaderElectionService.getLeaderSessionID())
                            .as("The LeaderElectionService should have its internal state cleaned.")
                            .isNull();
                    Assertions.assertThat(testingLeaderElectionDriver.getLeaderInformation())
                            .as("The HA backend's data should have been cleaned.")
                            .isEqualTo(LeaderInformation.empty());
                });
            }
        };
    }

    /**
     * While being the leader, reports changed leader information (first empty, then a foreign
     * session) and checks that the driver ends up with the service's own information each time.
     */
    @Test
    void testLeaderInformationChangedAndShouldBeCorrected() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    testingLeaderElectionDriver.isLeader();
                    final LeaderInformation expectedLeaderInformation =
                            LeaderInformation.known(leaderElectionService.getLeaderSessionID(), TEST_URL);

                    // leader information was removed externally
                    testingLeaderElectionDriver.leaderInformationChanged(LeaderInformation.empty());
                    Assertions.assertThat(testingLeaderElectionDriver.getLeaderInformation())
                            .isEqualTo(expectedLeaderInformation);

                    // leader information was overwritten with a different session and address
                    final LeaderInformation faultyLeaderInformation =
                            LeaderInformation.known(UUID.randomUUID(), "faulty-address");
                    testingLeaderElectionDriver.leaderInformationChanged(faultyLeaderInformation);
                    Assertions.assertThat(testingLeaderElectionDriver.getLeaderInformation())
                            .isEqualTo(expectedLeaderInformation);
                });
            }
        };
    }

    /**
     * Signals leadership on the driver without ever triggering the manual executor and checks
     * that {@code hasLeadership} is false for the granted session ID and for an unrelated one.
     */
    @Test
    void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception {
        new Context() {
            {
                runTestWithManuallyTriggeredEvents(executor -> {
                    final UUID grantedSessionId = UUID.randomUUID();
                    // the executor is intentionally not triggered in this test
                    testingLeaderElectionDriver.isLeader(grantedSessionId);

                    Assertions.assertThat(leaderElectionService.hasLeadership(grantedSessionId)).isFalse();
                    Assertions.assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())).isFalse();
                });
            }
        };
    }

    /**
     * Signals leadership on the driver, triggers the manual executor once and checks that {@code
     * hasLeadership} is true only for the granted session ID.
     */
    @Test
    void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception {
        new Context() {
            {
                runTestWithManuallyTriggeredEvents(executor -> {
                    final UUID grantedSessionId = UUID.randomUUID();
                    testingLeaderElectionDriver.isLeader(grantedSessionId);
                    executor.trigger();

                    Assertions.assertThat(leaderElectionService.hasLeadership(grantedSessionId)).isTrue();
                    Assertions.assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())).isFalse();
                });
            }
        };
    }

    /**
     * Grants leadership (executor triggered once), then signals leadership loss on the driver
     * without triggering the executor again, and checks that {@code hasLeadership} is already
     * false for both the granted session ID and an unrelated one.
     */
    @Test
    void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Exception {
        new Context() {
            {
                runTestWithManuallyTriggeredEvents(executor -> {
                    final UUID grantedSessionId = UUID.randomUUID();
                    testingLeaderElectionDriver.isLeader(grantedSessionId);
                    executor.trigger();

                    // the executor is not triggered again after the leadership loss
                    testingLeaderElectionDriver.notLeader();

                    // no raw-type cast needed: as(...) returns the same assertion type
                    Assertions.assertThat(leaderElectionService.hasLeadership(grantedSessionId))
                            .as("No operation should be handled anymore after the HA backend indicated leadership loss even if the onRevokeLeadership wasn't processed, yet, because some other process could have picked up the leadership in the meantime already based on the HA backend's decision.")
                            .isFalse();
                    Assertions.assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())).isFalse();
                });
            }
        };
    }

    /**
     * Grants and then revokes leadership, triggering the manual executor after each driver
     * signal, and checks that {@code hasLeadership} is false afterwards.
     */
    @Test
    void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Exception {
        new Context() {
            {
                runTestWithManuallyTriggeredEvents(executor -> {
                    final UUID grantedSessionId = UUID.randomUUID();

                    testingLeaderElectionDriver.isLeader(grantedSessionId);
                    executor.trigger();

                    testingLeaderElectionDriver.notLeader();
                    executor.trigger();

                    Assertions.assertThat(leaderElectionService.hasLeadership(grantedSessionId)).isFalse();
                    Assertions.assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())).isFalse();
                });
            }
        };
    }

    /**
     * Grants leadership, stops the service and checks that {@code hasLeadership} is false for
     * the previously granted session ID.
     */
    @Test
    void testHasLeadershipAfterStop() throws Exception {
        new Context() {
            {
                runTestWithManuallyTriggeredEvents(executor -> {
                    final UUID grantedSessionId = UUID.randomUUID();
                    testingLeaderElectionDriver.isLeader(grantedSessionId);
                    executor.trigger();

                    leaderElectionService.stop();

                    Assertions.assertThat(leaderElectionService.hasLeadership(grantedSessionId)).isFalse();
                });
            }
        };
    }

    /**
     * Reports changed leader information without leadership having been granted and checks that
     * the driver still holds exactly the reported information afterwards.
     */
    @Test
    void testLeaderInformationChangedIfNotBeingLeader() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    final LeaderInformation foreignLeaderInformation =
                            LeaderInformation.known(UUID.randomUUID(), "faulty-address");

                    testingLeaderElectionDriver.leaderInformationChanged(foreignLeaderInformation);

                    Assertions.assertThat(testingLeaderElectionDriver.getLeaderInformation())
                            .isEqualTo(foreignLeaderInformation);
                });
            }
        };
    }

    /**
     * Stops the service before leadership is signalled and checks that neither the service nor
     * the contender ends up with a leader session ID.
     */
    @Test
    void testOnGrantLeadershipIsIgnoredAfterBeingStop() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    leaderElectionService.stop();

                    testingLeaderElectionDriver.isLeader();

                    Assertions.assertThat(leaderElectionService.getLeaderSessionID())
                            .as("The grant event shouldn't have been processed by the LeaderElectionService.")
                            .isNull();
                    Assertions.assertThat(testingContender.getLeaderSessionID()).isNull();
                });
            }
        };
    }

    /**
     * Grants leadership, stops the service and then reports empty leader information; the
     * driver's stored information is expected to remain empty.
     */
    @Test
    void testOnLeaderInformationChangeIsIgnoredAfterBeingStop() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    testingLeaderElectionDriver.isLeader();
                    leaderElectionService.stop();

                    testingLeaderElectionDriver.leaderInformationChanged(LeaderInformation.empty());

                    Assertions.assertThat(testingLeaderElectionDriver.getLeaderInformation())
                            .isEqualTo(LeaderInformation.empty());
                });
            }
        };
    }

    /**
     * Grants leadership and stops the service, checking that the contender's leader session ID
     * is cleared by the stop call.
     */
    @Test
    void testOnRevokeLeadershipIsTriggeredAfterBeingStop() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    testingLeaderElectionDriver.isLeader();
                    // precondition: contender and service agree on the session ID
                    Assertions.assertThat(testingContender.getLeaderSessionID())
                            .isEqualTo(leaderElectionService.getLeaderSessionID());

                    leaderElectionService.stop();

                    Assertions.assertThat(testingContender.getLeaderSessionID())
                            .as("LeaderContender should have been revoked as part of the stop call.")
                            .isNull();
                });
            }
        };
    }

    /**
     * Confirms leadership with a random session ID that differs from the granted one and checks
     * that the service keeps its current leader session ID.
     */
    @Test
    void testOldConfirmLeaderInformation() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    testingLeaderElectionDriver.isLeader();
                    final UUID currentSessionId = leaderElectionService.getLeaderSessionID();
                    Assertions.assertThat(currentSessionId).isNotNull();

                    // confirmation that carries a different session ID
                    final UUID otherSessionId = UUID.randomUUID();
                    leaderElectionService.confirmLeadership(otherSessionId, TEST_URL);

                    Assertions.assertThat(leaderElectionService.getLeaderSessionID())
                            .isEqualTo(currentSessionId);
                });
            }
        };
    }

    /**
     * Reports a fatal error on the driver and checks that the contender receives an error whose
     * cause is the reported exception.
     */
    @Test
    void testErrorForwarding() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    final Exception testException = new Exception("test leader exception");

                    testingLeaderElectionDriver.onFatalError(testException);
                    testingContender.waitForError();

                    Assertions.assertThat(testingContender.getError())
                            .isNotNull()
                            .hasCause(testException);
                });
            }
        };
    }

    /**
     * Stops the service, then reports a fatal error on the driver and checks that the contender
     * has no error recorded.
     */
    @Test
    void testErrorIsIgnoredAfterBeingStop() throws Exception {
        new Context() {
            {
                runTestWithSynchronousEventHandling(() -> {
                    final Exception testException = new Exception("test leader exception");

                    leaderElectionService.stop();
                    testingLeaderElectionDriver.onFatalError(testException);

                    Assertions.assertThat(testingContender.getError()).isNull();
                });
            }
        };
    }

    /**
     * Starts a service created with only a driver factory, signals leadership on the created
     * driver and stops the service; the test passes if none of these calls throws.
     */
    @Test
    void testServiceShutDownWithSynchronizedDriver() throws Exception {
        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
        final DefaultLeaderElectionService service = new DefaultLeaderElectionService(driverFactory);
        final TestingContender contender = new TestingContender(TEST_URL, service);
        service.start(contender);

        final TestingLeaderElectionDriver driver =
                Preconditions.checkNotNull(driverFactory.getCurrentLeaderDriver());
        driver.isLeader();

        service.stop();
    }

    /**
     * Checks that {@code onLeaderInformationChange} returns even though the driver's write
     * callback blocks.
     *
     * <p>The driver's write consumer completes a future on its first invocation and waits on a
     * latch on every later invocation. The latch is only released after {@code
     * onLeaderInformationChange} has returned.
     */
    @Test
    void testOnLeadershipChangeDoesNotBlock() throws Exception {
        // typed future instead of a raw CompletableFuture to avoid unchecked operations
        final CompletableFuture<LeaderInformation> initialLeaderInformation = new CompletableFuture<>();
        final OneShotLatch latch = new OneShotLatch();

        final TestingGenericLeaderElectionDriver driver = TestingGenericLeaderElectionDriver.newBuilder()
                .setWriteLeaderInformationConsumer(leaderInformation -> {
                    if (initialLeaderInformation.isDone()) {
                        // every write after the first one blocks until the latch is released
                        latch.awaitQuietly();
                    } else {
                        // the first write records the written LeaderInformation
                        initialLeaderInformation.complete(leaderInformation);
                    }
                })
                .setHasLeadershipSupplier(() -> true)
                .build();

        final DefaultLeaderElectionService service =
                new DefaultLeaderElectionService((leaderElectionEventHandler, fatalErrorHandler) -> driver);
        service.start(TestingGenericLeaderContender.newBuilder()
                .setGrantLeadershipConsumer(sessionId -> service.confirmLeadership(sessionId, "leader-address"))
                .build());

        try {
            final UUID sessionId = UUID.randomUUID();
            service.onGrantLeadership(sessionId);

            FlinkAssertions.assertThatFuture(initialLeaderInformation)
                    .eventuallySucceeds()
                    .as("The LeaderInformation should have been forwarded to the driver.")
                    .isEqualTo(LeaderInformation.known(sessionId, "leader-address"));

            // must return although the driver's write callback blocks from now on
            service.onLeaderInformationChange(LeaderInformation.empty());
        } finally {
            // release the latch and stop the service even if an assertion above failed
            latch.trigger();
            service.stop();
        }
    }

    /**
     * Runs {@link #testNonBlockingCall} with a contender whose grant-leadership callback waits
     * on the latch, and a driver action that signals leadership.
     */
    @Test
    void testOnGrantLeadershipAsyncDoesNotBlock() throws Exception {
        testNonBlockingCall(
                latch -> TestingGenericLeaderContender.newBuilder()
                        .setGrantLeadershipConsumer(ignoredSessionId -> latch.awaitQuietly())
                        .build(),
                TestingLeaderElectionDriver::isLeader);
    }

    /**
     * Runs {@link #testNonBlockingCall} with a contender whose revoke-leadership callback waits
     * on the latch, and a driver action that signals leadership followed by leadership loss.
     */
    @Test
    void testOnRevokeLeadershipDoesNotBlock() throws Exception {
        testNonBlockingCall(
                // the method reference performs its own null check on the latch, so no separate
                // getClass() call is required
                latch -> TestingGenericLeaderContender.newBuilder()
                        .setRevokeLeadershipRunnable(latch::awaitQuietly)
                        .build(),
                driver -> {
                    driver.isLeader();
                    driver.notLeader();
                });
    }

    /**
     * Starts a service with a contender built around a fresh latch, applies the given action to
     * the created driver, and only afterwards releases the latch and stops the service.
     *
     * <p>The action is expected to return while the contender's callback may still be waiting on
     * the latch.
     *
     * @param function creates the contender from the latch its callback should wait on
     * @param consumer action performed on the driver created for the service
     * @throws Exception if starting or stopping the service fails
     */
    private static void testNonBlockingCall(Function<OneShotLatch, TestingGenericLeaderContender> function, Consumer<TestingLeaderElectionDriver> consumer) throws Exception {
        final OneShotLatch latch = new OneShotLatch();
        final TestingGenericLeaderContender contender = function.apply(latch);

        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
        final DefaultLeaderElectionService service = new DefaultLeaderElectionService(driverFactory);
        service.start(contender);

        try {
            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
            Assertions.assertThat(driver).isNotNull();

            // this call must return although the contender's callback waits on the latch
            consumer.accept(driver);
        } finally {
            // release the latch and stop the service even if the assertion or the action failed,
            // so no callback stays blocked and the service does not keep running
            latch.trigger();
            service.stop();
        }
    }
}
