package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceAllocationStrategy;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.jupiter.api.extension.RegisterExtension;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.class */
/**
 * Shared fixture for tests of {@link FineGrainedSlotManager}.
 *
 * <p>The class offers default resource constants, static factories for commonly needed test
 * objects, future assertions, and the inner {@link Context} which builds, starts and closes a
 * slot manager around a test body. Subclasses choose the allocation strategy through
 * {@link #getResourceAllocationStrategy(SlotManagerConfiguration)}.
 */
public abstract class FineGrainedSlotManagerTestBase {
    /** Maximum time, in seconds, that {@link #assertFutureCompleteAndReturn} waits for a result. */
    private static final long FUTURE_TIMEOUT_SECOND = 5;

    /** Time, in milliseconds, that {@link #assertFutureNotComplete} observes a future for. */
    private static final long FUTURE_EXPECT_TIMEOUT_MS = 50;

    /** Number of slots used to derive {@link #DEFAULT_SLOT_RESOURCE_PROFILE}. */
    static final int DEFAULT_NUM_SLOTS_PER_WORKER = 2;

    /** Executor extension whose executor backs every {@link Context} created by the tests. */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /** Reusable exception instance for tests that need to inject a failure. */
    static final FlinkException TEST_EXCEPTION = new FlinkException("Test exception");

    /**
     * Worker spec with 10 CPU cores whose four memory pools all share one size.
     *
     * <p>NOTE(review): the size is taken from {@code HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS},
     * a constant of an unrelated test class. This looks like a literal that was folded into a
     * same-valued constant; confirm the intended value and replace it with a local literal.
     */
    static final WorkerResourceSpec DEFAULT_WORKER_RESOURCE_SPEC =
            new WorkerResourceSpec.Builder()
                    .setCpuCores(10.0d)
                    .setTaskHeapMemoryMB(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS)
                    .setTaskOffHeapMemoryMB(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS)
                    .setNetworkMemoryMB(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS)
                    .setManagedMemoryMB(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS)
                    .build();

    /** Total task manager profile derived from {@link #DEFAULT_WORKER_RESOURCE_SPEC}. */
    static final ResourceProfile DEFAULT_TOTAL_RESOURCE_PROFILE =
            SlotManagerUtils.generateTaskManagerTotalResourceProfile(DEFAULT_WORKER_RESOURCE_SPEC);

    /**
     * Default slot profile derived from {@link #DEFAULT_WORKER_RESOURCE_SPEC} and
     * {@link #DEFAULT_NUM_SLOTS_PER_WORKER}.
     */
    static final ResourceProfile DEFAULT_SLOT_RESOURCE_PROFILE =
            SlotManagerUtils.generateDefaultSlotResourceProfile(
                    DEFAULT_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);

    /**
     * Per-test environment that wires a {@link FineGrainedSlotManager} from configurable parts.
     *
     * <p>Tests adjust the exposed builders and setters first and then call
     * {@link #runTest(RunnableWithException)}, which builds and starts the slot manager, runs
     * the test body and finally schedules the slot manager to be closed.
     */
    public class Context {
        /** Created by {@link #runTest(RunnableWithException)}; {@code null} before that call. */
        private FineGrainedSlotManager slotManager;

        private final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        private final ResourceTracker resourceTracker = new DefaultResourceTracker();
        private final TaskManagerTracker taskManagerTracker = new FineGrainedTaskManagerTracker();
        private final SlotStatusSyncer slotStatusSyncer = new DefaultSlotStatusSyncer(Duration.ofSeconds(10));
        private SlotManagerMetricGroup slotManagerMetricGroup = UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();

        /** Default checker reports every task manager as not blocked. */
        private BlockedTaskManagerChecker blockedTaskManagerChecker = resourceID -> false;

        private final ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        /** Executor that stands in for the slot manager's main thread. */
        private final Executor mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();

        final TestingResourceAllocationStrategy.Builder resourceAllocationStrategyBuilder = TestingResourceAllocationStrategy.newBuilder();
        final TestingResourceAllocatorBuilder resourceAllocatorBuilder = new TestingResourceAllocatorBuilder();
        final TestingResourceEventListenerBuilder resourceEventListenerBuilder = new TestingResourceEventListenerBuilder();
        final SlotManagerConfigurationBuilder slotManagerConfigurationBuilder = SlotManagerConfigurationBuilder.newBuilder();

        /** Creates a context with default components. */
        public Context() {
        }

        /**
         * Returns the slot manager under test.
         *
         * @return the slot manager, or {@code null} if {@link #runTest} has not been called yet
         */
        public FineGrainedSlotManager getSlotManager() {
            return this.slotManager;
        }

