package org.apache.flink.runtime.leaderelection;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriver;
import org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandler;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriverFactory;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.ACLProvider;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.CreateBuilder;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.curator4.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.ACL;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.class */
public class ZooKeeperLeaderElectionTest extends TestLogger {
    // Embedded ZooKeeper server; created in before() and stopped in after().
    private TestingServer testingServer;
    // Configuration holding the embedded server's connect string and the "zookeeper" HA mode; built in before().
    private Configuration configuration;
    // Curator client wrapper shared by the tests; started in before() and closed in after().
    private CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper;
    // Upper bound for the blocking waits in the tests; passed with TimeUnit.MILLISECONDS where a unit is given.
    private static final long timeout = 200000;

    // Provides the fatal error handler passed to the Curator clients started by before() and some tests.
    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
    // Leader address handed to the election drivers created by createAndInitLeaderElectionDriver.
    private static final String TEST_URL = "akka//user/jobmanager";
    // Leader information (random session id + TEST_URL) the tests expect to see confirmed and retrieved.
    private static final LeaderInformation TEST_LEADER = LeaderInformation.known(UUID.randomUUID(), TEST_URL);
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionTest.class);

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest$DeletedCacheListener.class */
    private static class DeletedCacheListener implements NodeCacheListener {
        final CompletableFuture<Boolean> deletedPromise = new CompletableFuture<>();
        final NodeCache cache;

        public DeletedCacheListener(NodeCache nodeCache) {
            this.cache = nodeCache;
        }

        public Future<Boolean> nodeDeleted() {
            return this.deletedPromise;
        }

        public void nodeChanged() throws Exception {
            if (this.cache.getCurrentData() != null || this.deletedPromise.isDone()) {
                return;
            }
            this.deletedPromise.complete(true);
            this.cache.getListenable().removeListener(this);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest$ExistsCacheListener.class */
    private static class ExistsCacheListener implements NodeCacheListener {
        final CompletableFuture<Boolean> existsPromise = new CompletableFuture<>();
        final NodeCache cache;

        public ExistsCacheListener(NodeCache nodeCache) {
            this.cache = nodeCache;
        }

        public Future<Boolean> nodeExists() {
            return this.existsPromise;
        }

        public void nodeChanged() throws Exception {
            if (this.cache.getCurrentData() == null || this.existsPromise.isDone()) {
                return;
            }
            this.existsPromise.complete(true);
            this.cache.getListenable().removeListener(this);
        }
    }

    /**
     * Starts an embedded ZooKeeper server, builds a configuration pointing at it and
     * starts the Curator client shared by the tests.
     */
    @Before
    public void before() {
        try {
            final TestingServer server = new TestingServer();
            this.testingServer = server;

            final Configuration haConfig = new Configuration();
            this.configuration = haConfig;
            haConfig.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, server.getConnectString());
            haConfig.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

            this.curatorFrameworkWrapper =
                    ZooKeeperUtils.startCuratorFramework(
                            haConfig, this.testingFatalErrorHandlerResource.getFatalErrorHandler());
        } catch (Exception e) {
            throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
        }
    }

    /**
     * Closes the shared Curator client and stops the embedded ZooKeeper server.
     *
     * <p>The server is stopped in a {@code finally} block so that a failure while closing the
     * client cannot leave the server running for the remaining tests.
     *
     * @throws IOException if closing the client or stopping the server fails
     */
    @After
    public void after() throws IOException {
        try {
            if (this.curatorFrameworkWrapper != null) {
                this.curatorFrameworkWrapper.close();
            }
        } finally {
            this.curatorFrameworkWrapper = null;
            if (this.testingServer != null) {
                try {
                    this.testingServer.stop();
                } finally {
                    this.testingServer = null;
                }
            }
        }
    }

    /**
     * Checks that leader information confirmed by an election driver can be read back by a
     * retrieval driver using the same Curator client.
     */
    @Test
    public void testZooKeeperLeaderElectionRetrieval() throws Exception {
        final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        final TestingLeaderRetrievalEventHandler retrievalEventHandler = new TestingLeaderRetrievalEventHandler();
        LeaderElectionDriver leaderElectionDriver = null;
        LeaderRetrievalDriver leaderRetrievalDriver = null;
        try {
            leaderElectionDriver = createAndInitLeaderElectionDriver(this.curatorFrameworkWrapper.asCuratorFramework(), electionEventHandler);
            leaderRetrievalDriver =
                    ZooKeeperUtils.createLeaderRetrievalDriverFactory(this.curatorFrameworkWrapper.asCuratorFramework())
                            .createLeaderRetrievalDriver(retrievalEventHandler, retrievalEventHandler::handleError);

            electionEventHandler.waitForLeader(timeout);
            Assert.assertThat(electionEventHandler.getConfirmedLeaderInformation(), Matchers.is(TEST_LEADER));

            retrievalEventHandler.waitForNewLeader(timeout);
            Assert.assertThat(retrievalEventHandler.getLeaderSessionID(), Matchers.is(TEST_LEADER.getLeaderSessionID()));
            Assert.assertThat(retrievalEventHandler.getAddress(), Matchers.is(TEST_LEADER.getLeaderAddress()));
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            electionEventHandler.close();
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
            if (leaderRetrievalDriver != null) {
                leaderRetrievalDriver.close();
            }
        }
    }

    /**
     * Starts several contenders against the same ZooKeeper client and repeatedly stops the
     * election service of the current leader, expecting a new leader to be announced each
     * time until every contender has been leader once.
     */
    @Test
    public void testZooKeeperReelection() throws Exception {
        final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5L));
        final int num = 10;

        final DefaultLeaderElectionService[] leaderElectionServices = new DefaultLeaderElectionService[num];
        final TestingContender[] contenders = new TestingContender[num];
        DefaultLeaderRetrievalService leaderRetrievalService = null;
        final TestingListener listener = new TestingListener();
        try {
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(this.curatorFrameworkWrapper.asCuratorFramework());
            LOG.debug("Start leader retrieval service for the TestingListener.");
            leaderRetrievalService.start(listener);

            for (int i = 0; i < num; i++) {
                leaderElectionServices[i] = ZooKeeperUtils.createLeaderElectionService(this.curatorFrameworkWrapper.asCuratorFramework());
                contenders[i] = new TestingContender(createAddress(i), leaderElectionServices[i]);
                LOG.debug("Start leader election service for contender #{}.", i);
                leaderElectionServices[i].start(contenders[i]);
            }

            final Pattern addressPattern = Pattern.compile(TEST_URL + "_(\\d+)");
            int numberSeenLeaders = 0;
            while (deadline.hasTimeLeft() && numberSeenLeaders < num) {
                LOG.debug("Wait for new leader #{}.", numberSeenLeaders);
                final String address = listener.waitForNewLeader(deadline.timeLeft().toMillis());
                final Matcher matcher = addressPattern.matcher(address);
                if (matcher.find()) {
                    final int index = Integer.parseInt(matcher.group(1));
                    final TestingContender contender = contenders[index];
                    // Only act once the listener and the contender agree on the session id;
                    // otherwise wait for the next notification.
                    if (address.equals(createAddress(index)) && listener.getLeaderSessionID().equals(contender.getLeaderSessionID())) {
                        // Log the index of the contender being stopped, matching the start message.
                        LOG.debug("Stop leader election service of contender #{}.", index);
                        leaderElectionServices[index].stop();
                        leaderElectionServices[index] = null;
                        numberSeenLeaders++;
                    }
                } else {
                    Assert.fail("Did not find the leader's index.");
                }
            }

            Assert.assertFalse("Did not complete the leader reelection in time.", deadline.isOverdue());
            Assert.assertEquals(num, numberSeenLeaders);
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            for (DefaultLeaderElectionService electionService : leaderElectionServices) {
                if (electionService != null) {
                    electionService.stop();
                }
            }
        }
    }

    /**
     * Builds the address of the contender with the given index.
     *
     * @param i index of the contender
     * @return {@code "akka//user/jobmanager_"} followed by the decimal index
     */
    @Nonnull
    private String createAddress(int i) {
        final String indexSuffix = Integer.toString(i);
        return "akka//user/jobmanager_".concat(indexSuffix);
    }

    /**
     * Repeatedly stops the election service of the current leader and replaces it with a fresh
     * contender for the same slot, checking on every round that the session id announced by the
     * listener matches the one held by the leading contender.
     *
     * <p>Contender addresses have the form {@code TEST_URL_<slot>_<generation>}.
     */
    @Test
    public void testZooKeeperReelectionWithReplacement() throws Exception {
        final int num = 3;
        final int numTries = 30;

        final DefaultLeaderElectionService[] leaderElectionServices = new DefaultLeaderElectionService[num];
        final TestingContender[] contenders = new TestingContender[num];
        DefaultLeaderRetrievalService leaderRetrievalService = null;
        final TestingListener listener = new TestingListener();
        try {
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(this.curatorFrameworkWrapper.asCuratorFramework());
            leaderRetrievalService.start(listener);

            for (int i = 0; i < num; i++) {
                leaderElectionServices[i] = ZooKeeperUtils.createLeaderElectionService(this.curatorFrameworkWrapper.asCuratorFramework());
                contenders[i] = new TestingContender(TEST_URL + "_" + i + "_0", leaderElectionServices[i]);
                leaderElectionServices[i].start(contenders[i]);
            }

            final Pattern addressPattern = Pattern.compile(TEST_URL + "_(\\d+)_(\\d+)");
            for (int attempt = 0; attempt < numTries; attempt++) {
                listener.waitForNewLeader(timeout);
                final Matcher matcher = addressPattern.matcher(listener.getAddress());
                if (!matcher.find()) {
                    // Report as a test failure, consistent with testZooKeeperReelection.
                    Assert.fail("Did not find the leader's index.");
                }
                final int index = Integer.parseInt(matcher.group(1));
                final int generation = Integer.parseInt(matcher.group(2));
                Assert.assertEquals(listener.getLeaderSessionID(), contenders[index].getLeaderSessionID());

                // Stop the current leader and put a new contender with the next generation in its slot.
                leaderElectionServices[index].stop();
                leaderElectionServices[index] = ZooKeeperUtils.createLeaderElectionService(this.curatorFrameworkWrapper.asCuratorFramework());
                contenders[index] = new TestingContender(TEST_URL + "_" + index + "_" + (generation + 1), leaderElectionServices[index]);
                leaderElectionServices[index].start(contenders[index]);
            }
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            for (DefaultLeaderElectionService electionService : leaderElectionServices) {
                if (electionService != null) {
                    electionService.stop();
                }
            }
        }
    }

    /**
     * Overwrites the leader information node through a second Curator client and checks that a
     * retrieval driver eventually observes {@code TEST_LEADER} again.
     */
    @Test
    public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception {
        final String faultyContenderUrl = "faultyContender";
        final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        final TestingLeaderRetrievalEventHandler retrievalEventHandler = new TestingLeaderRetrievalEventHandler();
        ZooKeeperLeaderElectionDriver leaderElectionDriver = null;
        LeaderRetrievalDriver leaderRetrievalDriver = null;
        CuratorFrameworkWithUnhandledErrorListener anotherCuratorFrameworkWrapper = null;
        try {
            leaderElectionDriver = createAndInitLeaderElectionDriver(this.curatorFrameworkWrapper.asCuratorFramework(), electionEventHandler);
            electionEventHandler.waitForLeader(timeout);
            Assert.assertThat(electionEventHandler.getConfirmedLeaderInformation(), Matchers.is(TEST_LEADER));

            anotherCuratorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework(this.configuration, NoOpFatalErrorHandler.INSTANCE);

            // Serialize the bogus leader information: address (UTF) followed by a random session id.
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeUTF(faultyContenderUrl);
                oos.writeObject(UUID.randomUUID());
            }
            final byte[] faultyLeaderData = baos.toByteArray();

            // Replace the node content; retry while the node reappears between delete and create.
            final String connectionInformationPath = leaderElectionDriver.getConnectionInformationPath();
            boolean dataWritten = false;
            while (!dataWritten) {
                anotherCuratorFrameworkWrapper.asCuratorFramework().delete().forPath(connectionInformationPath);
                try {
                    anotherCuratorFrameworkWrapper.asCuratorFramework().create().forPath(connectionInformationPath, faultyLeaderData);
                    dataWritten = true;
                } catch (KeeperException.NodeExistsException ignored) {
                    // The node was re-created before our create() ran; delete it and try again.
                }
            }

            leaderRetrievalDriver =
                    ZooKeeperUtils.createLeaderRetrievalDriverFactory(this.curatorFrameworkWrapper.asCuratorFramework())
                            .createLeaderRetrievalDriver(retrievalEventHandler, retrievalEventHandler::handleError);

            // The first notification may still carry the faulty address; if so, wait for the next one.
            if (retrievalEventHandler.waitForNewLeader(timeout).equals(faultyContenderUrl)) {
                retrievalEventHandler.waitForNewLeader(timeout);
            }
            Assert.assertThat(retrievalEventHandler.getLeaderSessionID(), Matchers.is(TEST_LEADER.getLeaderSessionID()));
            Assert.assertThat(retrievalEventHandler.getAddress(), Matchers.is(TEST_LEADER.getLeaderAddress()));
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            electionEventHandler.close();
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
            if (leaderRetrievalDriver != null) {
                leaderRetrievalDriver.close();
            }
            if (anotherCuratorFrameworkWrapper != null) {
                anotherCuratorFrameworkWrapper.close();
            }
        }
    }

    /**
     * Checks that an exception thrown while creating a node is forwarded to the election event
     * handler. The failure is injected by spying on the Curator client and returning a mocked
     * {@link CreateBuilder} whose {@code forPath} call throws.
     */
    @Test
    public void testExceptionForwarding() throws Exception {
        LeaderElectionDriver leaderElectionDriver = null;
        final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        final CreateBuilder mockCreateBuilder = Mockito.mock(CreateBuilder.class, Mockito.RETURNS_DEEP_STUBS);
        final String exceptionMessage = "Test exception";
        final Exception testException = new Exception(exceptionMessage);
        final CuratorFrameworkWithUnhandledErrorListener curatorWrapper = ZooKeeperUtils.startCuratorFramework(this.configuration, NoOpFatalErrorHandler.INSTANCE);
        try {
            final CuratorFramework client = Mockito.spy(curatorWrapper.asCuratorFramework());
            Mockito.doAnswer(invocation -> mockCreateBuilder).when(client).create();
            Mockito.when(
                            mockCreateBuilder
                                    .creatingParentsIfNeeded()
                                    .withMode(Mockito.any(CreateMode.class))
                                    .forPath(Mockito.anyString(), Mockito.any(byte[].class)))
                    .thenThrow(testException);

            leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler);

            electionEventHandler.waitForError(timeout);
            Assert.assertNotNull(electionEventHandler.getError());
            Assert.assertThat(
                    ExceptionUtils.findThrowableWithMessage(electionEventHandler.getError(), exceptionMessage).isPresent(),
                    Matchers.is(true));
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            electionEventHandler.close();
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
            curatorWrapper.close();
        }
    }

    /**
     * Checks that the leader information node disappears once the election driver and its
     * Curator client are closed, and that the retrieval driver then reports no further leader.
     *
     * <p>Two separate clients are used: one for the election driver (closed mid-test) and one
     * for the retrieval driver and the {@link NodeCache} that watches the node.
     */
    @Test
    public void testEphemeralZooKeeperNodes() throws Exception {
        final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        final TestingLeaderRetrievalEventHandler retrievalEventHandler = new TestingLeaderRetrievalEventHandler();
        CuratorFrameworkWithUnhandledErrorListener leaderClientWrapper = null;
        CuratorFrameworkWithUnhandledErrorListener listenerClientWrapper = null;
        ZooKeeperLeaderElectionDriver leaderElectionDriver = null;
        LeaderRetrievalDriver leaderRetrievalDriver = null;
        NodeCache cache = null;
        try {
            leaderClientWrapper = ZooKeeperUtils.startCuratorFramework(this.configuration, this.testingFatalErrorHandlerResource.getFatalErrorHandler());
            listenerClientWrapper = ZooKeeperUtils.startCuratorFramework(this.configuration, this.testingFatalErrorHandlerResource.getFatalErrorHandler());

            leaderElectionDriver = createAndInitLeaderElectionDriver(leaderClientWrapper.asCuratorFramework(), electionEventHandler);
            leaderRetrievalDriver =
                    ZooKeeperUtils.createLeaderRetrievalDriverFactory(listenerClientWrapper.asCuratorFramework())
                            .createLeaderRetrievalDriver(retrievalEventHandler, retrievalEventHandler::handleError);

            cache = new NodeCache(listenerClientWrapper.asCuratorFramework(), leaderElectionDriver.getConnectionInformationPath());
            final ExistsCacheListener existsListener = new ExistsCacheListener(cache);
            final DeletedCacheListener deletedListener = new DeletedCacheListener(cache);
            cache.getListenable().addListener(existsListener);
            cache.start();

            electionEventHandler.waitForLeader(timeout);
            retrievalEventHandler.waitForNewLeader(timeout);
            existsListener.nodeExists().get(timeout, TimeUnit.MILLISECONDS);

            cache.getListenable().addListener(deletedListener);

            // Close the election driver and its client. The references are cleared first so the
            // finally block below never closes them a second time.
            final ZooKeeperLeaderElectionDriver driverToClose = leaderElectionDriver;
            leaderElectionDriver = null;
            driverToClose.close();
            final CuratorFrameworkWithUnhandledErrorListener clientToClose = leaderClientWrapper;
            leaderClientWrapper = null;
            clientToClose.close();

            deletedListener.nodeDeleted().get(timeout, TimeUnit.MILLISECONDS);

            try {
                retrievalEventHandler.waitForNewLeader(1000L);
                Assert.fail("TimeoutException was expected because there is no leader registered and thus there shouldn't be any leader information in ZooKeeper.");
            } catch (TimeoutException ignored) {
                // Expected: no new leader may be announced after the node was removed.
            }
        } finally {
            electionEventHandler.close();
            // Only non-null if the test failed before the explicit close above.
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
            if (leaderRetrievalDriver != null) {
                leaderRetrievalDriver.close();
            }
            if (cache != null) {
                cache.close();
            }
            // Only non-null if the test failed before the explicit close above.
            if (leaderClientWrapper != null) {
                leaderClientWrapper.close();
            }
            if (listenerClientWrapper != null) {
                listenerClientWrapper.close();
            }
        }
    }

    /**
     * Checks that revoking leadership via {@code notLeader()} clears the handler's confirmed
     * leader information but leaves the information retrievable through a retrieval driver.
     */
    @Test
    public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception {
        final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        final TestingLeaderRetrievalEventHandler retrievalEventHandler = new TestingLeaderRetrievalEventHandler();
        ZooKeeperLeaderElectionDriver leaderElectionDriver = null;
        LeaderRetrievalDriver leaderRetrievalDriver = null;
        try {
            leaderElectionDriver = createAndInitLeaderElectionDriver(this.curatorFrameworkWrapper.asCuratorFramework(), electionEventHandler);

            electionEventHandler.waitForLeader(timeout);
            Assert.assertThat(electionEventHandler.getConfirmedLeaderInformation(), Matchers.is(TEST_LEADER));

            // Revoke leadership; the handler must forget its confirmed leader information.
            leaderElectionDriver.notLeader();
            electionEventHandler.waitForRevokeLeader(timeout);
            Assert.assertThat(electionEventHandler.getConfirmedLeaderInformation(), Matchers.is(LeaderInformation.empty()));

            // The information must nevertheless still be retrievable.
            leaderRetrievalDriver =
                    ZooKeeperUtils.createLeaderRetrievalDriverFactory(this.curatorFrameworkWrapper.asCuratorFramework())
                            .createLeaderRetrievalDriver(retrievalEventHandler, retrievalEventHandler::handleError);
            retrievalEventHandler.waitForNewLeader(timeout);
            Assert.assertThat(retrievalEventHandler.getLeaderSessionID(), Matchers.is(TEST_LEADER.getLeaderSessionID()));
            Assert.assertThat(retrievalEventHandler.getAddress(), Matchers.is(TEST_LEADER.getLeaderAddress()));
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            electionEventHandler.close();
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
            if (leaderRetrievalDriver != null) {
                leaderRetrievalDriver.close();
            }
        }
    }

    /**
     * Checks that an unexpected error raised inside Curator is forwarded to the fatal error
     * handler. The error is injected through an {@link ACLProvider} whose methods always throw.
     */
    @Test
    public void testUnExpectedErrorForwarding() throws Exception {
        LeaderElectionDriver leaderElectionDriver = null;
        final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
        final FlinkRuntimeException testException = new FlinkRuntimeException("testUnExpectedErrorForwarding");

        final CuratorFrameworkFactory.Builder curatorFrameworkBuilder =
                CuratorFrameworkFactory.builder()
                        .connectString(this.testingServer.getConnectString())
                        .retryPolicy(new ExponentialBackoffRetry(1, 0))
                        .aclProvider(
                                new ACLProvider() {
                                    @Override
                                    public List<ACL> getDefaultAcl() {
                                        throw testException;
                                    }

                                    @Override
                                    public List<ACL> getAclForPath(String str) {
                                        throw testException;
                                    }
                                })
                        .namespace("flink");

        // try-with-resources closes the client on every path, including when the body throws.
        try (CuratorFrameworkWithUnhandledErrorListener curatorWrapper = ZooKeeperUtils.startCuratorFramework(curatorFrameworkBuilder, fatalErrorHandler)) {
            final CuratorFramework clientWithErrorHandler = curatorWrapper.asCuratorFramework();
            Assert.assertFalse(fatalErrorHandler.getErrorFuture().isDone());
            leaderElectionDriver = createAndInitLeaderElectionDriver(clientWithErrorHandler, electionEventHandler);
            Assert.assertThat(fatalErrorHandler.getErrorFuture().join(), FlinkMatchers.containsCause(testException));
        } finally {
            electionEventHandler.close();
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
        }
    }

    /**
     * Creates a ZooKeeper leader election driver on the given client, using the given handler
     * both as event handler and as error sink, and initializes the handler with the new driver.
     *
     * @param curatorFramework client the driver factory is created from
     * @param testingLeaderElectionEventHandler handler that receives events and errors
     * @return the newly created driver
     * @throws Exception if creating the driver fails
     */
    private ZooKeeperLeaderElectionDriver createAndInitLeaderElectionDriver(CuratorFramework curatorFramework, TestingLeaderElectionEventHandler testingLeaderElectionEventHandler) throws Exception {
        final ZooKeeperLeaderElectionDriver driver =
                ZooKeeperUtils.createLeaderElectionDriverFactory(curatorFramework)
                        .createLeaderElectionDriver(
                                testingLeaderElectionEventHandler,
                                testingLeaderElectionEventHandler::handleError,
                                TEST_URL);
        testingLeaderElectionEventHandler.init(driver);
        return driver;
    }
}
