package org.apache.flink.runtime.jobmanager;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.persistence.IntegerResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.TestingStateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link DefaultExecutionPlanStore}, driven through a {@link TestingStateHandleStore}
 * whose individual operations are stubbed per test, a {@link TestingExecutionPlanStoreWatcher}
 * used to simulate external add/remove notifications, and a {@link TestingExecutionPlanListener}
 * that records the notifications forwarded by the store.
 */
public class DefaultExecutionPlanStoreTest extends TestLogger {

    /** Upper bound, in milliseconds, for waiting on futures completed by the stubbed store. */
    private static final long TIMEOUT_MS = 100L;

    /** The execution plan used as the subject of every test. */
    private final ExecutionPlan testingExecutionPlan = JobGraphTestUtils.emptyJobGraph();

    private TestingStateHandleStore.Builder<ExecutionPlan> builder;
    private TestingRetrievableStateStorageHelper<ExecutionPlan> jobGraphStorageHelper;
    private TestingExecutionPlanStoreWatcher testingExecutionPlanStoreWatcher;
    private TestingExecutionPlanListener testingExecutionPlanListener;

    /** Creates fresh test doubles before each test so that no state leaks between tests. */
    @Before
    public void setup() {
        builder = TestingStateHandleStore.newBuilder();
        testingExecutionPlanStoreWatcher = new TestingExecutionPlanStoreWatcher();
        testingExecutionPlanListener = new TestingExecutionPlanListener();
        jobGraphStorageHelper = new TestingRetrievableStateStorageHelper<>();
    }

    /** Stops the watcher created in {@link #setup()}, if any. */
    @After
    public void teardown() {
        if (testingExecutionPlanStoreWatcher != null) {
            testingExecutionPlanStoreWatcher.stop();
        }
    }

    /** A plan whose handle is returned by the state handle store can be recovered. */
    @Test
    public void testRecoverExecutionPlan() throws Exception {
        final RetrievableStateHandle<ExecutionPlan> stateHandle =
                jobGraphStorageHelper.store(testingExecutionPlan);
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(ignored -> stateHandle).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        final ExecutionPlan recovered =
                executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(recovered).isNotNull();
        Assertions.assertThat(recovered.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
    }

    /** Recovering a plan the state handle store does not know yields {@code null}. */
    @Test
    public void testRecoverExecutionPlanWhenNotExist() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(
                                ignored -> {
                                    throw new StateHandleStore.NotExistException(
                                            "Not exist exception.");
                                })
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        final ExecutionPlan recovered =
                executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(recovered).isNull();
    }

    /** A failing recovery surfaces the original cause and releases the handle of the job. */
    @Test
    public void testRecoverExecutionPlanFailedShouldReleaseHandle() throws Exception {
        final CompletableFuture<String> releaseFuture = new CompletableFuture<>();
        final FlinkException testException = new FlinkException("Test exception.");
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(
                                ignored -> {
                                    throw testException;
                                })
                        .setReleaseConsumer(releaseFuture::complete)
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);

        Assertions.assertThatThrownBy(
                        () ->
                                executionPlanStore.recoverExecutionPlan(
                                        testingExecutionPlan.getJobID()))
                .hasCause(testException);

