package org.apache.flink.runtime.leaderretrieval;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.OptionalAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.class */
class ZooKeeperLeaderRetrievalConnectionHandlingTest {

    // Registered JUnit 5 extension; its testing fatal error handler is passed to the ZooKeeper
    // client and to every retrieval driver created by the tests in this class.
    @RegisterExtension
    private final TestingFatalErrorHandlerExtension fatalErrorHandlerResource = new TestingFatalErrorHandlerExtension();

    // Registered wrapper around a ZooKeeperExtension; the tests obtain the wrapped extension via
    // getCustomExtension() to create a client and to call restart()/stop()/close() on it.
    @RegisterExtension
    private final EachCallbackWrapper<ZooKeeperExtension> zooKeeperExtension = new EachCallbackWrapper<>(new ZooKeeperExtension());

    // Curator client assigned in before(); it is null until that setup method has run.
    @Nullable
    private CuratorFramework zooKeeperClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest$QueueLeaderElectionListener.class */
    public static class QueueLeaderElectionListener implements LeaderRetrievalEventHandler {
        private final BlockingQueue<LeaderInformation> queue;

        public QueueLeaderElectionListener(int i) {
            this.queue = new ArrayBlockingQueue(i);
        }

        public void notifyLeaderAddress(LeaderInformation leaderInformation) {
            try {
                this.queue.put(leaderInformation);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        public LeaderInformation next() {
            try {
                return this.queue.take();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        public Optional<LeaderInformation> next(Duration duration) {
            try {
                return Optional.ofNullable(this.queue.poll(duration.toMillis(), TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        public void clearUnhandledEvents() {
            while (!this.queue.isEmpty()) {
                this.queue.poll();
            }
        }
    }

    /** Package-private no-argument constructor; performs no initialization of its own. */
    ZooKeeperLeaderRetrievalConnectionHandlingTest() {}

    /**
     * Obtains a Curator client from the ZooKeeper extension, stores it in
     * {@code zooKeeperClient} and waits until the client reports being connected.
     */
    @BeforeEach
    public void before() throws Exception {
        final CuratorFramework client =
                zooKeeperExtension
                        .getCustomExtension()
                        .getZooKeeperClient(fatalErrorHandlerResource.getTestingFatalErrorHandler());
        zooKeeperClient = client;
        client.blockUntilConnected();
    }

    /** Returns the {@link ZooKeeperExtension} held by the registered callback wrapper. */
    private ZooKeeperExtension getZooKeeper() {
        return zooKeeperExtension.getCustomExtension();
    }

    /**
     * Creates a driver through the factory from {@link ZooKeeperUtils} without writing any leader
     * information, checks that no notification arrives within 50 ms, restarts ZooKeeper and then
     * expects a notification whose leader address is {@code null}.
     */
    @Test
    public void testConnectionSuspendedHandlingDuringInitialization() throws Exception {
        testWithQueueLeaderElectionListener(
                listener ->
                        ZooKeeperUtils.createLeaderRetrievalDriverFactory(zooKeeperClient)
                                .createLeaderRetrievalDriver(
                                        listener,
                                        fatalErrorHandlerResource.getTestingFatalErrorHandler()),
                (driver, listener) -> {
                    final Optional<LeaderInformation> beforeRestart =
                            listener.next(Duration.ofMillis(50L));
                    Assertions.assertThat(beforeRestart)
                            .as("No results are expected, yet, since no leader was elected.")
                            .isNotPresent();

                    getZooKeeper().restart();

                    final LeaderInformation afterRestart = listener.next();
                    Assertions.assertThat(afterRestart.getLeaderAddress())
                            .as("The next result is expected to be null.")
                            .isNull();
                });
    }

    /**
     * With the {@code ON_SUSPENDED_CONNECTION} clearance policy: writes a leader entry for
     * "localhost", expects it to be reported, restarts ZooKeeper and then expects a notification
     * whose leader address is {@code null}.
     */
    @Test
    public void testConnectionSuspendedHandling() throws Exception {
        testWithQueueLeaderElectionListener(
                listener ->
                        new ZooKeeperLeaderRetrievalDriver(
                                zooKeeperClient,
                                "/testConnectionSuspendedHandling",
                                listener,
                                ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_SUSPENDED_CONNECTION,
                                fatalErrorHandlerResource.getTestingFatalErrorHandler()),
                (driver, listener) -> {
                    final String leaderPath = driver.getConnectionInformationPath();
                    writeLeaderInformationToZooKeeper(leaderPath, "localhost", UUID.randomUUID());

                    final LeaderInformation initialLeader = listener.next();
                    Assertions.assertThat(initialLeader.getLeaderAddress())
                            .as("The first result is expected to be the initially set leader address.")
                            .isEqualTo("localhost");

                    getZooKeeper().restart();

                    final LeaderInformation afterRestart = listener.next();
                    Assertions.assertThat(afterRestart.getLeaderAddress())
                            .as("The next result is expected to be null.")
                            .isNull();
                });
    }

    /**
     * With the {@code ON_LOST_CONNECTION} clearance policy: writes a leader entry for
     * "localhost", expects it to be reported, closes the ZooKeeper extension and then checks that
     * no further notification arrives within 100 ms.
     */
    @Test
    public void testSuspendedConnectionDoesNotClearLeaderInformationIfClearanceOnLostConnection() throws Exception {
        // NOTE(review): the path below is the same literal as in testConnectionSuspendedHandling;
        // confirm whether that reuse is intentional.
        testWithQueueLeaderElectionListener(
                listener ->
                        new ZooKeeperLeaderRetrievalDriver(
                                zooKeeperClient,
                                "/testConnectionSuspendedHandling",
                                listener,
                                ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_LOST_CONNECTION,
                                fatalErrorHandlerResource.getTestingFatalErrorHandler()),
                (driver, listener) -> {
                    final String leaderPath = driver.getConnectionInformationPath();
                    writeLeaderInformationToZooKeeper(leaderPath, "localhost", UUID.randomUUID());

                    final LeaderInformation initialLeader = listener.next();
                    Assertions.assertThat(initialLeader.getLeaderAddress())
                            .as("The first result is expected to be the initially set leader address.")
                            .isEqualTo("localhost");

                    getZooKeeper().close();

                    final Optional<LeaderInformation> afterClose =
                            listener.next(Duration.ofMillis(100L));
                    Assertions.assertThat(afterClose).isNotPresent();
                });
    }

    /**
     * Writes a leader entry for "foobar", consumes the resulting notification, stops ZooKeeper and
     * consumes the next notification, then restarts ZooKeeper and expects the following
     * notification to carry the address "foobar" again.
     */
    @Test
    public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Exception {
        testWithQueueLeaderElectionListener(
                listener ->
                        new ZooKeeperLeaderRetrievalDriver(
                                zooKeeperClient,
                                "/testSameLeaderAfterReconnectTriggersListenerNotification",
                                listener,
                                ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_SUSPENDED_CONNECTION,
                                fatalErrorHandlerResource.getTestingFatalErrorHandler()),
                (driver, listener) -> {
                    final String leaderPath = driver.getConnectionInformationPath();
                    writeLeaderInformationToZooKeeper(leaderPath, "foobar", UUID.randomUUID());

                    // Consume the notification that follows the write.
                    listener.next();

                    getZooKeeper().stop();

                    // Consume the notification that follows the stop.
                    listener.next();

                    getZooKeeper().restart();

                    final LeaderInformation afterReconnect = listener.next();
                    Assertions.assertThat(afterReconnect.getLeaderAddress()).isEqualTo("foobar");
                });
    }

    /**
     * Stores the serialized leader information at the given ZooKeeper path, creating the node
     * (including missing parents) when it does not exist and overwriting its data otherwise.
     *
     * @param path ZooKeeper node path to write to
     * @param leaderAddress leader address to serialize
     * @param leaderSessionId leader session id to serialize
     */
    private void writeLeaderInformationToZooKeeper(String path, String leaderAddress, UUID leaderSessionId) throws Exception {
        final byte[] payload = createLeaderInformation(leaderAddress, leaderSessionId);
        final boolean nodeMissing = zooKeeperClient.checkExists().forPath(path) == null;
        if (nodeMissing) {
            zooKeeperClient.create().creatingParentsIfNeeded().forPath(path, payload);
        } else {
            zooKeeperClient.setData().forPath(path, payload);
        }
    }

    /**
     * Serializes a leader address and session id into a byte array: the address is written with
     * {@link ObjectOutputStream#writeUTF(String)}, followed by the session id via
     * {@link ObjectOutputStream#writeObject(Object)}.
     *
     * @param leaderAddress leader address to write
     * @param leaderSessionId leader session id to write
     * @return the serialized bytes
     * @throws IOException if writing to or closing one of the streams fails
     */
    private byte[] createLeaderInformation(String leaderAddress, UUID leaderSessionId) throws IOException {
        // try-with-resources closes both streams in reverse order and records a failure of
        // close() as a suppressed exception instead of letting it replace the primary one.
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeUTF(leaderAddress);
            out.writeObject(leaderSessionId);
            // Flush so that buffered data reaches the byte array before it is copied.
            out.flush();
            return bytes.toByteArray();
        }
    }

    /**
     * Writes a leader entry for "foobar", consumes the resulting notification, stops ZooKeeper and
     * consumes the next notification, restarts ZooKeeper, writes a new leader entry for "barfoo"
     * and then waits until a notification carrying the address "barfoo" is received.
     */
    @Test
    public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exception {
        testWithQueueLeaderElectionListener(
                listener ->
                        new ZooKeeperLeaderRetrievalDriver(
                                zooKeeperClient,
                                "/testNewLeaderAfterReconnectTriggersListenerNotification",
                                listener,
                                ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_SUSPENDED_CONNECTION,
                                fatalErrorHandlerResource.getTestingFatalErrorHandler()),
                (driver, listener) -> {
                    final String leaderPath = driver.getConnectionInformationPath();
                    writeLeaderInformationToZooKeeper(leaderPath, "foobar", UUID.randomUUID());

                    // Consume the notification that follows the first write.
                    listener.next();

                    getZooKeeper().stop();

                    // Consume the notification that follows the stop.
                    listener.next();

                    getZooKeeper().restart();

                    writeLeaderInformationToZooKeeper(leaderPath, "barfoo", UUID.randomUUID());

                    // Keep consuming notifications until one carries the new leader address;
                    // a null address simply fails the equality check.
                    CommonTestUtils.waitUntilCondition(
                            () -> "barfoo".equals(listener.next().getLeaderAddress()));
                });
    }

    /**
     * Runs a test body against a freshly created listener and retrieval driver and always cleans
     * both up afterwards.
     *
     * <p>The listener is created with a queue capacity of 1. Cleanup happens exactly once in a
     * {@code finally} block: queued notifications are discarded first, then the driver is closed
     * if it was created.
     *
     * @param driverFactory creates the driver under test for the given listener
     * @param testCallback test body, invoked with the created driver and the listener
     * @throws Exception if driver creation, the test body or the cleanup fails
     */
    private void testWithQueueLeaderElectionListener(FunctionWithException<QueueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver, Exception> driverFactory, BiConsumerWithException<ZooKeeperLeaderRetrievalDriver, QueueLeaderElectionListener, Exception> testCallback) throws Exception {
        final QueueLeaderElectionListener listener = new QueueLeaderElectionListener(1);
        ZooKeeperLeaderRetrievalDriver driver = null;
        try {
            driver = driverFactory.apply(listener);
            testCallback.accept(driver, listener);
        } finally {
            // Discard pending notifications before closing the driver.
            listener.clearUnhandledEvents();
            if (driver != null) {
                driver.close();
            }
        }
    }
}