        /** Returns the resource tracker that is handed to the slot manager. */
        public ResourceTracker getResourceTracker() {
            return this.resourceTracker;
        }

        /** Returns the task manager tracker that is handed to the slot manager. */
        public TaskManagerTracker getTaskManagerTracker() {
            return this.taskManagerTracker;
        }

        /** Returns the resource manager id the slot manager is started with. */
        public ResourceManagerId getResourceManagerId() {
            return this.resourceManagerId;
        }

        /**
         * Replaces the metric group used when the slot manager is built.
         *
         * @param slotManagerMetricGroup metric group to use; read by {@link #runTest}
         */
        public void setSlotManagerMetricGroup(SlotManagerMetricGroup slotManagerMetricGroup) {
            this.slotManagerMetricGroup = slotManagerMetricGroup;
        }

        /**
         * Replaces the blocked task manager checker passed to the slot manager on start.
         *
         * @param blockedTaskManagerChecker checker to use; read by {@link #runTest}
         */
        public void setBlockedTaskManagerChecker(BlockedTaskManagerChecker blockedTaskManagerChecker) {
            this.blockedTaskManagerChecker = blockedTaskManagerChecker;
        }

        /**
         * Submits the action to the main thread executor without waiting for it.
         *
         * @param runnable action to execute
         */
        public void runInMainThread(Runnable runnable) {
            this.mainThreadExecutor.execute(runnable);
        }

        /**
         * Submits the action to the main thread executor and blocks until it has finished.
         *
         * <p>A failure raised by the action is rethrown on the calling thread. Without this, a
         * throwing action would never signal completion and the caller would wait forever.
         *
         * @param runnable action to execute
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void runInMainThreadAndWait(Runnable runnable) throws InterruptedException {
            final CompletableFuture<Void> outcome = new CompletableFuture<>();
            this.mainThreadExecutor.execute(() -> {
                try {
                    runnable.run();
                    outcome.complete(null);
                } catch (Throwable failure) {
                    // Hand the failure over to the waiting thread instead of losing it here.
                    outcome.completeExceptionally(failure);
                }
            });
            try {
                outcome.get();
            } catch (ExecutionException e) {
                final Throwable cause = e.getCause();
                if (cause instanceof Error) {
                    throw (Error) cause;
                }
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                // A Runnable cannot declare checked exceptions; this covers sneaky throws only.
                throw new IllegalStateException(cause);
            }
        }

        /**
         * Builds and starts the slot manager, runs the test body, then schedules the close.
         *
         * <p>The allocation strategy comes from the enclosing test class; if it supplies none,
         * the strategy built from {@link #resourceAllocationStrategyBuilder} is used. The start
         * happens on the main thread executor and is awaited. The close is only submitted to
         * that executor and its future is passed to {@code FutureUtils.assertNoException}.
         * If the test body throws, the close is not submitted.
         *
         * @param runnableWithException test body, executed on the calling thread
         * @throws Exception if starting is interrupted or the test body fails
         */
        public final void runTest(RunnableWithException runnableWithException) throws Exception {
            final SlotManagerConfiguration configuration = this.slotManagerConfigurationBuilder.build();
            this.slotManager =
                    FineGrainedSlotManagerBuilder.newBuilder(this.scheduledExecutor)
                            .setSlotManagerConfiguration(configuration)
                            .setSlotManagerMetricGroup(this.slotManagerMetricGroup)
                            .setResourceTracker(this.resourceTracker)
                            .setTaskManagerTracker(this.taskManagerTracker)
                            .setSlotStatusSyncer(this.slotStatusSyncer)
                            .setResourceAllocationStrategy(
                                    FineGrainedSlotManagerTestBase.this
                                            .getResourceAllocationStrategy(configuration)
                                            .orElse(this.resourceAllocationStrategyBuilder.build()))
                            .build();

            runInMainThreadAndWait(
                    () ->
                            this.slotManager.start(
                                    this.resourceManagerId,
                                    this.mainThreadExecutor,
                                    this.resourceAllocatorBuilder.build(),
                                    this.resourceEventListenerBuilder.build(),
                                    this.blockedTaskManagerChecker));

            runnableWithException.run();

            final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
            runInMainThread(() -> {
                try {
                    this.slotManager.close();
                    closeFuture.complete(null);
                } catch (Exception e) {
                    closeFuture.completeExceptionally(e);
                }
            });
            FutureUtils.assertNoException(closeFuture);
        }
    }

