package org.apache.flink.runtime.leaderretrieval;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingContender;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.class */
public class ZooKeeperLeaderRetrievalTest extends TestLogger {
    private static final RpcSystem RPC_SYSTEM = RpcSystem.load();

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    private TestingServer testingServer;
    private Configuration config;
    private HighAvailabilityServices highAvailabilityServices;

    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();

    /* loaded from: input_file:org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest$FindConnectingAddress.class */
    static class FindConnectingAddress implements Runnable {
        private final Duration timeout;
        private final LeaderRetrievalService leaderRetrievalService;
        private InetAddress result;
        private Exception exception;

        public FindConnectingAddress(Duration duration, LeaderRetrievalService leaderRetrievalService) {
            this.timeout = duration;
            this.leaderRetrievalService = leaderRetrievalService;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.result = LeaderRetrievalUtils.findConnectingAddress(this.leaderRetrievalService, this.timeout, ZooKeeperLeaderRetrievalTest.RPC_SYSTEM);
            } catch (Exception e) {
                this.exception = e;
            }
        }

        public InetAddress getInetAddress() throws Exception {
            if (this.exception != null) {
                throw this.exception;
            }
            return this.result;
        }
    }

    @Before
    public void before() throws Exception {
        this.testingServer = new TestingServer();
        this.config = new Configuration();
        this.config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        this.config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.testingServer.getConnectString());
        this.highAvailabilityServices = new ZooKeeperMultipleComponentLeaderElectionHaServices(ZooKeeperUtils.startCuratorFramework(this.config, this.testingFatalErrorHandlerResource.getFatalErrorHandler()), this.config, EXECUTOR_RESOURCE.getExecutor(), new VoidBlobStore(), this.testingFatalErrorHandlerResource.getFatalErrorHandler());
    }

    @After
    public void after() throws Exception {
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeAndCleanupAllData();
            this.highAvailabilityServices = null;
        }
        if (this.testingServer != null) {
            this.testingServer.stop();
            this.testingServer = null;
        }
    }

    @Test
    public void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exception {
        Duration ofMinutes = Duration.ofMinutes(1L);
        LeaderElectionService leaderElectionService = null;
        try {
            String rpcUrl = RPC_SYSTEM.getRpcUrl("1.1.1.1", 1234, "foobar", AddressResolution.NO_ADDRESS_RESOLUTION, this.config);
            try {
                InetAddress localHost = InetAddress.getLocalHost();
                InetSocketAddress inetSocketAddress = new InetSocketAddress(localHost, new ServerSocket(0, 50, localHost).getLocalPort());
                String rpcUrl2 = RPC_SYSTEM.getRpcUrl(localHost.getHostName(), inetSocketAddress.getPort(), "jobmanager", AddressResolution.NO_ADDRESS_RESOLUTION, this.config);
                LeaderElectionService jobManagerLeaderElectionService = this.highAvailabilityServices.getJobManagerLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID);
                jobManagerLeaderElectionService.start(new TestingContender(rpcUrl, jobManagerLeaderElectionService));
                FindConnectingAddress findConnectingAddress = new FindConnectingAddress(ofMinutes, this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID));
                Thread thread = new Thread(findConnectingAddress);
                thread.start();
                leaderElectionService = this.highAvailabilityServices.getJobManagerLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID);
                TestingContender testingContender = new TestingContender(rpcUrl2, leaderElectionService);
                Thread.sleep(1000L);
                jobManagerLeaderElectionService.stop();
                leaderElectionService.start(testingContender);
                thread.join();
                InetAddress inetAddress = findConnectingAddress.getInetAddress();
                Socket socket = new Socket();
                try {
                    socket.bind(new InetSocketAddress(inetAddress, 0));
                    socket.connect(inetSocketAddress, 1000);
                    socket.close();
                    if (leaderElectionService != null) {
                        leaderElectionService.stop();
                    }
                } catch (Throwable th) {
                    socket.close();
                    throw th;
                }
            } catch (UnknownHostException e) {
                System.err.println("Skipping 'testNetworkInterfaceSelection' test.");
                if (0 != 0) {
                    leaderElectionService.stop();
                }
            } catch (IOException e2) {
                System.err.println("Skipping 'testNetworkInterfaceSelection' test.");
                if (0 != 0) {
                    leaderElectionService.stop();
                }
            }
        } catch (Throwable th2) {
            if (leaderElectionService != null) {
                leaderElectionService.stop();
            }
            throw th2;
        }
    }

    @Test
    public void testTimeoutOfFindConnectingAddress() throws Exception {
        Assert.assertEquals(InetAddress.getLocalHost(), LeaderRetrievalUtils.findConnectingAddress(this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), Duration.ofSeconds(1L), RPC_SYSTEM));
    }
}
