/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingContender;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionEventHandler;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandler;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator5.org.apache.curator.RetryPolicy;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.ACLProvider;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.CreateBuilder;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.ACL;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperLeaderElectionTest
extends TestLogger {
    /** ZooKeeper testing server; created in {@link #before()} and closed in {@link #after()}. */
    private TestingServer testingServer;
    /** Configuration holding the testing server's connect string and the HA mode, built in {@link #before()}. */
    private Configuration configuration;
    /** Curator client wrapper started in {@link #before()} and closed in {@link #after()}. */
    private CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper;
    /** Leader address handed to the election event handlers and drivers created by these tests. */
    private static final String LEADER_ADDRESS = "akka//user/jobmanager";
    /** Wait bound of 200000, matching the millisecond waits on the node-cache futures in testEphemeralZooKeeperNodes. */
    private static final long timeout = 200000L;
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionTest.class);
    /** JUnit rule supplying the fatal error handler that is passed to the Curator clients. */
    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();

    /**
     * Starts a ZooKeeper testing server, builds a configuration pointing at it (HA mode
     * {@code "zookeeper"}) and starts the Curator client used by the tests.
     *
     * @throws RuntimeException if the testing server cannot be started
     */
    @Before
    public void before() {
        try {
            testingServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
        } catch (Exception startupFailure) {
            throw new RuntimeException("Could not start ZooKeeper testing cluster.", startupFailure);
        }

        final Configuration haConfiguration = new Configuration();
        configuration = haConfiguration;
        haConfiguration.setString(
                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
        haConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

        curatorFrameworkWrapper =
                ZooKeeperUtils.startCuratorFramework(
                        haConfiguration,
                        testingFatalErrorHandlerResource.getFatalErrorHandler());
    }

    /**
     * Closes the Curator client and then the ZooKeeper testing server.
     *
     * <p>The server is closed in a {@code finally} block so that a failure while closing the
     * client cannot leave the server running for the following tests. Both fields are reset to
     * {@code null} regardless of whether closing succeeded.
     *
     * @throws IOException if closing the testing server fails
     */
    @After
    public void after() throws IOException {
        try {
            if (this.curatorFrameworkWrapper != null) {
                this.curatorFrameworkWrapper.close();
            }
        } finally {
            this.curatorFrameworkWrapper = null;
            if (this.testingServer != null) {
                try {
                    this.testingServer.close();
                } finally {
                    this.testingServer = null;
                }
            }
        }
    }

    /**
     * Checks that the leader information confirmed on the election side is the same session id
     * and address that a retrieval driver on the same Curator client reports.
     */
    @Test
    public void testZooKeeperLeaderElectionRetrieval() throws Exception {
        final TestingLeaderElectionEventHandler electionHandler =
                new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
        final TestingLeaderRetrievalEventHandler retrievalHandler =
                new TestingLeaderRetrievalEventHandler();
        ZooKeeperLeaderElectionDriver electionDriver = null;
        ZooKeeperLeaderRetrievalDriver retrievalDriver = null;
        try {
            electionDriver =
                    createAndInitLeaderElectionDriver(
                            curatorFrameworkWrapper.asCuratorFramework(), electionHandler);
            retrievalDriver =
                    ZooKeeperUtils.createLeaderRetrievalDriverFactory(
                                    curatorFrameworkWrapper.asCuratorFramework())
                            .createLeaderRetrievalDriver(
                                    retrievalHandler, retrievalHandler::handleError);

            // Election side: wait for leadership and check the confirmed address.
            electionHandler.waitForLeader();
            final LeaderInformation confirmed = electionHandler.getConfirmedLeaderInformation();
            Assert.assertThat(
                    confirmed.getLeaderAddress(), org.hamcrest.Matchers.is(LEADER_ADDRESS));

            // Retrieval side: must observe exactly the confirmed leader.
            retrievalHandler.waitForNewLeader();
            Assert.assertThat(
                    retrievalHandler.getLeaderSessionID(),
                    org.hamcrest.Matchers.is(confirmed.getLeaderSessionID()));
            Assert.assertThat(
                    retrievalHandler.getAddress(),
                    org.hamcrest.Matchers.is(confirmed.getLeaderAddress()));
        } finally {
            electionHandler.close();
            if (electionDriver != null) {
                electionDriver.close();
            }
            if (retrievalDriver != null) {
                retrievalDriver.close();
            }
        }
    }

    /**
     * Starts ten contenders and repeatedly stops whichever one the listener reports as leader,
     * until every contender has been seen as leader once or the five-minute deadline expires.
     *
     * <p>A leader notification is only counted when both the address and the leader session id
     * reported by the listener match the contender at the parsed index; otherwise the loop
     * simply waits for the next notification.
     */
    @Test
    public void testZooKeeperReelection() throws Exception {
        final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5L));
        final int num = 10;
        final DefaultLeaderElectionService[] leaderElectionService =
                new DefaultLeaderElectionService[num];
        final TestingContender[] contenders = new TestingContender[num];
        DefaultLeaderRetrievalService leaderRetrievalService = null;
        final TestingListener listener = new TestingListener();
        try {
            leaderRetrievalService =
                    ZooKeeperUtils.createLeaderRetrievalService(
                            this.curatorFrameworkWrapper.asCuratorFramework());
            LOG.debug("Start leader retrieval service for the TestingListener.");
            leaderRetrievalService.start(listener);

            for (int i = 0; i < num; ++i) {
                leaderElectionService[i] =
                        ZooKeeperUtils.createLeaderElectionService(
                                this.curatorFrameworkWrapper.asCuratorFramework());
                contenders[i] = new TestingContender(this.createAddress(i), leaderElectionService[i]);
                LOG.debug("Start leader election service for contender #{}.", i);
                leaderElectionService[i].start(contenders[i]);
            }

            final Pattern regex = Pattern.compile("akka//user/jobmanager_(\\d+)");
            int numberSeenLeaders = 0;
            while (deadline.hasTimeLeft() && numberSeenLeaders < num) {
                LOG.debug("Wait for new leader #{}.", numberSeenLeaders);
                final String address = listener.waitForNewLeader();
                final Matcher m = regex.matcher(address);
                if (!m.find()) {
                    Assert.fail("Did not find the leader's index.");
                }

                final int index = Integer.parseInt(m.group(1));
                final TestingContender contender = contenders[index];
                final boolean confirmedLeader =
                        address.equals(this.createAddress(index))
                                && listener.getLeaderSessionID()
                                        .equals(contender.getLeaderSessionID());
                if (confirmedLeader) {
                    // Log the index of the contender actually being stopped, not the
                    // running count of leaders seen so far.
                    LOG.debug("Stop leader election service of contender #{}.", index);
                    leaderElectionService[index].stop();
                    // Mark as stopped so the cleanup below does not stop it again.
                    leaderElectionService[index] = null;
                    ++numberSeenLeaders;
                }
            }

            Assert.assertFalse(
                    "Did not complete the leader reelection in time.", deadline.isOverdue());
            Assert.assertEquals(num, numberSeenLeaders);
        } finally {
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            for (DefaultLeaderElectionService electionService : leaderElectionService) {
                if (electionService != null) {
                    electionService.stop();
                }
            }
        }
    }

    /**
     * Builds the address used for the contender with the given index.
     *
     * @param i contender index
     * @return {@code "akka//user/jobmanager_"} followed by {@code i}
     */
    @Nonnull
    private String createAddress(int i) {
        final String addressPrefix = "akka//user/jobmanager_";
        return addressPrefix + i;
    }

    /**
     * Runs thirty rounds in which the currently reported leader is stopped and a replacement
     * contender is started in the same slot.
     *
     * <p>Contender addresses have the form {@code akka//user/jobmanager_<slot>_<try>}; each
     * replacement increments the try counter parsed from the address of the leader it replaces.
     */
    @Test
    public void testZooKeeperReelectionWithReplacement() throws Exception {
        final int contenderCount = 3;
        final int rounds = 30;
        final DefaultLeaderElectionService[] electionServices =
                new DefaultLeaderElectionService[contenderCount];
        final TestingContender[] contenders = new TestingContender[contenderCount];
        DefaultLeaderRetrievalService retrievalService = null;
        final TestingListener listener = new TestingListener();
        try {
            retrievalService =
                    ZooKeeperUtils.createLeaderRetrievalService(
                            curatorFrameworkWrapper.asCuratorFramework());
            retrievalService.start(listener);

            for (int slot = 0; slot < contenderCount; ++slot) {
                electionServices[slot] =
                        ZooKeeperUtils.createLeaderElectionService(
                                curatorFrameworkWrapper.asCuratorFramework());
                contenders[slot] =
                        new TestingContender(
                                "akka//user/jobmanager_" + slot + "_0", electionServices[slot]);
                electionServices[slot].start(contenders[slot]);
            }

            final Pattern addressPattern = Pattern.compile("akka//user/jobmanager_(\\d+)_(\\d+)");
            int round = 0;
            while (round < rounds) {
                listener.waitForNewLeader();
                final String leaderAddress = listener.getAddress();
                final Matcher parsed = addressPattern.matcher(leaderAddress);
                if (!parsed.find()) {
                    throw new Exception("Did not find the leader's index.");
                }
                final int slot = Integer.parseInt(parsed.group(1));
                final int previousTry = Integer.parseInt(parsed.group(2));
                Assert.assertEquals(
                        listener.getLeaderSessionID(), contenders[slot].getLeaderSessionID());

                // Replace the current leader with a fresh contender in the same slot.
                electionServices[slot].stop();
                electionServices[slot] =
                        ZooKeeperUtils.createLeaderElectionService(
                                curatorFrameworkWrapper.asCuratorFramework());
                contenders[slot] =
                        new TestingContender(
                                "akka//user/jobmanager_" + slot + "_" + (previousTry + 1),
                                electionServices[slot]);
                electionServices[slot].start(contenders[slot]);
                ++round;
            }
        } finally {
            if (retrievalService != null) {
                retrievalService.stop();
            }
            for (DefaultLeaderElectionService service : electionServices) {
                if (service != null) {
                    service.stop();
                }
            }
        }
    }

    /**
     * Checks that the consumer handed to the election event handler is invoked once after
     * leadership is gained (within 5 seconds) and that no second invocation arrives within the
     * following 5 milliseconds.
     */
    @Test
    public void testLeaderChangeWriteLeaderInformationOnlyOnce() throws Exception {
        final LeaderInformationConsumer updateConsumer = new LeaderInformationConsumer();
        final TestingLeaderElectionEventHandler electionHandler =
                new TestingLeaderElectionEventHandler(LEADER_ADDRESS, updateConsumer);
        ZooKeeperLeaderElectionDriver electionDriver = null;
        try {
            electionDriver =
                    createAndInitLeaderElectionDriver(
                            curatorFrameworkWrapper.asCuratorFramework(), electionHandler);

            electionHandler.waitForLeader();
            final LeaderInformation confirmed = electionHandler.getConfirmedLeaderInformation();
            Assertions.assertThat(confirmed.getLeaderAddress()).isEqualTo(LEADER_ADDRESS);

            // First update must arrive ...
            Assertions.assertThat(updateConsumer.getFirstUpdateFuture())
                    .succeedsWithin(5L, TimeUnit.SECONDS);
            // ... and a second one must not.
            Assertions.assertThat(updateConsumer.getAnotherUpdateFuture())
                    .withFailMessage("Another leader information update is not expected.")
                    .failsWithin(5L, TimeUnit.MILLISECONDS)
                    .withThrowableOfType(TimeoutException.class);
        } finally {
            electionHandler.close();
            if (electionDriver != null) {
                electionDriver.close();
            }
        }
    }

    /**
     * Overwrites the leader's connection information node with data of a bogus contender
     * (through a second Curator client) and checks that a retrieval driver started afterwards
     * ends up reporting the originally confirmed leader session id and address.
     *
     * <p>If the first notification seen by the retrieval handler carries the bogus address, the
     * test waits for one further notification before asserting.
     */
    @Test
    public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception {
        final String faultyContenderUrl = "faultyContender";
        final TestingLeaderElectionEventHandler electionEventHandler =
                new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
        final TestingLeaderRetrievalEventHandler retrievalEventHandler =
                new TestingLeaderRetrievalEventHandler();
        ZooKeeperLeaderElectionDriver leaderElectionDriver = null;
        ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null;
        CuratorFrameworkWithUnhandledErrorListener anotherCuratorFrameworkWrapper = null;
        try {
            leaderElectionDriver =
                    this.createAndInitLeaderElectionDriver(
                            this.curatorFrameworkWrapper.asCuratorFramework(),
                            electionEventHandler);
            electionEventHandler.waitForLeader();
            final LeaderInformation confirmedLeaderInformation =
                    electionEventHandler.getConfirmedLeaderInformation();
            Assert.assertThat(
                    confirmedLeaderInformation.getLeaderAddress(),
                    org.hamcrest.Matchers.is(LEADER_ADDRESS));

            anotherCuratorFrameworkWrapper =
                    ZooKeeperUtils.startCuratorFramework(
                            this.configuration, NoOpFatalErrorHandler.INSTANCE);
            final CuratorFramework anotherClient =
                    anotherCuratorFrameworkWrapper.asCuratorFramework();

            // Serialize the bogus leader information: address (UTF) followed by a random UUID.
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeUTF(faultyContenderUrl);
                oos.writeObject(UUID.randomUUID());
            }
            final byte[] faultyLeaderData = baos.toByteArray();

            final String connectionInformationPath =
                    leaderElectionDriver.getConnectionInformationPath();
            boolean dataWritten = false;
            while (!dataWritten) {
                anotherClient.delete().forPath(connectionInformationPath);
                try {
                    anotherClient.create().forPath(connectionInformationPath, faultyLeaderData);
                    dataWritten = true;
                } catch (KeeperException.NodeExistsException ignored) {
                    // The node reappeared between our delete and create (presumably the
                    // election driver re-published its information first); delete and retry.
                }
            }

            leaderRetrievalDriver =
                    ZooKeeperUtils.createLeaderRetrievalDriverFactory(
                                    this.curatorFrameworkWrapper.asCuratorFramework())
                            .createLeaderRetrievalDriver(
                                    retrievalEventHandler, retrievalEventHandler::handleError);

            // The bogus entry may be observed first; in that case wait for the next update.
            if (retrievalEventHandler.waitForNewLeader().equals(faultyContenderUrl)) {
                retrievalEventHandler.waitForNewLeader();
            }
            Assert.assertThat(
                    retrievalEventHandler.getLeaderSessionID(),
                    org.hamcrest.Matchers.is(confirmedLeaderInformation.getLeaderSessionID()));
            Assert.assertThat(
                    retrievalEventHandler.getAddress(),
                    org.hamcrest.Matchers.is(confirmedLeaderInformation.getLeaderAddress()));
        } finally {
            electionEventHandler.close();
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
            if (leaderRetrievalDriver != null) {
                leaderRetrievalDriver.close();
            }
            if (anotherCuratorFrameworkWrapper != null) {
                anotherCuratorFrameworkWrapper.close();
            }
        }
    }

    /**
     * Checks that an exception thrown while creating a ZooKeeper node is forwarded to the
     * election event handler's error callback.
     *
     * <p>The Curator client is spied so that {@code create()} returns a deep-stubbed
     * {@link CreateBuilder} whose {@code creatingParentsIfNeeded().withMode(..).forPath(..)}
     * chain throws the test exception.
     */
    @Test
    public void testExceptionForwarding() throws Exception {
        final String exMsg = "Test exception";
        final Exception testException = new Exception(exMsg);
        final TestingLeaderElectionEventHandler electionEventHandler =
                new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
        final CreateBuilder mockCreateBuilder =
                Mockito.mock(CreateBuilder.class, Mockito.RETURNS_DEEP_STUBS);
        ZooKeeperLeaderElectionDriver leaderElectionDriver = null;

        // Dedicated client for this test; named so it does not shadow the shared field.
        final CuratorFrameworkWithUnhandledErrorListener spiedClientWrapper =
                ZooKeeperUtils.startCuratorFramework(
                        this.configuration, NoOpFatalErrorHandler.INSTANCE);
        try {
            final CuratorFramework client = Mockito.spy(spiedClientWrapper.asCuratorFramework());
            Mockito.doAnswer(invocation -> mockCreateBuilder).when(client).create();
            Mockito.when(
                            mockCreateBuilder
                                    .creatingParentsIfNeeded()
                                    .withMode(Matchers.any(CreateMode.class))
                                    .forPath(Mockito.anyString(), Mockito.any(byte[].class)))
                    .thenThrow(testException);

            leaderElectionDriver = this.createAndInitLeaderElectionDriver(client, electionEventHandler);

            electionEventHandler.waitForError();
            Assert.assertNotNull(electionEventHandler.getError());
            Assert.assertTrue(
                    ExceptionUtils.findThrowableWithMessage(electionEventHandler.getError(), exMsg)
                            .isPresent());
        } finally {
            electionEventHandler.close();
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
            spiedClientWrapper.close();
        }
    }

    /**
     * Checks that the leader's connection information node disappears once the election driver
     * and its Curator client are closed, and that the retrieval side then sees empty leader
     * information.
     *
     * <p>Two dedicated Curator clients are used: one for the election driver and one for the
     * retrieval driver and the {@link NodeCache} that watches the connection information path.
     * Every resource opened here is released in the {@code finally} block, including the
     * election driver and its client when the test fails before the point at which it closes
     * them itself.
     */
    @Test
    public void testEphemeralZooKeeperNodes() throws Exception {
        ZooKeeperLeaderElectionDriver leaderElectionDriver = null;
        ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null;
        final TestingLeaderElectionEventHandler electionEventHandler =
                new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
        final TestingLeaderRetrievalEventHandler retrievalEventHandler =
                new TestingLeaderRetrievalEventHandler();
        // Named so they do not shadow the shared curatorFrameworkWrapper field.
        CuratorFrameworkWithUnhandledErrorListener electionClientWrapper = null;
        CuratorFrameworkWithUnhandledErrorListener observerClientWrapper = null;
        NodeCache cache = null;
        try {
            electionClientWrapper =
                    ZooKeeperUtils.startCuratorFramework(
                            this.configuration,
                            this.testingFatalErrorHandlerResource.getFatalErrorHandler());
            observerClientWrapper =
                    ZooKeeperUtils.startCuratorFramework(
                            this.configuration,
                            this.testingFatalErrorHandlerResource.getFatalErrorHandler());

            leaderElectionDriver =
                    this.createAndInitLeaderElectionDriver(
                            electionClientWrapper.asCuratorFramework(), electionEventHandler);
            leaderRetrievalDriver =
                    ZooKeeperUtils.createLeaderRetrievalDriverFactory(
                                    observerClientWrapper.asCuratorFramework())
                            .createLeaderRetrievalDriver(
                                    retrievalEventHandler, retrievalEventHandler::handleError);

            cache =
                    new NodeCache(
                            observerClientWrapper.asCuratorFramework(),
                            leaderElectionDriver.getConnectionInformationPath());
            final ExistsCacheListener existsListener = new ExistsCacheListener(cache);
            final DeletedCacheListener deletedCacheListener = new DeletedCacheListener(cache);
            cache.getListenable().addListener(existsListener);
            cache.start();

            electionEventHandler.waitForLeader();
            retrievalEventHandler.waitForNewLeader();

            // The connection information node must show up first ...
            existsListener.nodeExists().get(timeout, TimeUnit.MILLISECONDS);

            cache.getListenable().addListener(deletedCacheListener);

            // ... and vanish after the election side goes away. The locals are cleared before
            // closing so the finally block never closes the same resource a second time.
            final ZooKeeperLeaderElectionDriver driverToClose = leaderElectionDriver;
            leaderElectionDriver = null;
            driverToClose.close();
            final CuratorFrameworkWithUnhandledErrorListener clientToClose = electionClientWrapper;
            electionClientWrapper = null;
            clientToClose.close();

            deletedCacheListener.nodeDeleted().get(timeout, TimeUnit.MILLISECONDS);
            retrievalEventHandler.waitForEmptyLeaderInformation();
        } finally {
            electionEventHandler.close();
            // Only non-null if the test failed before closing them above.
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
            if (leaderRetrievalDriver != null) {
                leaderRetrievalDriver.close();
            }
            if (cache != null) {
                cache.close();
            }
            if (electionClientWrapper != null) {
                electionClientWrapper.close();
            }
            if (observerClientWrapper != null) {
                observerClientWrapper.close();
            }
        }
    }

    /**
     * Checks that calling {@code notLeader()} on the election driver empties the handler's
     * confirmed leader information, while a retrieval driver created afterwards still reports
     * the previously confirmed session id and address.
     */
    @Test
    public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception {
        final TestingLeaderElectionEventHandler electionHandler =
                new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
        final TestingLeaderRetrievalEventHandler retrievalHandler =
                new TestingLeaderRetrievalEventHandler();
        ZooKeeperLeaderElectionDriver electionDriver = null;
        ZooKeeperLeaderRetrievalDriver retrievalDriver = null;
        try {
            electionDriver =
                    createAndInitLeaderElectionDriver(
                            curatorFrameworkWrapper.asCuratorFramework(), electionHandler);

            electionHandler.waitForLeader();
            final LeaderInformation confirmed = electionHandler.getConfirmedLeaderInformation();
            Assert.assertThat(
                    confirmed.getLeaderAddress(), org.hamcrest.Matchers.is(LEADER_ADDRESS));

            // Revoke leadership; the handler's view of the confirmed leader becomes empty.
            electionDriver.notLeader();
            electionHandler.waitForRevokeLeader();
            Assert.assertThat(
                    electionHandler.getConfirmedLeaderInformation(),
                    org.hamcrest.Matchers.is(LeaderInformation.empty()));

            // A retrieval driver started now must still see the earlier leader.
            retrievalDriver =
                    ZooKeeperUtils.createLeaderRetrievalDriverFactory(
                                    curatorFrameworkWrapper.asCuratorFramework())
                            .createLeaderRetrievalDriver(
                                    retrievalHandler, retrievalHandler::handleError);
            retrievalHandler.waitForNewLeader();
            Assert.assertThat(
                    retrievalHandler.getLeaderSessionID(),
                    org.hamcrest.Matchers.is(confirmed.getLeaderSessionID()));
            Assert.assertThat(
                    retrievalHandler.getAddress(),
                    org.hamcrest.Matchers.is(confirmed.getLeaderAddress()));
        } finally {
            electionHandler.close();
            if (electionDriver != null) {
                electionDriver.close();
            }
            if (retrievalDriver != null) {
                retrievalDriver.close();
            }
        }
    }

    /**
     * Builds a Curator client whose ACL provider throws a test exception from both of its
     * methods, starts an election driver on it, and asserts that the fatal error handler's
     * error future completes with a throwable containing that exception as a cause.
     */
    @Test
    public void testUnExpectedErrorForwarding() throws Exception {
        ZooKeeperLeaderElectionDriver electionDriver = null;
        final TestingLeaderElectionEventHandler electionHandler =
                new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
        final FlinkRuntimeException testException =
                new FlinkRuntimeException("testUnExpectedErrorForwarding");

        final ACLProvider throwingAclProvider =
                new ACLProvider() {
                    @Override
                    public List<ACL> getDefaultAcl() {
                        throw testException;
                    }

                    @Override
                    public List<ACL> getAclForPath(String path) {
                        throw testException;
                    }
                };
        final CuratorFrameworkFactory.Builder clientBuilder =
                CuratorFrameworkFactory.builder()
                        .connectString(testingServer.getConnectString())
                        .retryPolicy(new ExponentialBackoffRetry(1, 0))
                        .aclProvider(throwingAclProvider)
                        .namespace("flink");

        try (CuratorFrameworkWithUnhandledErrorListener failingClientWrapper =
                ZooKeeperUtils.startCuratorFramework(clientBuilder, fatalErrorHandler)) {
            final CuratorFramework clientWithErrorHandler =
                    failingClientWrapper.asCuratorFramework();
            // No error may have been reported before the driver is created.
            Assert.assertFalse(fatalErrorHandler.getErrorFuture().isDone());

            electionDriver =
                    createAndInitLeaderElectionDriver(clientWithErrorHandler, electionHandler);

            Assert.assertThat(
                    fatalErrorHandler.getErrorFuture().join(),
                    FlinkMatchers.containsCause(testException));
        } finally {
            electionHandler.close();
            if (electionDriver != null) {
                electionDriver.close();
            }
        }
    }

    /**
     * Creates a ZooKeeper leader election driver on the given client for {@code LEADER_ADDRESS},
     * wired to the given event handler (also used as the error callback), and passes the new
     * driver to the handler's {@code init}.
     *
     * @param client Curator client the driver is created on
     * @param electionEventHandler handler receiving election events and errors
     * @return the created driver, already handed to {@code electionEventHandler.init}
     */
    private ZooKeeperLeaderElectionDriver createAndInitLeaderElectionDriver(CuratorFramework client, TestingLeaderElectionEventHandler electionEventHandler) throws Exception {
        final ZooKeeperLeaderElectionDriver createdDriver =
                ZooKeeperUtils.createLeaderElectionDriverFactory(client)
                        .createLeaderElectionDriver(
                                electionEventHandler,
                                electionEventHandler::handleError,
                                LEADER_ADDRESS);
        electionEventHandler.init(createdDriver);
        return createdDriver;
    }

    /**
     * Consumer that records whether it has been invoked once and whether it has been invoked
     * again afterwards, exposing each fact as a future.
     */
    private static class LeaderInformationConsumer
    implements Consumer<LeaderInformation> {
        /** Completed by the first call to {@link #accept(LeaderInformation)}. */
        final CompletableFuture<Void> firstUpdateFuture = new CompletableFuture<>();
        /** Completed by any call to {@link #accept(LeaderInformation)} after the first one. */
        final CompletableFuture<Void> anotherUpdateFuture = new CompletableFuture<>();

        private LeaderInformationConsumer() {
        }

        /**
         * Completes the first-update future on the first call and the another-update future on
         * every later call. The passed leader information itself is not inspected.
         */
        @Override
        public void accept(LeaderInformation leaderInformation) {
            // complete() returns false when the future was already completed, which makes the
            // "first or later call" decision a single atomic step.
            if (!this.firstUpdateFuture.complete(null)) {
                this.anotherUpdateFuture.complete(null);
            }
        }

        /** @return future completed once the first update has been received */
        public CompletableFuture<Void> getFirstUpdateFuture() {
            return this.firstUpdateFuture;
        }

        /** @return future completed once a second (or later) update has been received */
        public CompletableFuture<Void> getAnotherUpdateFuture() {
            return this.anotherUpdateFuture;
        }
    }

    private static class DeletedCacheListener
    implements NodeCacheListener {
        final CompletableFuture<Boolean> deletedPromise = new CompletableFuture();
        final NodeCache cache;

        public DeletedCacheListener(NodeCache cache) {
            this.cache = cache;
        }

        public Future<Boolean> nodeDeleted() {
            return this.deletedPromise;
        }

        public void nodeChanged() throws Exception {
            ChildData data = this.cache.getCurrentData();
            if (data == null && !this.deletedPromise.isDone()) {
                this.deletedPromise.complete(true);
                this.cache.getListenable().removeListener((Object)this);
            }
        }
    }

    private static class ExistsCacheListener
    implements NodeCacheListener {
        final CompletableFuture<Boolean> existsPromise = new CompletableFuture();
        final NodeCache cache;

        public ExistsCacheListener(NodeCache cache) {
            this.cache = cache;
        }

        public Future<Boolean> nodeExists() {
            return this.existsPromise;
        }

        public void nodeChanged() throws Exception {
            ChildData data = this.cache.getCurrentData();
            if (data != null && !this.existsPromise.isDone()) {
                this.existsPromise.complete(true);
                this.cache.getListenable().removeListener((Object)this);
            }
        }
    }
}

