package org.apache.flink.runtime.dispatcher.runner;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.class */
public class ZooKeeperDefaultDispatcherRunnerTest extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDefaultDispatcherRunnerTest.class);
    private static final Time TESTING_TIMEOUT = Time.seconds(10);

    @ClassRule
    public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    @ClassRule
    public static TestingRpcServiceResource testingRpcServiceResource = new TestingRpcServiceResource();
    private BlobServer blobServer;
    private TestingFatalErrorHandler fatalErrorHandler;
    private File clusterHaStorageDir;
    private Configuration configuration;

    @Before
    public void setup() throws IOException {
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        this.configuration = new Configuration();
        this.configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        this.configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
        this.configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
        this.clusterHaStorageDir = new File(HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(this.configuration).toString());
        this.blobServer = new BlobServer(this.configuration, temporaryFolder.newFolder(), BlobUtils.createBlobStoreFromConfig(this.configuration));
    }

    @After
    public void teardown() throws Exception {
        if (this.blobServer != null) {
            this.blobServer.close();
        }
        if (this.fatalErrorHandler != null) {
            this.fatalErrorHandler.rethrowError();
        }
    }

    @Test
    public void testResourceCleanupUnderLeadershipChange() throws Exception {
        TestingRpcService testingRpcService = testingRpcServiceResource.getTestingRpcService();
        TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService();
        final CuratorFramework asCuratorFramework = ZooKeeperUtils.startCuratorFramework(this.configuration, this.fatalErrorHandler).asCuratorFramework();
        TestingHighAvailabilityServices build = new TestingHighAvailabilityServicesBuilder().setDispatcherLeaderElectionService(testingLeaderElectionService).setJobMasterLeaderRetrieverFunction(jobID -> {
            return ZooKeeperUtils.createLeaderRetrievalService(asCuratorFramework);
        }).build();
        Throwable th = null;
        try {
            DispatcherRunner createDispatcherRunner = createDispatcherRunner(testingRpcService, testingLeaderElectionService, new JobPersistenceComponentFactory() { // from class: org.apache.flink.runtime.dispatcher.runner.ZooKeeperDefaultDispatcherRunnerTest.1
                public JobGraphStore createJobGraphStore() {
                    return ZooKeeperDefaultDispatcherRunnerTest.this.createZooKeeperJobGraphStore(asCuratorFramework);
                }

                public JobResultStore createJobResultStore() {
                    return new EmbeddedJobResultStore();
                }
            }, new PartialDispatcherServices(this.configuration, build, CompletableFuture::new, this.blobServer, new TestingHeartbeatServices(), UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup, new MemoryExecutionGraphInfoStore(), this.fatalErrorHandler, VoidHistoryServerArchivist.INSTANCE, (String) null, ForkJoinPool.commonPool(), new DispatcherOperationCaches()), DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFactory.INSTANCE));
            Throwable th2 = null;
            try {
                try {
                    DispatcherGateway grantLeadership = grantLeadership(testingLeaderElectionService);
                    JobGraph createJobGraphWithBlobs = createJobGraphWithBlobs();
                    LOG.info("Initial job submission {}.", createJobGraphWithBlobs.getJobID());
                    grantLeadership.submitJob(createJobGraphWithBlobs, TESTING_TIMEOUT).get();
                    testingLeaderElectionService.notLeader();
                    LOG.info("Re-grant leadership first time.");
                    DispatcherGateway grantLeadership2 = grantLeadership(testingLeaderElectionService);
                    LOG.info("Cancel recovered job {}.", createJobGraphWithBlobs.getJobID());
                    CompletableFuture requestJobResult = grantLeadership2.requestJobResult(createJobGraphWithBlobs.getJobID(), TESTING_TIMEOUT);
                    grantLeadership2.cancelJob(createJobGraphWithBlobs.getJobID(), TESTING_TIMEOUT).get();
                    Assert.assertThat(((JobResult) requestJobResult.get()).getApplicationStatus(), Matchers.is(ApplicationStatus.CANCELED));
                    testingLeaderElectionService.notLeader();
                    JobGraphStore createZooKeeperJobGraphStore = createZooKeeperJobGraphStore(asCuratorFramework);
                    CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>) () -> {
                        return Boolean.valueOf(createZooKeeperJobGraphStore.getJobIds().isEmpty());
                    }, 20L);
                    if (createDispatcherRunner != null) {
                        if (0 != 0) {
                            try {
                                createDispatcherRunner.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createDispatcherRunner.close();
                        }
                    }
                    Assert.assertThat(this.clusterHaStorageDir.listFiles(), Matchers.is(Matchers.emptyArray()));
                } finally {
                }
            } catch (Throwable th4) {
                if (createDispatcherRunner != null) {
                    if (th2 != null) {
                        try {
                            createDispatcherRunner.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createDispatcherRunner.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    build.close();
                }
            }
        }
    }

    private DispatcherRunner createDispatcherRunner(TestingRpcService testingRpcService, TestingLeaderElectionService testingLeaderElectionService, JobPersistenceComponentFactory jobPersistenceComponentFactory, PartialDispatcherServices partialDispatcherServices, DispatcherRunnerFactory dispatcherRunnerFactory) throws Exception {
        return dispatcherRunnerFactory.createDispatcherRunner(testingLeaderElectionService, this.fatalErrorHandler, jobPersistenceComponentFactory, EXECUTOR_RESOURCE.getExecutor(), testingRpcService, partialDispatcherServices);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public JobGraphStore createZooKeeperJobGraphStore(CuratorFramework curatorFramework) {
        try {
            return ZooKeeperUtils.createJobGraphs(curatorFramework, this.configuration);
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
            return null;
        }
    }

    private DispatcherGateway grantLeadership(TestingLeaderElectionService testingLeaderElectionService) throws InterruptedException, ExecutionException {
        UUID randomUUID = UUID.randomUUID();
        testingLeaderElectionService.isLeader(randomUUID);
        return (DispatcherGateway) testingRpcServiceResource.getTestingRpcService().connect(testingLeaderElectionService.getConfirmationFuture().get().getAddress(), DispatcherId.fromUuid(randomUUID), DispatcherGateway.class).get();
    }

    private JobGraph createJobGraphWithBlobs() throws IOException {
        JobVertex jobVertex = new JobVertex("test vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(1);
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        streamingJobGraph.addUserJarBlobKey(this.blobServer.putPermanent(streamingJobGraph.getJobID(), new byte[256]));
        return streamingJobGraph;
    }
}
