package org.apache.flink.test.runtime.leaderelection;

import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.class */
public class ZooKeeperLeaderElectionITCase extends TestLogger {
    private static final Time RPC_TIMEOUT = Time.minutes(1);
    private static TestingServer zkServer;

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase$BlockingOperator.class */
    public static class BlockingOperator extends AbstractInvokable {
        private static final Object lock = new Object();
        private static volatile boolean isBlocking = true;

        public BlockingOperator(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            synchronized (lock) {
                while (isBlocking) {
                    lock.wait();
                }
            }
        }

        public static void unblock() {
            synchronized (lock) {
                isBlocking = false;
                lock.notifyAll();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase$TestLeaderRetrievalListener.class */
    private static class TestLeaderRetrievalListener implements LeaderRetrievalListener {
        private final CompletableFuture<String>[] futures;
        int changeIdx;

        private TestLeaderRetrievalListener(CompletableFuture<String>[] completableFutureArr) {
            this.changeIdx = 0;
            this.futures = completableFutureArr;
        }

        public void notifyLeaderAddress(@Nullable String str, @Nullable UUID uuid) {
            CompletableFuture<String>[] completableFutureArr = this.futures;
            int i = this.changeIdx;
            this.changeIdx = i + 1;
            completableFutureArr[i].complete(str);
        }

        public void handleError(Exception exc) {
        }
    }

    @BeforeClass
    public static void setup() throws Exception {
        zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (zkServer != null) {
            zkServer.close();
            zkServer = null;
        }
    }

    @Test
    public void testJobExecutionOnClusterWithLeaderChange() throws Exception {
        Configuration createZooKeeperHAConfig = ZooKeeperTestUtils.createZooKeeperHAConfig(zkServer.getConnectString(), this.tempFolder.newFolder().getAbsolutePath());
        createZooKeeperHAConfig.setLong(ClusterOptions.REFUSED_REGISTRATION_DELAY, 50L);
        TestingMiniCluster build = TestingMiniCluster.newBuilder(TestingMiniClusterConfiguration.newBuilder().setConfiguration(createZooKeeperHAConfig).setNumberDispatcherResourceManagerComponents(3).setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build()).build();
        Throwable th = null;
        try {
            CuratorFrameworkWithUnhandledErrorListener startCuratorFramework = ZooKeeperUtils.startCuratorFramework(createZooKeeperHAConfig, th2 -> {
                Assert.fail("Fatal error in curator framework.");
            });
            Throwable th3 = null;
            try {
                try {
                    DefaultLeaderRetrievalService createLeaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(startCuratorFramework.asCuratorFramework(), ZooKeeperUtils.getLeaderPathForResourceManager(), createZooKeeperHAConfig);
                    CompletableFuture[] completableFutureArr = new CompletableFuture[3];
                    for (int i = 0; i < 3; i++) {
                        completableFutureArr[i] = new CompletableFuture();
                    }
                    createLeaderRetrievalService.start(new TestLeaderRetrievalListener(completableFutureArr));
                    build.start();
                    JobGraph createJobGraph = createJobGraph(4);
                    build.submitJob(createJobGraph).get();
                    String str = null;
                    for (int i2 = 0; i2 < 2; i2++) {
                        DispatcherGateway nextLeadingDispatcherGateway = getNextLeadingDispatcherGateway(build, str);
                        completableFutureArr[i2].get();
                        str = nextLeadingDispatcherGateway.getAddress();
                        awaitRunningStatus(nextLeadingDispatcherGateway, createJobGraph);
                        nextLeadingDispatcherGateway.shutDownCluster();
                    }
                    DispatcherGateway nextLeadingDispatcherGateway2 = getNextLeadingDispatcherGateway(build, str);
                    completableFutureArr[2].get();
                    awaitRunningStatus(nextLeadingDispatcherGateway2, createJobGraph);
                    CompletableFuture requestJobResult = nextLeadingDispatcherGateway2.requestJobResult(createJobGraph.getJobID(), RPC_TIMEOUT);
                    BlockingOperator.unblock();
                    Assert.assertThat(Boolean.valueOf(((JobResult) requestJobResult.get()).isSuccess()), Matchers.is(true));
                    createLeaderRetrievalService.stop();
                    if (startCuratorFramework != null) {
                        if (0 != 0) {
                            try {
                                startCuratorFramework.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            startCuratorFramework.close();
                        }
                    }
                    if (build != null) {
                        if (0 == 0) {
                            build.close();
                            return;
                        }
                        try {
                            build.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th3 = th6;
                    throw th6;
                }
            } catch (Throwable th7) {
                if (startCuratorFramework != null) {
                    if (th3 != null) {
                        try {
                            startCuratorFramework.close();
                        } catch (Throwable th8) {
                            th3.addSuppressed(th8);
                        }
                    } else {
                        startCuratorFramework.close();
                    }
                }
                throw th7;
            }
        } catch (Throwable th9) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th10) {
                        th.addSuppressed(th10);
                    }
                } else {
                    build.close();
                }
            }
            throw th9;
        }
    }

    private static void awaitRunningStatus(DispatcherGateway dispatcherGateway, JobGraph jobGraph) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), RPC_TIMEOUT).get() == JobStatus.RUNNING);
        }, 50L);
    }

    private DispatcherGateway getNextLeadingDispatcherGateway(TestingMiniCluster testingMiniCluster, @Nullable String str) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(!((DispatcherGateway) testingMiniCluster.getDispatcherGatewayFuture().get()).getAddress().equals(str));
        }, 20L);
        return (DispatcherGateway) testingMiniCluster.getDispatcherGatewayFuture().get();
    }

    private JobGraph createJobGraph(int i) throws IOException {
        boolean unused = BlockingOperator.isBlocking = true;
        JobVertex jobVertex = new JobVertex("blocking operator");
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(BlockingOperator.class);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Duration.ofSeconds(10L).toMillis()));
        return JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(jobVertex).setExecutionConfig(executionConfig).build();
    }
}
