package org.apache.flink.runtime.resourcemanager.active;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blocklist.BlockedNodeRetriever;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.active.TestingResourceEventHandler;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Shared test scaffolding for {@link ResourceManagerDriver} implementations.
 *
 * <p>Concrete test classes supply a {@link Context} via {@link #createContext()}. Each test in
 * this base class drives the driver through one operation (initialize, recover, terminate,
 * deregister, request, release) and then delegates the verification to the matching
 * {@code validate*} hook of the context.
 *
 * @param <WorkerType> the worker type handled by the driver under test
 */
@ExtendWith({TestLoggerExtension.class})
public abstract class ResourceManagerDriverTestBase<WorkerType extends ResourceIDRetrievable> {

    /** Upper bound, in seconds, for waiting on futures that are expected to complete. */
    protected static final long TIMEOUT_SEC = 5;

    /**
     * Short wait, in milliseconds. Not used in this class; presumably meant for subclasses that
     * assert that something does NOT happen -- confirm against the subclasses.
     */
    protected static final long TIMEOUT_SHOULD_NOT_HAPPEN_MS = 10;

    /** Name of the single thread backing {@link #MAIN_THREAD_EXECUTOR}. */
    private static final String MAIN_THREAD_NAME = "testing-rpc-main-thread";

    /** Process spec derived from an empty configuration and {@link WorkerResourceSpec#ZERO}. */
    protected static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC =
            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                    new Configuration(), WorkerResourceSpec.ZERO);

    /**
     * Single-threaded executor handed to the driver as its main thread executor. The thread is
     * named {@link #MAIN_THREAD_NAME} so that {@link Context#validateInMainThread()} can
     * recognize it. The executor is shared by all tests of the class.
     */
    private static final ScheduledExecutor MAIN_THREAD_EXECUTOR =
            new ScheduledExecutorServiceAdapter(
                    Executors.newSingleThreadScheduledExecutor(
                            runnable -> new Thread(runnable, MAIN_THREAD_NAME)));

    /**
     * Per-test fixture: owns the driver under test plus the configuration and event handler
     * builder used to set it up, and declares the validation hooks implemented by subclasses.
     */
    public abstract class Context {
        /** Driver under test; assigned in {@link #runTest(RunnableWithException)}. */
        private ResourceManagerDriver<WorkerType> driver;

        /** Main thread executor; assigned in {@link #runTest(RunnableWithException)}. */
        private ScheduledExecutor mainThreadExecutor;

        protected final Configuration flinkConfig = new Configuration();

        protected final TestingResourceEventHandler.Builder<WorkerType> resourceEventHandlerBuilder =
                TestingResourceEventHandler.builder();

        /** Defaults to a retriever that reports no blocked nodes. */
        private BlockedNodeRetriever blockedNodeRetriever = Collections::emptySet;

        protected Context() {}

        /**
         * Returns the driver under test, or {@code null} if {@link
         * #runTest(RunnableWithException)} has not created it yet.
         */
        protected ResourceManagerDriver<WorkerType> getDriver() {
            return driver;
        }

        /**
         * Replaces the blocked node retriever passed to the driver on initialization. Only has
         * an effect when called before {@link #runTest(RunnableWithException)}.
         */
        public void setBlockedNodeRetriever(BlockedNodeRetriever blockedNodeRetriever) {
            this.blockedNodeRetriever = blockedNodeRetriever;
        }

        /**
         * Prepares the fixture, creates and initializes the driver, then runs the given test
         * body on the calling thread.
         *
         * @param testMethod the test body to execute once the driver has been initialized
         * @throws Exception anything thrown by preparation, initialization or the test body
         */
        protected final void runTest(RunnableWithException testMethod) throws Exception {
            prepareRunTest();

            driver = createResourceManagerDriver();
            mainThreadExecutor = MAIN_THREAD_EXECUTOR;

            driver.initialize(
                    resourceEventHandlerBuilder.build(),
                    mainThreadExecutor,
                    ForkJoinPool.commonPool(),
                    blockedNodeRetriever);

            testMethod.run();
        }

        /**
         * Executes the given action on the main thread executor.
         *
         * @param runnable the action to execute
         * @return a future that completes with {@code null} once the action has finished, or
         *     exceptionally with whatever the action threw
         */
        public final CompletableFuture<Void> runInMainThread(RunnableWithException runnable) {
            final CompletableFuture<Void> result = new CompletableFuture<>();
            mainThreadExecutor.execute(
                    () -> {
                        try {
                            runnable.run();
                            result.complete(null);
                        } catch (Throwable t) {
                            // Surface failures through the future instead of losing them in
                            // the executor thread.
                            result.completeExceptionally(t);
                        }
                    });
            return result;
        }

        /**
         * Evaluates the given supplier on the main thread executor.
         *
         * @param supplier computation to evaluate
         * @param <T> type of the computed value
         * @return a future carrying the supplied value
         */
        protected final <T> CompletableFuture<T> runInMainThread(Supplier<T> supplier) {
            return CompletableFuture.supplyAsync(supplier, mainThreadExecutor);
        }

        /** Asserts that the calling thread carries the main thread's name. */
        protected final void validateInMainThread() {
            Assertions.assertThat(Thread.currentThread().getName()).isEqualTo(MAIN_THREAD_NAME);
        }

        /** Called by {@link #runTest(RunnableWithException)} before the driver is created. */
        protected abstract void prepareRunTest() throws Exception;

        /** Creates the driver under test; called once per {@link #runTest} invocation. */
        protected abstract ResourceManagerDriver<WorkerType> createResourceManagerDriver();

        /** Called by the recovery test before the driver is created and initialized. */
        protected abstract void preparePreviousAttemptWorkers();

        /** Verification hook invoked after the driver has been initialized. */
        protected abstract void validateInitialization() throws Exception;

        /**
         * Verification hook for the workers reported to the resource event handler as recovered
         * from a previous attempt.
         */
        public abstract void validateWorkersRecoveredFromPreviousAttempt(
                Collection<WorkerType> workers);

        /** Verification hook invoked after {@code terminate()} was called on the driver. */
        protected abstract void validateTermination() throws Exception;

        /** Verification hook invoked after {@code deregisterApplication} was called. */
        protected abstract void validateDeregisterApplication() throws Exception;

        /** Verification hook invoked after the given process specs have been requested. */
        protected abstract void validateRequestedResources(
                Collection<TaskExecutorProcessSpec> taskExecutorProcessSpecs) throws Exception;

        /** Verification hook invoked after the given workers have been released. */
        protected abstract void validateReleaseResources(Collection<WorkerType> workerNodes)
                throws Exception;
    }

    @Test
    void testInitialize() throws Exception {
        final Context context = createContext();
        context.runTest(context::validateInitialization);
    }

    @Test
    void testRecoverPreviousAttemptWorkers() throws Exception {
        final CompletableFuture<Collection<WorkerType>> recoveredWorkersFuture =
                new CompletableFuture<>();
        final Context context = createContext();

        // Capture whatever the driver reports as recovered during initialization.
        context.resourceEventHandlerBuilder.setOnPreviousAttemptWorkersRecoveredConsumer(
                recoveredWorkersFuture::complete);
        context.preparePreviousAttemptWorkers();

        context.runTest(
                () -> {
                    final Collection<WorkerType> recoveredWorkers =
                            recoveredWorkersFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                    context.validateWorkersRecoveredFromPreviousAttempt(recoveredWorkers);
                });
    }

    @Test
    void testTerminate() throws Exception {
        final Context context = createContext();
        context.runTest(
                () -> {
                    context.getDriver().terminate();
                    context.validateTermination();
                });
    }

    @Test
    void testDeregisterApplicationSucceeded() throws Exception {
        testDeregisterApplication(ApplicationStatus.SUCCEEDED);
    }

    @Test
    void testDeregisterApplicationFailed() throws Exception {
        testDeregisterApplication(ApplicationStatus.FAILED);
    }

    @Test
    void testDeregisterApplicationCanceled() throws Exception {
        testDeregisterApplication(ApplicationStatus.CANCELED);
    }

    @Test
    void testDeregisterApplicationUnknown() throws Exception {
        testDeregisterApplication(ApplicationStatus.UNKNOWN);
    }

    /**
     * Deregisters the application with the given final status (and no diagnostics message) and
     * lets the context verify the outcome.
     */
    private void testDeregisterApplication(ApplicationStatus status) throws Exception {
        final Context context = createContext();
        context.runTest(
                () -> {
                    context.getDriver().deregisterApplication(status, (String) null);
                    context.validateDeregisterApplication();
                });
    }

    @Test
    void testRequestResource() throws Exception {
        final Context context = createContext();
        context.runTest(
                () -> {
                    context.runInMainThread(
                            () -> context.getDriver().requestResource(TASK_EXECUTOR_PROCESS_SPEC));
                    context.validateRequestedResources(
                            Collections.singleton(TASK_EXECUTOR_PROCESS_SPEC));
                });
    }

    @Test
    void testReleaseResource() throws Exception {
        final CompletableFuture<WorkerType> requestResourceFuture = new CompletableFuture<>();
        final CompletableFuture<WorkerType> releaseResourceFuture = new CompletableFuture<>();
        final Context context = createContext();

        context.runTest(
                () -> {
                    // Request a worker on the main thread and publish it once it is allocated.
                    context.runInMainThread(
                            () ->
                                    context.getDriver()
                                            .requestResource(TASK_EXECUTOR_PROCESS_SPEC)
                                            .thenAccept(requestResourceFuture::complete));

                    // As soon as the worker is available, release it again on the main thread.
                    requestResourceFuture.thenApply(
                            workerNode ->
                                    context.runInMainThread(
                                            () -> {
                                                context.getDriver().releaseResource(workerNode);
                                                releaseResourceFuture.complete(workerNode);
                                            }));

                    final WorkerType releasedWorker =
                            releaseResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                    context.validateReleaseResources(Collections.singleton(releasedWorker));
                });
    }

    /** Creates the implementation-specific fixture used by every test in this class. */
    protected abstract Context createContext();
}