    /**
     * Supplies the allocation strategy for the slot manager under test.
     *
     * @param slotManagerConfiguration configuration the slot manager is built with
     * @return the strategy to use, or an empty optional to fall back to the strategy built from
     *     {@link Context#resourceAllocationStrategyBuilder}
     */
    protected abstract Optional<ResourceAllocationStrategy> getResourceAllocationStrategy(SlotManagerConfiguration slotManagerConfiguration);

    /**
     * Creates the status of an allocated slot with index 0 on a freshly generated resource id.
     *
     * @param jobID job the slot is reported for
     * @param allocationID allocation the slot is reported for
     * @param resourceProfile resource profile of the slot
     * @return the new slot status
     */
    public static SlotStatus createAllocatedSlotStatus(JobID jobID, AllocationID allocationID, ResourceProfile resourceProfile) {
        final SlotID slotId = new SlotID(ResourceID.generate(), 0);
        return new SlotStatus(slotId, resourceProfile, jobID, allocationID);
    }

    /**
     * Adds up the number of required slots over all given requirements.
     *
     * @param collection requirements to sum up; may be {@code null}
     * @return the total number of required slots, or 0 for {@code null} or an empty collection
     */
    public static int getTotalResourceCount(Collection<ResourceRequirement> collection) {
        if (collection == null) {
            return 0;
        }
        return collection.stream().mapToInt(ResourceRequirement::getNumberOfRequiredSlots).sum();
    }

    /**
     * Creates requirements for a single slot for a new {@link JobID}.
     *
     * @return the requirements
     */
    public static ResourceRequirements createResourceRequirementsForSingleSlot() {
        return createResourceRequirementsForSingleSlot(new JobID());
    }

    /**
     * Creates requirements for a single slot for the given job.
     *
     * @param jobID job the requirements belong to
     * @return the requirements
     */
    public static ResourceRequirements createResourceRequirementsForSingleSlot(JobID jobID) {
        return createResourceRequirements(jobID, 1);
    }

    /**
     * Creates requirements for {@code i} slots with {@link ResourceProfile#UNKNOWN}.
     *
     * @param jobID job the requirements belong to
     * @param i number of required slots
     * @return the requirements
     */
    public static ResourceRequirements createResourceRequirements(JobID jobID, int i) {
        return createResourceRequirements(jobID, i, ResourceProfile.UNKNOWN);
    }

    /**
     * Creates requirements holding one entry for {@code i} slots of the given profile. The
     * second argument passed to {@code ResourceRequirements.create} is fixed to {@code "foobar"}.
     *
     * @param jobID job the requirements belong to
     * @param i number of required slots
     * @param resourceProfile profile each required slot must have
     * @return the requirements
     */
    public static ResourceRequirements createResourceRequirements(JobID jobID, int i, ResourceProfile resourceProfile) {
        final ResourceRequirement requirement = ResourceRequirement.create(resourceProfile, i);
        return ResourceRequirements.create(jobID, "foobar", Collections.singleton(requirement));
    }

    /**
     * Creates a connection with a generated resource id and a gateway built from a default
     * {@link TestingTaskExecutorGatewayBuilder}.
     *
     * @return the new connection
     */
    public static TaskExecutorConnection createTaskExecutorConnection() {
        return createTaskExecutorConnection(new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
    }

    /**
     * Creates a connection with a generated resource id and the given gateway.
     *
     * @param testingTaskExecutorGateway gateway the connection should use
     * @return the new connection
     */
    public static TaskExecutorConnection createTaskExecutorConnection(TestingTaskExecutorGateway testingTaskExecutorGateway) {
        return new TaskExecutorConnection(ResourceID.generate(), testingTaskExecutorGateway);
    }

    /**
     * Waits up to {@link #FUTURE_TIMEOUT_SECOND} seconds for the future and returns its value.
     *
     * @param completableFuture future expected to complete
     * @param <T> type of the result
     * @return the value the future completed with
     * @throws Exception if the wait times out or is interrupted, or the future failed
     */
    public static <T> T assertFutureCompleteAndReturn(CompletableFuture<T> completableFuture) throws Exception {
        return completableFuture.get(FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
    }

    /**
     * Asserts that waiting {@link #FUTURE_EXPECT_TIMEOUT_MS} milliseconds for the future ends
     * with a {@link TimeoutException}.
     *
     * @param completableFuture future expected to remain incomplete
     */
    public static void assertFutureNotComplete(CompletableFuture<?> completableFuture) {
        FlinkAssertions.assertThatFuture(completableFuture)
                .withFailMessage("Expected to fail with a timeout.")
                .failsWithin(FUTURE_EXPECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .withThrowableOfType(TimeoutException.class);
    }
}
