package org.apache.flink.runtime.jobmanager;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.dispatcher.NoOpExecutionPlanListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableTypeAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/ZooKeeperExecutionPlansStoreITCase.class */
public class ZooKeeperExecutionPlansStoreITCase extends TestLogger {
    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();

    @RegisterExtension
    final EachCallbackWrapper<ZooKeeperExtension> zooKeeperResource = new EachCallbackWrapper<>(this.zooKeeperExtension);

    @RegisterExtension
    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = new TestingFatalErrorHandlerExtension();
    private static final RetrievableStateStorageHelper<ExecutionPlan> localStateStorage = executionPlan -> {
        return new RetrievableStreamStateHandle(new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject(executionPlan)));
    };

    @Test
    public void testPutAndRemoveExecutionPlan() throws Exception {
        ExecutionPlanStore createZooKeeperExecutionPlanStore = createZooKeeperExecutionPlanStore("/testPutAndRemoveExecutionPlan");
        try {
            ExecutionPlanStore.ExecutionPlanListener executionPlanListener = (ExecutionPlanStore.ExecutionPlanListener) Mockito.mock(ExecutionPlanStore.ExecutionPlanListener.class);
            createZooKeeperExecutionPlanStore.start(executionPlanListener);
            ExecutionPlan createExecutionPlan = createExecutionPlan(new JobID(), "JobName");
            Assertions.assertThat(createZooKeeperExecutionPlanStore.getJobIds()).isEmpty();
            createZooKeeperExecutionPlanStore.putExecutionPlan(createExecutionPlan);
            Collection jobIds = createZooKeeperExecutionPlanStore.getJobIds();
            Assertions.assertThat(jobIds).hasSize(1);
            verifyExecutionPlans(createExecutionPlan, createZooKeeperExecutionPlanStore.recoverExecutionPlan((JobID) jobIds.iterator().next()));
            ExecutionPlan createExecutionPlan2 = createExecutionPlan(createExecutionPlan.getJobID(), "Updated JobName");
            createZooKeeperExecutionPlanStore.putExecutionPlan(createExecutionPlan2);
            Collection jobIds2 = createZooKeeperExecutionPlanStore.getJobIds();
            Assertions.assertThat(jobIds2).hasSize(1);
            verifyExecutionPlans(createExecutionPlan2, createZooKeeperExecutionPlanStore.recoverExecutionPlan((JobID) jobIds2.iterator().next()));
            createZooKeeperExecutionPlanStore.globalCleanupAsync(createExecutionPlan2.getJobID(), Executors.directExecutor()).join();
            Assertions.assertThat(createZooKeeperExecutionPlanStore.getJobIds()).isEmpty();
            ((ExecutionPlanStore.ExecutionPlanListener) Mockito.verify(executionPlanListener, Mockito.atMost(1))).onAddedExecutionPlan((JobID) ArgumentMatchers.any(JobID.class));
            ((ExecutionPlanStore.ExecutionPlanListener) Mockito.verify(executionPlanListener, Mockito.never())).onRemovedExecutionPlan((JobID) ArgumentMatchers.any(JobID.class));
            createZooKeeperExecutionPlanStore.globalCleanupAsync(createExecutionPlan2.getJobID(), Executors.directExecutor()).join();
            createZooKeeperExecutionPlanStore.stop();
        } catch (Throwable th) {
            createZooKeeperExecutionPlanStore.stop();
            throw th;
        }
    }

    @Nonnull
    private ExecutionPlanStore createZooKeeperExecutionPlanStore(String str) throws Exception {
        CuratorFramework zooKeeperClient = this.zooKeeperExtension.getZooKeeperClient(this.testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
        zooKeeperClient.newNamespaceAwareEnsurePath(str).ensure(zooKeeperClient.getZookeeperClient());
        CuratorFramework usingNamespace = zooKeeperClient.usingNamespace(zooKeeperClient.getNamespace() + str);
        return new DefaultExecutionPlanStore(new ZooKeeperStateHandleStore(usingNamespace, localStateStorage), new ZooKeeperExecutionPlanStoreWatcher(new PathChildrenCache(usingNamespace, "/", false)), ZooKeeperExecutionPlanStoreUtil.INSTANCE);
    }

    @Test
    public void testRecoverExecutionPlans() throws Exception {
        ExecutionPlanStore createZooKeeperExecutionPlanStore = createZooKeeperExecutionPlanStore("/testRecoverExecutionPlans");
        try {
            ExecutionPlanStore.ExecutionPlanListener executionPlanListener = (ExecutionPlanStore.ExecutionPlanListener) Mockito.mock(ExecutionPlanStore.ExecutionPlanListener.class);
            createZooKeeperExecutionPlanStore.start(executionPlanListener);
            HashMap hashMap = new HashMap();
            JobID[] jobIDArr = {new JobID(), new JobID(), new JobID()};
            hashMap.put(jobIDArr[0], createExecutionPlan(jobIDArr[0]));
            hashMap.put(jobIDArr[1], createExecutionPlan(jobIDArr[1]));
            hashMap.put(jobIDArr[2], createExecutionPlan(jobIDArr[2]));
            Iterator it = hashMap.values().iterator();
            while (it.hasNext()) {
                createZooKeeperExecutionPlanStore.putExecutionPlan((ExecutionPlan) it.next());
            }
            Collection jobIds = createZooKeeperExecutionPlanStore.getJobIds();
            Assertions.assertThat(jobIds).hasSameSizeAs(hashMap.entrySet());
            Iterator it2 = jobIds.iterator();
            while (it2.hasNext()) {
                ExecutionPlan recoverExecutionPlan = createZooKeeperExecutionPlanStore.recoverExecutionPlan((JobID) it2.next());
                Assertions.assertThat(hashMap).containsKey(recoverExecutionPlan.getJobID());
                verifyExecutionPlans((ExecutionPlan) hashMap.get(recoverExecutionPlan.getJobID()), recoverExecutionPlan);
                createZooKeeperExecutionPlanStore.globalCleanupAsync(recoverExecutionPlan.getJobID(), Executors.directExecutor()).join();
            }
            Assertions.assertThat(createZooKeeperExecutionPlanStore.getJobIds()).isEmpty();
            ((ExecutionPlanStore.ExecutionPlanListener) Mockito.verify(executionPlanListener, Mockito.atMost(hashMap.size()))).onAddedExecutionPlan((JobID) ArgumentMatchers.any(JobID.class));
            ((ExecutionPlanStore.ExecutionPlanListener) Mockito.verify(executionPlanListener, Mockito.never())).onRemovedExecutionPlan((JobID) ArgumentMatchers.any(JobID.class));
            createZooKeeperExecutionPlanStore.stop();
        } catch (Throwable th) {
            createZooKeeperExecutionPlanStore.stop();
            throw th;
        }
    }

    @Test
    public void testConcurrentAddExecutionPlan() throws Exception {
        ExecutionPlanStore executionPlanStore = null;
        ExecutionPlanStore executionPlanStore2 = null;
        try {
            executionPlanStore = createZooKeeperExecutionPlanStore("/testConcurrentAddExecutionPlan");
            executionPlanStore2 = createZooKeeperExecutionPlanStore("/testConcurrentAddExecutionPlan");
            ExecutionPlan createExecutionPlan = createExecutionPlan(new JobID());
            ExecutionPlan createExecutionPlan2 = createExecutionPlan(new JobID());
            ExecutionPlanStore.ExecutionPlanListener executionPlanListener = (ExecutionPlanStore.ExecutionPlanListener) Mockito.mock(ExecutionPlanStore.ExecutionPlanListener.class);
            final Comparable[] comparableArr = new JobID[1];
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ((ExecutionPlanStore.ExecutionPlanListener) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlansStoreITCase.1
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m214answer(InvocationOnMock invocationOnMock) throws Throwable {
                    comparableArr[0] = (JobID) invocationOnMock.getArguments()[0];
                    countDownLatch.countDown();
                    return null;
                }
            }).when(executionPlanListener)).onAddedExecutionPlan((JobID) ArgumentMatchers.any(JobID.class));
            executionPlanStore.start(executionPlanListener);
            executionPlanStore2.start(NoOpExecutionPlanListener.INSTANCE);
            executionPlanStore.putExecutionPlan(createExecutionPlan);
            ((ExecutionPlanStore.ExecutionPlanListener) Mockito.verify(executionPlanListener, Mockito.never())).onAddedExecutionPlan((JobID) ArgumentMatchers.any(JobID.class));
            ((ExecutionPlanStore.ExecutionPlanListener) Mockito.verify(executionPlanListener, Mockito.never())).onRemovedExecutionPlan((JobID) ArgumentMatchers.any(JobID.class));
            executionPlanStore2.putExecutionPlan(createExecutionPlan2);
            countDownLatch.await();
            ((ExecutionPlanStore.ExecutionPlanListener) Mockito.verify(executionPlanListener, Mockito.times(1))).onAddedExecutionPlan((JobID) ArgumentMatchers.any(JobID.class));
            ((ExecutionPlanStore.ExecutionPlanListener) Mockito.verify(executionPlanListener, Mockito.never())).onRemovedExecutionPlan((JobID) ArgumentMatchers.any(JobID.class));
            Assertions.assertThat(comparableArr[0]).isEqualTo(createExecutionPlan2.getJobID());
            if (executionPlanStore != null) {
                executionPlanStore.stop();
            }
            if (executionPlanStore2 != null) {
                executionPlanStore2.stop();
            }
        } catch (Throwable th) {
            if (executionPlanStore != null) {
                executionPlanStore.stop();
            }
            if (executionPlanStore2 != null) {
                executionPlanStore2.stop();
            }
            throw th;
        }
    }

    @Test
    public void testUpdateExecutionPlanYouDidNotGetOrAdd() throws Exception {
        ExecutionPlanStore createZooKeeperExecutionPlanStore = createZooKeeperExecutionPlanStore("/testUpdateExecutionPlanYouDidNotGetOrAdd");
        ExecutionPlanStore createZooKeeperExecutionPlanStore2 = createZooKeeperExecutionPlanStore("/testUpdateExecutionPlanYouDidNotGetOrAdd");
        createZooKeeperExecutionPlanStore.start(NoOpExecutionPlanListener.INSTANCE);
        createZooKeeperExecutionPlanStore2.start(NoOpExecutionPlanListener.INSTANCE);
        ExecutionPlan createExecutionPlan = createExecutionPlan(new JobID());
        createZooKeeperExecutionPlanStore.putExecutionPlan(createExecutionPlan);
        Assertions.assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> {
            createZooKeeperExecutionPlanStore2.putExecutionPlan(createExecutionPlan);
        });
    }

    @Test
    public void testExecutionPlanRemovalFailureAndLockRelease() throws Exception {
        ExecutionPlanStore createZooKeeperExecutionPlanStore = createZooKeeperExecutionPlanStore("/testConcurrentAddExecutionPlan");
        ExecutionPlanStore createZooKeeperExecutionPlanStore2 = createZooKeeperExecutionPlanStore("/testConcurrentAddExecutionPlan");
        TestingExecutionPlanListener testingExecutionPlanListener = new TestingExecutionPlanListener();
        createZooKeeperExecutionPlanStore.start(testingExecutionPlanListener);
        createZooKeeperExecutionPlanStore2.start(testingExecutionPlanListener);
        JobGraph emptyJobGraph = JobGraphTestUtils.emptyJobGraph();
        createZooKeeperExecutionPlanStore.putExecutionPlan(emptyJobGraph);
        ExecutionPlan recoverExecutionPlan = createZooKeeperExecutionPlanStore2.recoverExecutionPlan(emptyJobGraph.getJobID());
        Assertions.assertThat(recoverExecutionPlan).isNotNull();
        ((ThrowableTypeAssert) Assertions.assertThatExceptionOfType(Exception.class).as("It should not be possible to remove the ExecutionPlan since the first store still has a lock on it.", new Object[0])).isThrownBy(() -> {
            createZooKeeperExecutionPlanStore2.globalCleanupAsync(recoverExecutionPlan.getJobID(), Executors.directExecutor()).join();
        });
        createZooKeeperExecutionPlanStore.stop();
        createZooKeeperExecutionPlanStore2.globalCleanupAsync(recoverExecutionPlan.getJobID(), Executors.directExecutor()).join();
        Assertions.assertThat(createZooKeeperExecutionPlanStore2.recoverExecutionPlan(recoverExecutionPlan.getJobID())).isNull();
        createZooKeeperExecutionPlanStore2.stop();
    }

    private ExecutionPlan createExecutionPlan(JobID jobID) {
        return createExecutionPlan(jobID, "Test ExecutionPlan");
    }

    private ExecutionPlan createExecutionPlan(JobID jobID, String str) {
        JobVertex jobVertex = new JobVertex("Test JobVertex");
        jobVertex.setParallelism(1);
        return JobGraphBuilder.newStreamingJobGraphBuilder().setJobName(str).setJobId(jobID).addJobVertex(jobVertex).build();
    }

    private void verifyExecutionPlans(ExecutionPlan executionPlan, ExecutionPlan executionPlan2) {
        Assertions.assertThat(executionPlan2.getName()).isEqualTo(executionPlan.getName());
        Assertions.assertThat(executionPlan2.getJobID()).isEqualTo(executionPlan.getJobID());
    }
}
