package org.apache.flink.runtime.leaderelection;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.class */
public class ZooKeeperLeaderElectionConnectionHandlingTest extends TestLogger {
    private TestingServer testingServer;
    private Configuration config;
    private CuratorFramework zooKeeperClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest$QueueLeaderElectionListener.class */
    public static class QueueLeaderElectionListener implements LeaderRetrievalListener {
        private final BlockingQueue<CompletableFuture<String>> queue;
        private final Duration timeout;

        public QueueLeaderElectionListener(int i) {
            this(i, null);
        }

        public QueueLeaderElectionListener(int i, Duration duration) {
            this.queue = new ArrayBlockingQueue(i);
            this.timeout = duration;
        }

        public void notifyLeaderAddress(String str, UUID uuid) {
            try {
                if (this.timeout == null) {
                    this.queue.put(CompletableFuture.completedFuture(str));
                } else {
                    this.queue.offer(CompletableFuture.completedFuture(str), this.timeout.toMillis(), TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        public CompletableFuture<String> next() {
            try {
                return this.timeout == null ? this.queue.take() : this.queue.poll(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        public void handleError(Exception exc) {
            throw new UnsupportedOperationException("handleError(Exception) shouldn't have been called, but it was triggered anyway.", exc);
        }
    }

    /**
     * Starts a fresh ZooKeeper test server and a Curator client connected to it before each test.
     *
     * @throws Exception if the test server or the client cannot be started
     */
    @Before
    public void before() throws Exception {
        testingServer = new TestingServer();

        config = new Configuration();
        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        // Point the HA quorum at the server that was just started.
        final String quorum = testingServer.getConnectString();
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, quorum);

        zooKeeperClient = ZooKeeperUtils.startCuratorFramework(config);
    }

    /**
     * Releases the test server and the Curator client after each test.
     *
     * <p>The client is closed in a {@code finally} block so that it is released even when
     * closing the test server fails.
     *
     * @throws Exception if closing the test server fails
     */
    @After
    public void after() throws Exception {
        try {
            closeTestServer();
        } finally {
            if (this.zooKeeperClient != null) {
                this.zooKeeperClient.close();
                this.zooKeeperClient = null;
            }
        }
    }

    /**
     * Verifies that the listener receives no notification, neither before nor after the test
     * server is closed, when no leader has been elected.
     */
    @Test
    public void testConnectionSuspendedHandlingDuringInitialization() throws Exception {
        // A short poll timeout lets next() return null instead of blocking forever.
        final Duration pollTimeout = Duration.ofMillis(50L);
        final QueueLeaderElectionListener listener = new QueueLeaderElectionListener(1, pollTimeout);

        ZooKeeperUtils.createLeaderRetrievalService(zooKeeperClient, config).start(listener);

        final CompletableFuture<String> beforeShutdown = listener.next();
        Assert.assertThat("No results are expected, yet, since no leader was elected.", beforeShutdown, CoreMatchers.is(CoreMatchers.nullValue()));

        closeTestServer();

        final CompletableFuture<String> afterShutdown = listener.next();
        Assert.assertThat("No result is expected since there was no leader elected before stopping the server, yet.", afterShutdown, CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * Verifies that, after a leader address has been retrieved, closing the test server produces
     * a follow-up notification carrying a {@code null} address.
     */
    @Test
    public void testConnectionSuspendedHandling() throws Exception {
        final ZooKeeperLeaderElectionService electionService = ZooKeeperUtils.createLeaderElectionService(zooKeeperClient, config);
        final TestingContender contender = new TestingContender("localhost", electionService);
        electionService.start(contender);

        final QueueLeaderElectionListener listener = new QueueLeaderElectionListener(2);
        ZooKeeperUtils.createLeaderRetrievalService(zooKeeperClient, config).start(listener);

        final String initialLeader = listener.next().get();
        Assert.assertThat("The first result is expected to be the initially set leader address.", initialLeader, CoreMatchers.is("localhost"));

        closeTestServer();

        final CompletableFuture<String> afterShutdown = listener.next();
        Assert.assertThat("The next result must not be missing.", afterShutdown, CoreMatchers.is(CoreMatchers.notNullValue()));
        final String addressAfterShutdown = afterShutdown.get();
        Assert.assertThat("The next result is expected to be null.", addressAfterShutdown, CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * Verifies that the listener is notified about the leader address again after the test
     * server was stopped and restarted, even though the stored address did not change.
     */
    @Test
    public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Exception {
        final String retrievalPath = "/testSameLeaderAfterReconnectTriggersListenerNotification/leaderAddress";
        final ZooKeeperLeaderRetrievalService retrievalService = new ZooKeeperLeaderRetrievalService(zooKeeperClient, retrievalPath);

        final QueueLeaderElectionListener listener = new QueueLeaderElectionListener(2);
        retrievalService.start(listener);

        writeLeaderInformationToZooKeeper(retrievalPath, "foobar", UUID.randomUUID());

        // Consume and discard the first notification.
        listener.next();

        testingServer.stop();

        // Wait for the notification that follows the server stop before bringing it back up.
        listener.next().join();

        testingServer.restart();

        final String leaderAfterReconnect = listener.next().get();
        Assert.assertThat(leaderAfterReconnect, CoreMatchers.is("foobar"));
    }

    /**
     * Stores serialized leader information at the given ZooKeeper path, creating the node (and
     * any missing parents) if it does not exist yet and overwriting its data otherwise.
     *
     * @param str ZooKeeper path of the node to write
     * @param str2 leader address to serialize
     * @param uuid UUID to serialize together with the address
     * @throws Exception if serialization or any ZooKeeper operation fails
     */
    private void writeLeaderInformationToZooKeeper(String str, String str2, UUID uuid) throws Exception {
        final byte[] payload = createLeaderInformation(str2, uuid);

        final boolean nodeMissing = zooKeeperClient.checkExists().forPath(str) == null;
        if (nodeMissing) {
            zooKeeperClient.create().creatingParentsIfNeeded().forPath(str, payload);
            return;
        }
        zooKeeperClient.setData().forPath(str, payload);
    }

    /**
     * Serializes a leader address and a UUID into a byte array using Java object serialization.
     *
     * <p>The address is written with {@link ObjectOutputStream#writeUTF(String)}, followed by the
     * UUID written with {@link ObjectOutputStream#writeObject(Object)}.
     *
     * @param str leader address to serialize
     * @param uuid UUID to serialize after the address
     * @return the serialized bytes
     * @throws IOException if writing to or closing the streams fails
     */
    private static byte[] createLeaderInformation(String str, UUID uuid) throws IOException {
        // try-with-resources closes both streams in reverse order and records a failing close()
        // as a suppressed exception instead of letting it replace the primary one.
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeUTF(str);
            out.writeObject(uuid);
            // Push buffered block data into the byte array before taking the snapshot.
            out.flush();
            return bytes.toByteArray();
        }
    }

    /**
     * Verifies that the listener is eventually notified about a new leader address that is
     * written after the test server was stopped and restarted.
     */
    @Test
    public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exception {
        final String retrievalPath = "/testNewLeaderAfterReconnectTriggersListenerNotification/leaderAddress";
        final ZooKeeperLeaderRetrievalService zooKeeperLeaderRetrievalService = new ZooKeeperLeaderRetrievalService(this.zooKeeperClient, retrievalPath);

        final QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(2);
        zooKeeperLeaderRetrievalService.start(queueLeaderElectionListener);

        writeLeaderInformationToZooKeeper(retrievalPath, "foobar", UUID.randomUUID());

        // Consume and discard the first notification.
        queueLeaderElectionListener.next();

        this.testingServer.stop();

        // Wait for the notification that follows the server stop before bringing it back up.
        queueLeaderElectionListener.next().join();

        this.testingServer.restart();

        final String newLeaderAddress = "barfoo";
        writeLeaderInformationToZooKeeper(retrievalPath, newLeaderAddress, UUID.randomUUID());

        // Keep polling notifications until the new address shows up. A notification may carry a
        // null address; comparing from the constant side keeps such a notification from aborting
        // the wait with a NullPointerException.
        CommonTestUtils.waitUntilCondition(
                () -> newLeaderAddress.equals(queueLeaderElectionListener.next().get()),
                Deadline.fromNow(Duration.ofSeconds(30L)));
    }

    /**
     * Closes the ZooKeeper test server if it is still present and clears the field afterwards,
     * so that calling this method again is a no-op.
     *
     * @throws IOException if closing the server fails
     */
    private void closeTestServer() throws IOException {
        if (testingServer == null) {
            return;
        }
        testingServer.close();
        testingServer = null;
    }
}