        final String releasedName = releaseFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(testingExecutionPlan.getJobID()).hasToString(releasedName);
    }

    /** Putting a plan that does not exist yet goes through the add function. */
    @Test
    public void testPutExecutionPlanWhenNotExist() throws Exception {
        final CompletableFuture<ExecutionPlan> addFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setExistsFunction(ignored -> IntegerResourceVersion.notExisting())
                        .setAddFunction(
                                (ignoredName, plan) -> {
                                    addFuture.complete(plan);
                                    return jobGraphStorageHelper.store(plan);
                                })
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);

        final ExecutionPlan added = addFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(added.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
    }

    /** Putting a plan a second time goes through the replace consumer with the known version. */
    @Test
    public void testPutExecutionPlanWhenAlreadyExist() throws Exception {
        final int resourceVersion = 100;
        final CompletableFuture<Tuple3<String, IntegerResourceVersion, ExecutionPlan>>
                replaceFuture = new CompletableFuture<>();
        final AtomicBoolean alreadyExists = new AtomicBoolean(false);
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setExistsFunction(
                                // The first lookup reports "not existing"; every later lookup
                                // reports the fixed resource version.
                                ignored ->
                                        alreadyExists.getAndSet(true)
                                                ? IntegerResourceVersion.valueOf(resourceVersion)
                                                : IntegerResourceVersion.notExisting())
                        .setAddFunction((ignoredName, plan) -> jobGraphStorageHelper.store(plan))
                        .setReplaceConsumer(replaceFuture::complete)
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        // Second put: the plan is now reported as existing, so it must be replaced.
        executionPlanStore.putExecutionPlan(testingExecutionPlan);

        final Tuple3<String, IntegerResourceVersion, ExecutionPlan> replaced =
                replaceFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(replaced.f0).isEqualTo(testingExecutionPlan.getJobID().toString());
        Assertions.assertThat(replaced.f1)
                .isEqualTo(IntegerResourceVersion.valueOf(resourceVersion));
        Assertions.assertThat(replaced.f2.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
    }

    /** Global cleanup of a previously put plan invokes the remove function with its name. */
    @Test
    public void testGlobalCleanup() throws Exception {
        final CompletableFuture<JobID> removeFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction((ignoredName, plan) -> jobGraphStorageHelper.store(plan))
                        .setRemoveFunction(
                                name -> removeFuture.complete(JobID.fromHexString(name)))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        executionPlanStore
                .globalCleanupAsync(testingExecutionPlan.getJobID(), Executors.directExecutor())
                .join();

        final JobID removed = removeFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(removed).isEqualTo(testingExecutionPlan.getJobID());
    }

    /** Global cleanup invokes the remove function even if the plan was never put. */
    @Test
    public void testGlobalCleanupWithNonExistName() throws Exception {
        final CompletableFuture<JobID> removeFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setRemoveFunction(
                                name -> removeFuture.complete(JobID.fromHexString(name)))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore
                .globalCleanupAsync(testingExecutionPlan.getJobID(), Executors.directExecutor())
                .join();

        Assertions.assertThat(removeFuture).isDone();
    }

    /** Global cleanup completes exceptionally when the remove function reports {@code false}. */
    @Test
    public void testGlobalCleanupFailsIfRemovalReturnsFalse() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setRemoveFunction(ignored -> false).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);

        Assertions.assertThatThrownBy(
                        () ->
                                executionPlanStore
                                        .globalCleanupAsync(
                                                testingExecutionPlan.getJobID(),
                                                Executors.directExecutor())
                                        .get())
                .isInstanceOf(ExecutionException.class);
    }

    /** The job ids returned by the store include all handle names of the state handle store. */
    @Test
    public void testGetJobIds() throws Exception {
        final List<JobID> existingJobIds = Arrays.asList(new JobID(0L, 0L), new JobID(0L, 1L));
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetAllHandlesSupplier(
                                () ->
                                        existingJobIds.stream()
                                                .map(JobID::toString)
                                                .collect(Collectors.toList()))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);

        Assertions.assertThat(executionPlanStore.getJobIds()).containsAll(existingJobIds);
    }

    /** An "added" notification for a plan that was put through the store is not forwarded. */
    @Test
    public void testOnAddedExecutionPlanShouldNotProcessKnownExecutionPlans() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction((ignoredName, plan) -> jobGraphStorageHelper.store(plan))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty();
    }

    /** Only "added" notifications for plans the store has not seen are forwarded. */
    @Test
    public void testOnAddedExecutionPlanShouldOnlyProcessUnknownExecutionPlans() throws Exception {
        final RetrievableStateHandle<ExecutionPlan> stateHandle =
                jobGraphStorageHelper.store(testingExecutionPlan);
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(ignored -> stateHandle)
                        .setAddFunction((ignoredName, plan) -> jobGraphStorageHelper.store(plan))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());

        // Recovered plan: must be ignored.
        testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());
        // Never seen before: must be forwarded.
        final JobID unknownJobId = JobID.generate();
        testingExecutionPlanStoreWatcher.addExecutionPlan(unknownJobId);

        Assertions.assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).hasSize(1);
        Assertions.assertThat(testingExecutionPlanListener.getAddedExecutionPlans())
                .contains(unknownJobId);
    }

    /** Only "removed" notifications for plans that were put through the store are forwarded. */
    @Test
    public void testOnRemovedExecutionPlanShouldOnlyProcessKnownExecutionPlans() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction((ignoredName, plan) -> jobGraphStorageHelper.store(plan))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);

        // Unknown job: must be ignored.
        testingExecutionPlanStoreWatcher.removeExecutionPlan(JobID.generate());
        // Known job: must be forwarded.
        testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).hasSize(1);
        Assertions.assertThat(testingExecutionPlanListener.getRemovedExecutionPlans())
                .contains(testingExecutionPlan.getJobID());
    }

    /** A "removed" notification for a plan that was never put is not forwarded. */
    @Test
    public void testOnRemovedExecutionPlanShouldNotProcessUnknownExecutionPlans()
            throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction((ignoredName, plan) -> jobGraphStorageHelper.store(plan))
                        .build();

        createAndStartExecutionPlanStore(stateHandleStore);
        testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty();
    }

    /** "Added" notifications arriving after the store was stopped are not forwarded. */
    @Test
    public void testOnAddedExecutionPlanIsIgnoredAfterBeingStop() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction((ignoredName, plan) -> jobGraphStorageHelper.store(plan))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.stop();
        testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty();
    }

    /** "Removed" notifications arriving after the store was stopped are not forwarded. */
    @Test
    public void testOnRemovedExecutionPlanIsIgnoredAfterBeingStop() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction((ignoredName, plan) -> jobGraphStorageHelper.store(plan))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        executionPlanStore.stop();
        testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty();
    }

    /** Stopping the store triggers the release-all-handles hook of the state handle store. */
    @Test
    public void testStoppingExecutionPlanStoreShouldReleaseAllHandles() throws Exception {
        final CompletableFuture<Void> releaseAllFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setReleaseAllHandlesRunnable(() -> releaseAllFuture.complete(null))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.stop();

        Assertions.assertThat(releaseAllFuture).isDone();
    }

    /** Local cleanup of a previously put plan releases the handle named after its job id. */
    @Test
    public void testLocalCleanupShouldReleaseHandle() throws Exception {
        final CompletableFuture<String> releaseFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setReleaseConsumer(releaseFuture::complete).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        executionPlanStore
                .localCleanupAsync(testingExecutionPlan.getJobID(), Executors.directExecutor())
                .join();

        // Bounded wait so that a missing release fails the test instead of hanging it.
        final String releasedName = releaseFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(testingExecutionPlan.getJobID()).hasToString(releasedName);
    }

    /** Job resource requirements put into the store are visible on the recovered plan. */
    @Test
    public void testRecoverPersistedJobResourceRequirements() throws Exception {
        // Minimal in-memory backing for the stubbed add/get functions.
        final HashMap<String, RetrievableStateHandle<ExecutionPlan>> handles = new HashMap<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction(
                                (name, plan) -> {
                                    final RetrievableStateHandle<ExecutionPlan> handle =
                                            jobGraphStorageHelper.store(plan);
                                    handles.put(name, handle);
                                    return handle;
                                })
                        .setGetFunction(
                                name -> {
                                    final RetrievableStateHandle<ExecutionPlan> handle =
                                            handles.get(name);
                                    if (handle == null) {
                                        throw new StateHandleStore.NotExistException(
                                                "Does not exist.");
                                    }
                                    return handle;
                                })
                        .build();

        final JobResourceRequirements initialRequirements =
                JobResourceRequirements.newBuilder()
                        .setParallelismForJobVertex(new JobVertexID(), 1, 1)
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        executionPlanStore.putJobResourceRequirements(
                testingExecutionPlan.getJobID(), initialRequirements);
        assertStoredRequirementsAre(
                executionPlanStore, testingExecutionPlan.getJobID(), initialRequirements);

        final JobResourceRequirements updatedRequirements =
                JobResourceRequirements.newBuilder()
                        .setParallelismForJobVertex(new JobVertexID(), 1, 1)
                        .build();
        executionPlanStore.putJobResourceRequirements(
                testingExecutionPlan.getJobID(), updatedRequirements);
        assertStoredRequirementsAre(
                executionPlanStore, testingExecutionPlan.getJobID(), updatedRequirements);
    }

    /**
     * Recovers the plan of {@code jobID} from {@code executionPlanStore} and asserts that the
     * requirements read from it equal {@code jobResourceRequirements}.
     *
     * @param executionPlanStore store to recover the plan from
     * @param jobID id of the job whose plan is recovered
     * @param jobResourceRequirements expected requirements
     * @throws Exception if recovering the plan fails
     */
    private static void assertStoredRequirementsAre(
            ExecutionPlanStore executionPlanStore,
            JobID jobID,
            JobResourceRequirements jobResourceRequirements)
            throws Exception {
        final ExecutionPlan recovered =
                Objects.requireNonNull(executionPlanStore.recoverExecutionPlan(jobID));
        Assertions.assertThat(JobResourceRequirements.readFromExecutionPlan(recovered))
                .get()
                .isEqualTo(jobResourceRequirements);
    }

    /** Putting requirements for a job the store cannot find raises NoSuchElementException. */
    @Test
    public void testPutJobResourceRequirementsOfNonExistentJob() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(
                                ignored -> {
                                    throw new StateHandleStore.NotExistException(
                                            "Does not exist.");
                                })
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);

        Assertions.assertThatThrownBy(
                        () ->
                                executionPlanStore.putJobResourceRequirements(
                                        new JobID(), JobResourceRequirements.empty()))
                .isInstanceOf(NoSuchElementException.class);
    }

    /**
     * Creates a {@link DefaultExecutionPlanStore} on top of the given state handle store, wired
     * to this test's watcher, and starts it with this test's listener. Job ids are mapped to
     * handle names via {@link JobID#toString()} and back via {@link JobID#fromHexString(String)}.
     *
     * @param testingStateHandleStore stubbed state handle store backing the created store
     * @return the started execution plan store
     * @throws Exception if starting the store fails
     */
    private ExecutionPlanStore createAndStartExecutionPlanStore(
            TestingStateHandleStore<ExecutionPlan> testingStateHandleStore) throws Exception {
        final ExecutionPlanStoreUtil nameConverter =
                new ExecutionPlanStoreUtil() {
                    @Override
                    public String jobIDToName(JobID jobID) {
                        return jobID.toString();
                    }

                    @Override
                    public JobID nameToJobID(String str) {
                        return JobID.fromHexString(str);
                    }
                };
        final DefaultExecutionPlanStore executionPlanStore =
                new DefaultExecutionPlanStore(
                        testingStateHandleStore, testingExecutionPlanStoreWatcher, nameConverter);
        executionPlanStore.start(testingExecutionPlanListener);
        return executionPlanStore;
    }
}
