package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationResult;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceAllocationStrategy;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.class */
class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
    // Twice the default *total* resource profile. Passed as the per-slot (default slot) profile when
    // registering the "large" task manager in testTaskManagerRegistrationDeductPendingTaskManager.
    private static final ResourceProfile LARGE_SLOT_RESOURCE_PROFILE = DEFAULT_TOTAL_RESOURCE_PROFILE.multiply(2);
    // Twice the large slot profile; passed as the total resource profile of the "large" task manager.
    private static final ResourceProfile LARGE_TOTAL_RESOURCE_PROFILE = LARGE_SLOT_RESOURCE_PROFILE.multiply(2);

    /** Package-private no-arg constructor; does nothing beyond invoking the superclass constructor. */
    FineGrainedSlotManagerTest() {
        super();
    }

    /**
     * Supplies no explicit allocation strategy for this test class.
     *
     * @param slotManagerConfiguration ignored by this implementation
     * @return always an empty {@link Optional}; NOTE(review): the base class presumably falls back
     *     to the strategy built from the test context in that case — confirm in
     *     {@code FineGrainedSlotManagerTestBase}
     */
    @Override // org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase
    protected Optional<ResourceAllocationStrategy> getResourceAllocationStrategy(SlotManagerConfiguration slotManagerConfiguration) {
        final Optional<ResourceAllocationStrategy> noStrategy = Optional.empty();
        return noStrategy;
    }

    /**
     * Runs an empty test body inside a fresh test context, so only whatever {@code runTest} does
     * around the body is exercised.
     */
    @Test
    void testInitializeAndClose() throws Exception {
        new FineGrainedSlotManagerTestBase.Context() {
            {
                // Deliberately empty: the test passes as long as runTest itself does not throw.
                runTest(() -> {});
            }
        };
    }

    /**
     * Suspends the slot manager inside the test body; the test passes if neither the suspend call
     * nor the remainder of {@code runTest} throws.
     */
    @Test
    void testCloseAfterSuspendDoesNotThrowException() throws Exception {
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> getSlotManager().suspend());
            }
        };
    }

    /**
     * Registers one task manager with an empty slot report and checks the resulting bookkeeping:
     * the registration succeeds, two slots are counted as registered, and the tracker exposes the
     * task manager with both its available and its total resource equal to the default total
     * resource profile.
     */
    @Test
    void testTaskManagerRegistration() throws Exception {
        final TaskExecutorConnection taskManagerConnection = createTaskExecutorConnection();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    final CompletableFuture<SlotManager.RegistrationResult> registrationFuture =
                            new CompletableFuture<>();
                    // Slot manager calls are dispatched via runInMainThread; the future carries
                    // the result back to the test thread.
                    runInMainThread(() -> {
                        registrationFuture.complete(
                                getSlotManager().registerTaskManager(
                                        taskManagerConnection,
                                        new SlotReport(),
                                        FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                        FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE));
                    });

                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registrationFuture))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getSlotManager().getNumberRegisteredSlots()).isEqualTo(2);
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManagers()).hasSize(1);
                    Assertions.assertThat(
                                    getTaskManagerTracker()
                                            .getRegisteredTaskManager(taskManagerConnection.getInstanceID()))
                            .isPresent();
                    Assertions.assertThat(
                                    getTaskManagerTracker()
                                            .getRegisteredTaskManager(taskManagerConnection.getInstanceID())
                                            .get()
                                            .getAvailableResource())
                            .isEqualTo(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE);
                    Assertions.assertThat(
                                    getTaskManagerTracker()
                                            .getRegisteredTaskManager(taskManagerConnection.getInstanceID())
                                            .get()
                                            .getTotalResource())
                            .isEqualTo(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE);
                });
            }
        };
    }

    /**
     * Registers a task manager that reports one already-allocated slot, verifies the slot is
     * tracked in state {@code ALLOCATED}, then unregisters the task manager and verifies that both
     * the task manager and the slot are gone from the tracker.
     */
    @Test
    void testTaskManagerUnregistration() throws Exception {
        // The gateway answers slot requests with a future that this test never completes.
        final TaskExecutorConnection taskManagerConnection =
                new TaskExecutorConnection(
                        ResourceID.generate(),
                        new TestingTaskExecutorGatewayBuilder()
                                .setRequestSlotFunction(tuple6 -> new CompletableFuture<>())
                                .createTestingTaskExecutorGateway());
        final AllocationID allocationId = new AllocationID();
        final SlotReport slotReport =
                new SlotReport(createAllocatedSlotStatus(new JobID(), allocationId, DEFAULT_SLOT_RESOURCE_PROFILE));
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    final CompletableFuture<SlotManager.RegistrationResult> registrationFuture =
                            new CompletableFuture<>();
                    final CompletableFuture<Boolean> unregistrationFuture = new CompletableFuture<>();

                    runInMainThread(() -> {
                        registrationFuture.complete(
                                getSlotManager().registerTaskManager(
                                        taskManagerConnection,
                                        slotReport,
                                        FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                        FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registrationFuture))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManagers()).hasSize(1);

                    final Optional<TaskManagerSlotInformation> reportedSlot =
                            getTaskManagerTracker().getAllocatedOrPendingSlot(allocationId);
                    Assertions.assertThat(reportedSlot).isPresent();
                    Assertions.assertThat(reportedSlot.get().getState()).isSameAs(SlotState.ALLOCATED);

                    runInMainThread(() -> {
                        unregistrationFuture.complete(
                                getSlotManager().unregisterTaskManager(
                                        taskManagerConnection.getInstanceID(),
                                        FineGrainedSlotManagerTestBase.TEST_EXCEPTION));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(unregistrationFuture))
                            .isTrue();
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManagers()).isEmpty();
                    Assertions.assertThat(getTaskManagerTracker().getAllocatedOrPendingSlot(allocationId))
                            .isNotPresent();
                });
            }
        };
    }

    /**
     * Checks when a registering task manager is counted against a pending task manager.
     *
     * <p>The allocation strategy stub always places the single required slot on a pending task
     * manager (reusing the tracked one if present). Three task managers are then registered in
     * turn, and the number of pending task managers is asserted after each:
     *
     * <ol>
     *   <li>default profile but reporting an already allocated slot: still one pending;
     *   <li>larger total/slot profile with an empty report: still one pending;
     *   <li>default profile with an empty report: no pending task manager remains.
     * </ol>
     */
    @Test
    void testTaskManagerRegistrationDeductPendingTaskManager() throws Exception {
        final TaskExecutorConnection taskExecutorToBeIgnored = createTaskExecutorConnection();
        final TaskExecutorConnection largeTaskExecutor = createTaskExecutorConnection();
        final TaskExecutorConnection matchingTaskExecutor = createTaskExecutorConnection();
        final SlotReport allocatedSlotReport =
                new SlotReport(createAllocatedSlotStatus(new JobID(), new AllocationID(), DEFAULT_SLOT_RESOURCE_PROFILE));
        new FineGrainedSlotManagerTestBase.Context() {
            {
                this.resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((jobRequirements, resourceInfoProvider) -> {
                    Assertions.assertThat(jobRequirements).hasSize(1);
                    final JobID requestingJob = jobRequirements.keySet().stream().findFirst().get();
                    final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
                    // Reuse the already tracked pending task manager so repeated requirement
                    // checks do not keep adding new ones.
                    final PendingTaskManager pendingTaskManager =
                            getTaskManagerTracker().getPendingTaskManagers().isEmpty()
                                    ? new PendingTaskManager(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, 2)
                                    : getTaskManagerTracker().getPendingTaskManagers().iterator().next();
                    resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
                    resultBuilder.addAllocationOnPendingResource(
                            requestingJob,
                            pendingTaskManager.getPendingTaskManagerId(),
                            FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                    return resultBuilder.build();
                });
                this.slotManagerConfigurationBuilder.setRequirementCheckDelay(Duration.ZERO);

                runTest(() -> {
                    final CompletableFuture<SlotManager.RegistrationResult> firstRegistration = new CompletableFuture<>();
                    final CompletableFuture<SlotManager.RegistrationResult> secondRegistration = new CompletableFuture<>();
                    final CompletableFuture<SlotManager.RegistrationResult> thirdRegistration = new CompletableFuture<>();

                    // 1) Task manager with an allocated slot.
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(
                                FineGrainedSlotManagerTestBase.createResourceRequirementsForSingleSlot());
                        firstRegistration.complete(
                                getSlotManager().registerTaskManager(
                                        taskExecutorToBeIgnored,
                                        allocatedSlotReport,
                                        FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                        FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(firstRegistration))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getTaskManagerTracker().getPendingTaskManagers()).hasSize(1);

                    // 2) Task manager whose profile differs from the pending one.
                    runInMainThread(() -> {
                        secondRegistration.complete(
                                getSlotManager().registerTaskManager(
                                        largeTaskExecutor,
                                        new SlotReport(),
                                        FineGrainedSlotManagerTest.LARGE_TOTAL_RESOURCE_PROFILE,
                                        FineGrainedSlotManagerTest.LARGE_SLOT_RESOURCE_PROFILE));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(secondRegistration))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getTaskManagerTracker().getPendingTaskManagers()).hasSize(1);

                    // 3) Task manager with the default profile and no allocated slots.
                    runInMainThread(() -> {
                        thirdRegistration.complete(
                                getSlotManager().registerTaskManager(
                                        matchingTaskExecutor,
                                        new SlotReport(),
                                        FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                        FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(thirdRegistration))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getTaskManagerTracker().getPendingTaskManagers()).isEmpty();
                });
            }
        };
    }

    /**
     * Reports a slot status for an instance id that was never registered and verifies that the
     * report is rejected ({@code reportSlotStatus} returns {@code false}) and that the number of
     * registered slots stays at zero.
     */
    @Test
    void testReceivingUnknownSlotReport() throws Exception {
        final InstanceID unknownInstanceId = new InstanceID();
        final SlotReport unknownSlotReport = new SlotReport();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    // Nothing is registered before the report arrives.
                    Assertions.assertThat(getSlotManager().getNumberRegisteredSlots()).isEqualTo(0);

                    final CompletableFuture<Boolean> reportAccepted = new CompletableFuture<>();
                    runInMainThread(() -> {
                        reportAccepted.complete(
                                getSlotManager().reportSlotStatus(unknownInstanceId, unknownSlotReport));
                    });

                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(reportAccepted))
                            .isFalse();
                    Assertions.assertThat(getSlotManager().getNumberRegisteredSlots()).isEqualTo(0);
                });
            }
        };
    }

    /**
     * Verifies that a slot is requested from a registered task manager when the allocation
     * strategy places the requirement on that task manager: the captured slot request must carry
     * the requesting job id and the default slot resource profile.
     */
    @Test
    void testSlotAllocationAccordingToStrategyResult() throws Exception {
        // Only the job id (f1) and the resource profile (f3) of the slot request are inspected, so
        // the remaining tuple components are left as wildcards.
        final CompletableFuture<Tuple6<?, JobID, ?, ResourceProfile, ?, ?>> requestFuture =
                new CompletableFuture<>();
        final TaskExecutorConnection taskManagerConnection =
                new TaskExecutorConnection(
                        ResourceID.generate(),
                        new TestingTaskExecutorGatewayBuilder()
                                .setRequestSlotFunction(tuple6 -> {
                                    requestFuture.complete(tuple6);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                                .createTestingTaskExecutorGateway());
        final JobID jobId = new JobID();
        final SlotReport slotReport = new SlotReport();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                this.resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((jobRequirements, resourceInfoProvider) -> {
                    return ResourceAllocationResult.builder()
                            .addAllocationOnRegisteredResource(
                                    jobId,
                                    taskManagerConnection.getInstanceID(),
                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                            .build();
                });

                runTest(() -> {
                    runInMainThread(() -> {
                        getSlotManager().registerTaskManager(
                                taskManagerConnection,
                                slotReport,
                                FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        getSlotManager().processResourceRequirements(
                                FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 1));
                    });

                    final Tuple6<?, JobID, ?, ResourceProfile, ?, ?> slotRequest =
                            FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(requestFuture);
                    Assertions.assertThat(slotRequest.f1).isEqualTo(jobId);
                    Assertions.assertThat(slotRequest.f3)
                            .isEqualTo(FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                });
            }
        };
    }

    /**
     * Verifies that new resources are declared exactly once when the allocation strategy places a
     * requirement on a pending task manager: the first declaration future completes, the second
     * does not, and the declaration counter ends at one.
     */
    @Test
    void testRequestNewResourcesAccordingToStrategyResult() throws Exception {
        final JobID jobId = new JobID();
        final AtomicInteger requestCount = new AtomicInteger(0);
        // One future per expected non-empty resource declaration; only the first should complete.
        final List<CompletableFuture<Void>> allocateResourceFutures = new ArrayList<>();
        allocateResourceFutures.add(new CompletableFuture<>());
        allocateResourceFutures.add(new CompletableFuture<>());
        new FineGrainedSlotManagerTestBase.Context() {
            {
                final PendingTaskManager pendingTaskManager =
                        new PendingTaskManager(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
                this.resourceAllocatorBuilder.setDeclareResourceNeededConsumer(resourceDeclarations -> {
                    Assertions.assertThat(requestCount).hasValueLessThan(2);
                    // Empty declarations are ignored; only non-empty ones count as a request.
                    if (!resourceDeclarations.isEmpty()) {
                        allocateResourceFutures.get(requestCount.getAndIncrement()).complete(null);
                    }
                });
                this.resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((jobRequirements, resourceInfoProvider) -> {
                    return ResourceAllocationResult.builder()
                            .addPendingTaskManagerAllocate(pendingTaskManager)
                            .addAllocationOnPendingResource(
                                    jobId,
                                    pendingTaskManager.getPendingTaskManagerId(),
                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                            .build();
                });

                runTest(() -> {
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(
                                FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 1));
                    });
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(allocateResourceFutures.get(0));
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(allocateResourceFutures.get(1));
                    Assertions.assertThat(requestCount).hasValue(1);
                });
            }
        };
    }

    /**
     * Verifies that an allocation planned on a pending task manager is carried out once a task
     * manager registers: after the resource declaration is observed, a task manager with the
     * default profile registers and must receive a slot request for the original job with the
     * default slot resource profile.
     */
    @Test
    void testSlotAllocationForPendingTaskManagerWillBeRespected() throws Exception {
        final JobID jobId = new JobID();
        final CompletableFuture<Void> requestResourceFuture = new CompletableFuture<>();
        final PendingTaskManager pendingTaskManager = new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
        // Only the job id (f1) and the resource profile (f3) of the slot request are inspected, so
        // the remaining tuple components are left as wildcards.
        final CompletableFuture<Tuple6<?, JobID, ?, ResourceProfile, ?, ?>> requestSlotFuture =
                new CompletableFuture<>();
        final TaskExecutorConnection taskManagerConnection =
                new TaskExecutorConnection(
                        ResourceID.generate(),
                        new TestingTaskExecutorGatewayBuilder()
                                .setRequestSlotFunction(tuple6 -> {
                                    requestSlotFuture.complete(tuple6);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                                .createTestingTaskExecutorGateway());
        new FineGrainedSlotManagerTestBase.Context() {
            {
                this.resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((jobRequirements, resourceInfoProvider) -> {
                    return ResourceAllocationResult.builder()
                            .addPendingTaskManagerAllocate(pendingTaskManager)
                            .addAllocationOnPendingResource(
                                    jobId,
                                    pendingTaskManager.getPendingTaskManagerId(),
                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                            .build();
                });
                this.resourceAllocatorBuilder.setDeclareResourceNeededConsumer(resourceDeclarations -> {
                    // Empty declarations are ignored; the first non-empty one signals the request.
                    if (!resourceDeclarations.isEmpty()) {
                        requestResourceFuture.complete(null);
                    }
                });

                runTest(() -> {
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(
                                FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 1));
                    });
                    // Wait until the new resource has been declared before registering.
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(requestResourceFuture);

                    runInMainThread(() -> {
                        getSlotManager().registerTaskManager(
                                taskManagerConnection,
                                new SlotReport(),
                                FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                    });
                    final Tuple6<?, JobID, ?, ResourceProfile, ?, ?> slotRequest =
                            FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(requestSlotFuture);
                    Assertions.assertThat(slotRequest.f1).isEqualTo(jobId);
                    Assertions.assertThat(slotRequest.f3)
                            .isEqualTo(FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                });
            }
        };
    }

    /** Runs the not-enough-resources notification scenario with the grace-period steps disabled. */
    @Test
    void testNotificationAboutNotEnoughResources() throws Exception {
        final boolean withGracePeriod = false;
        testNotificationAboutNotEnoughResources(withGracePeriod);
    }

    /** Runs the not-enough-resources notification scenario with the grace-period steps enabled. */
    @Test
    void testGracePeriodForNotificationAboutNotEnoughResources() throws Exception {
        final boolean withGracePeriod = true;
        testNotificationAboutNotEnoughResources(withGracePeriod);
    }

    /**
     * Shared scenario for the not-enough-resources notification tests. The allocation strategy
     * stub marks the job as unfulfillable, and the test checks that the resource event listener is
     * notified exactly once for that job.
     *
     * @param z whether to exercise the grace period: when {@code true}, failing unfulfillable
     *     requests is switched off before the requirements are processed, the test asserts that
     *     no notification has arrived yet, and only then switches it back on
     * @throws Exception if the test body or any assertion helper fails
     */
    private void testNotificationAboutNotEnoughResources(final boolean z) throws Exception {
        final JobID jobId = new JobID();
        // Only the notified job id (f0) is inspected; the second tuple component is left open.
        final List<Tuple2<JobID, ?>> notEnoughResourceNotifications = new ArrayList<>();
        final CompletableFuture<Void> notifyNotEnoughResourceFuture = new CompletableFuture<>();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                this.resourceEventListenerBuilder.setNotEnoughResourceAvailableConsumer((notifiedJobId, acquiredResources) -> {
                    notEnoughResourceNotifications.add(Tuple2.of(notifiedJobId, acquiredResources));
                    notifyNotEnoughResourceFuture.complete(null);
                });
                this.resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((jobRequirements, resourceInfoProvider) -> {
                    return ResourceAllocationResult.builder().addUnfulfillableJob(jobId).build();
                });

                runTest(() -> {
                    if (z) {
                        runInMainThread(() -> {
                            getSlotManager().setFailUnfulfillableRequest(false);
                        });
                    }

                    final ResourceRequirements requirements =
                            FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 1);
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(requirements);
                    });

                    if (z) {
                        // While failing is disabled no notification may be delivered.
                        FineGrainedSlotManagerTestBase.assertFutureNotComplete(notifyNotEnoughResourceFuture);
                        Assertions.assertThat(notEnoughResourceNotifications).isEmpty();
                        runInMainThread(() -> {
                            getSlotManager().setFailUnfulfillableRequest(true);
                        });
                    }

                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(notifyNotEnoughResourceFuture);
                    Assertions.assertThat(notEnoughResourceNotifications).hasSize(1);
                    Assertions.assertThat(notEnoughResourceNotifications.get(0).f0).isEqualTo(jobId);
                });
            }
        };
    }

    /**
     * Verifies that several triggers arriving within the requirement check delay result in a
     * single requirement check.
     *
     * <p>The allocation strategy stub completes the first future on its first invocation and the
     * second future on any later invocation. Two requirement updates plus a task manager
     * registration are submitted back to back; the first future must complete while the second
     * stays incomplete, even after waiting twice the delay. A further requirement update must
     * then trigger the second check.
     *
     * <p>The test is timing-sensitive: it is skipped via an assumption when the initial
     * submissions already took longer than the configured delay.
     */
    @Test
    void testRequirementCheckOnlyTriggeredOnce() throws Exception {
        new FineGrainedSlotManagerTestBase.Context() {
            {
                final List<CompletableFuture<Void>> checkRequirementFutures = new ArrayList<>();
                checkRequirementFutures.add(new CompletableFuture<>());
                checkRequirementFutures.add(new CompletableFuture<>());
                final Duration requirementCheckDelay = Duration.ofMillis(50L);
                this.resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((jobRequirements, resourceInfoProvider) -> {
                    // First invocation completes future 0, every later one completes future 1.
                    if (checkRequirementFutures.get(0).isDone()) {
                        checkRequirementFutures.get(1).complete(null);
                    } else {
                        checkRequirementFutures.get(0).complete(null);
                    }
                    return ResourceAllocationResult.builder().build();
                });
                this.slotManagerConfigurationBuilder.setRequirementCheckDelay(requirementCheckDelay);
                runTest(() -> {
                    final ResourceRequirements firstRequirements =
                            FineGrainedSlotManagerTestBase.createResourceRequirementsForSingleSlot();
                    final ResourceRequirements secondRequirements =
                            FineGrainedSlotManagerTestBase.createResourceRequirementsForSingleSlot();
                    final ResourceRequirements thirdRequirements =
                            FineGrainedSlotManagerTestBase.createResourceRequirementsForSingleSlot();
                    final TaskExecutorConnection taskManagerConnection =
                            FineGrainedSlotManagerTestBase.createTaskExecutorConnection();
                    final CompletableFuture<Void> submissionsDone = new CompletableFuture<>();
                    final long start = System.nanoTime();
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(firstRequirements);
                        getSlotManager().processResourceRequirements(secondRequirements);
                        getSlotManager().registerTaskManager(
                                taskManagerConnection,
                                new SlotReport(),
                                FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        submissionsDone.complete(null);
                    });
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(submissionsDone);
                    // Skip (rather than fail) if the submissions alone exceeded the check delay.
                    Assumptions.assumeThat((System.nanoTime() - start) / 1000000 < requirementCheckDelay.toMillis())
                            .as("The time of process requirement and register task manager must not take longer than the requirement check delay. If it does, then this indicates a very slow machine.")
                            .isTrue();
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(checkRequirementFutures.get(0));
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(checkRequirementFutures.get(1));
                    // Give a second (unwanted) check ample time to show up.
                    Thread.sleep(requirementCheckDelay.toMillis() * 2);
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(checkRequirementFutures.get(1));
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(thirdRequirements);
                    });
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(checkRequirementFutures.get(1));
                });
            }
        };
    }

    /**
     * Caps the maximum total CPU at 1.5 times the CPU cores of the default total resource profile
     * and runs both max-total-resource scenarios (allocating and registering resources) with that
     * configuration.
     */
    @Test
    void testMaxTotalResourceCpuExceeded() throws Exception {
        final Consumer<SlotManagerConfigurationBuilder> limitTotalCpu =
                configBuilder ->
                        configBuilder.setMaxTotalCpu(
                                (CPUResource) DEFAULT_TOTAL_RESOURCE_PROFILE.getCpuCores().multiply(BigDecimal.valueOf(1.5d)));
        testMaxTotalResourceExceededAllocateResource(limitTotalCpu);
        testMaxTotalResourceExceededRegisterResource(limitTotalCpu);
    }

    /**
     * Verifies the free and registered resource overview of the slot manager.
     *
     * <p>Two task managers are registered, each with a total profile of twice its slot profile and
     * a slot report containing one allocated slot of that slot profile. The free resource (overall
     * and per task manager) must then equal one slot profile each, and the registered resource
     * (overall and per task manager) must equal twice that.
     */
    @Test
    void testGetResourceOverview() throws Exception {
        final TaskExecutorConnection firstTaskManager = createTaskExecutorConnection();
        final TaskExecutorConnection secondTaskManager = createTaskExecutorConnection();
        final ResourceID firstResourceId = ResourceID.generate();
        final ResourceID secondResourceId = ResourceID.generate();
        final SlotID firstSlotId = new SlotID(firstResourceId, 0);
        final SlotID secondSlotId = new SlotID(secondResourceId, 0);
        final ResourceProfile firstSlotProfile = ResourceProfile.fromResources(1.0d, 10);
        final ResourceProfile secondSlotProfile = ResourceProfile.fromResources(2.0d, 20);
        final SlotStatus firstSlotStatus =
                new SlotStatus(firstSlotId, firstSlotProfile, new JobID(), new AllocationID());
        final SlotStatus secondSlotStatus =
                new SlotStatus(secondSlotId, secondSlotProfile, new JobID(), new AllocationID());
        final SlotReport firstSlotReport = new SlotReport(firstSlotStatus);
        final SlotReport secondSlotReport = new SlotReport(secondSlotStatus);
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    final CompletableFuture<SlotManager.RegistrationResult> firstRegistration = new CompletableFuture<>();
                    final CompletableFuture<SlotManager.RegistrationResult> secondRegistration = new CompletableFuture<>();
                    runInMainThread(() -> {
                        firstRegistration.complete(
                                getSlotManager().registerTaskManager(
                                        firstTaskManager, firstSlotReport, firstSlotProfile.multiply(2), firstSlotProfile));
                        secondRegistration.complete(
                                getSlotManager().registerTaskManager(
                                        secondTaskManager, secondSlotReport, secondSlotProfile.multiply(2), secondSlotProfile));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(firstRegistration))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(secondRegistration))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);

                    // Free resources: one slot profile per task manager.
                    Assertions.assertThat(getSlotManager().getFreeResource())
                            .isEqualTo(firstSlotProfile.merge(secondSlotProfile));
                    Assertions.assertThat(getSlotManager().getFreeResourceOf(firstTaskManager.getInstanceID()))
                            .isEqualTo(firstSlotProfile);
                    Assertions.assertThat(getSlotManager().getFreeResourceOf(secondTaskManager.getInstanceID()))
                            .isEqualTo(secondSlotProfile);

                    // Registered resources: the full total profile of each task manager.
                    Assertions.assertThat(getSlotManager().getRegisteredResource())
                            .isEqualTo(firstSlotProfile.merge(secondSlotProfile).multiply(2));
                    Assertions.assertThat(getSlotManager().getRegisteredResourceOf(firstTaskManager.getInstanceID()))
                            .isEqualTo(firstSlotProfile.multiply(2));
                    Assertions.assertThat(getSlotManager().getRegisteredResourceOf(secondTaskManager.getInstanceID()))
                            .isEqualTo(secondSlotProfile.multiply(2));
                });
            }
        };
    }

    /**
     * Runs the shared "max total resource exceeded" scenarios (allocation and registration) with
     * the maximum total memory set to 1.5 times the total memory of
     * {@code DEFAULT_TOTAL_RESOURCE_PROFILE}.
     */
    @Test
    void testMaxTotalResourceMemoryExceeded() throws Exception {
        final Consumer<SlotManagerConfigurationBuilder> memoryLimitSetter =
                configBuilder ->
                        configBuilder.setMaxTotalMem(
                                DEFAULT_TOTAL_RESOURCE_PROFILE.getTotalMemory().multiply(1.5d));
        testMaxTotalResourceExceededAllocateResource(memoryLimitSetter);
        testMaxTotalResourceExceededRegisterResource(memoryLimitSetter);
    }

    /**
     * Checks that, with a max-total-resource limit applied, only one non-empty resource
     * declaration is issued even though the allocation strategy asks for two pending task
     * managers.
     *
     * <p>The resource allocator's declare-resource callback completes one future per non-empty
     * declaration; the test expects the first future to complete and the second to stay
     * incomplete.
     *
     * @param consumer applies the resource limit under test to the slot manager configuration
     *     builder before the test context is started
     */
    private void testMaxTotalResourceExceededAllocateResource(final Consumer<SlotManagerConfigurationBuilder> consumer) throws Exception {
        final JobID requestingJob = new JobID();
        final AtomicInteger declarationCount = new AtomicInteger(0);
        final List<CompletableFuture<Void>> declarationFutures = new ArrayList<>();
        declarationFutures.add(new CompletableFuture<>());
        declarationFutures.add(new CompletableFuture<>());
        final PendingTaskManager firstPending = new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
        final PendingTaskManager secondPending = new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
        new FineGrainedSlotManagerTestBase.Context() {
            {
                consumer.accept(this.slotManagerConfigurationBuilder);
                this.resourceAllocatorBuilder.setDeclareResourceNeededConsumer(declarations -> {
                    // Empty declarations are ignored; only real requests are counted.
                    if (declarations.isEmpty()) {
                        return;
                    }
                    Assertions.assertThat(declarationCount).hasValueLessThan(2);
                    declarationFutures.get(declarationCount.getAndIncrement()).complete(null);
                });
                // The strategy always asks for both pending task managers, one slot on each.
                this.resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((requirements, resourceInfoProvider) ->
                        ResourceAllocationResult.builder()
                                .addPendingTaskManagerAllocate(firstPending)
                                .addPendingTaskManagerAllocate(secondPending)
                                .addAllocationOnPendingResource(requestingJob, firstPending.getPendingTaskManagerId(), FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                                .addAllocationOnPendingResource(requestingJob, secondPending.getPendingTaskManagerId(), FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                                .build());
                runTest(() -> {
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(FineGrainedSlotManagerTestBase.createResourceRequirements(requestingJob, 2));
                    });
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(declarationFutures.get(0));
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(declarationFutures.get(1));
                });
            }
        };
    }

    /**
     * Checks that, with a max-total-resource limit applied, the first task manager registration
     * succeeds and a second registration with the same total profile is rejected.
     *
     * <p>After each registration attempt the task manager tracker is expected to hold exactly one
     * registered task manager: the first one.
     *
     * @param consumer applies the resource limit under test to the slot manager configuration
     *     builder before the test context is started
     */
    private void testMaxTotalResourceExceededRegisterResource(final Consumer<SlotManagerConfigurationBuilder> consumer) throws Exception {
        final TaskExecutorConnection acceptedConnection = createTaskExecutorConnection();
        final TaskExecutorConnection rejectedConnection = createTaskExecutorConnection();
        final CompletableFuture<SlotManager.RegistrationResult> acceptedRegistration = new CompletableFuture<>();
        final CompletableFuture<SlotManager.RegistrationResult> rejectedRegistration = new CompletableFuture<>();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                consumer.accept(this.slotManagerConfigurationBuilder);
                runTest(() -> {
                    // First registration fits within the configured limit.
                    runInMainThread(() -> {
                        acceptedRegistration.complete(
                                getSlotManager().registerTaskManager(
                                        acceptedConnection,
                                        new SlotReport(),
                                        FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                        FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(acceptedRegistration))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManagers()).hasSize(1);
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManager(acceptedConnection.getInstanceID())).isPresent();

                    // Second registration is expected to be rejected and leave the tracker unchanged.
                    runInMainThread(() -> {
                        rejectedRegistration.complete(
                                getSlotManager().registerTaskManager(
                                        rejectedConnection,
                                        new SlotReport(),
                                        FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                        FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(rejectedRegistration))
                            .isEqualTo(SlotManager.RegistrationResult.REJECTED);
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManagers()).hasSize(1);
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManager(rejectedConnection.getInstanceID())).isNotPresent();
                });
            }
        };
    }

    /** Runs the metric-unregistration check with {@code suspend()} as the action on the slot manager. */
    @Test
    void testMetricsUnregisteredWhenSuspending() throws Exception {
        testAccessMetricValueDuringItsUnregister(SlotManager::suspend);
    }

    /** Runs the metric-unregistration check with {@code close()} as the action on the slot manager. */
    @Test
    void testMetricsUnregisteredWhenClosing() throws Exception {
        testAccessMetricValueDuringItsUnregister(SlotManager::close);
    }

    /**
     * Verifies that the given slot manager action runs without throwing and leaves no metrics
     * registered.
     *
     * <p>A counter is incremented on every metric registration and decremented on every
     * unregistration. It must be positive once the test context is running and back at zero after
     * the action has been executed in the main thread.
     *
     * @param throwingConsumer the slot manager action to execute
     */
    private void testAccessMetricValueDuringItsUnregister(ThrowingConsumer<SlotManager, Exception> throwingConsumer) throws Exception {
        final AtomicInteger liveMetricCount = new AtomicInteger();
        final TestingMetricRegistry countingRegistry =
                TestingMetricRegistry.builder()
                        .setRegisterConsumer((registeredMetric, metricName, metricGroup) -> liveMetricCount.incrementAndGet())
                        .setUnregisterConsumer((removedMetric, metricName, metricGroup) -> liveMetricCount.decrementAndGet())
                        .build();

        final FineGrainedSlotManagerTestBase.Context testContext = new FineGrainedSlotManagerTestBase.Context();
        testContext.setSlotManagerMetricGroup(SlotManagerMetricGroup.create(countingRegistry, "localhost"));
        testContext.runTest(() -> {
            // At least one metric must have been registered by the time the test body runs.
            Assertions.assertThat(liveMetricCount).hasValueGreaterThan(0);
            testContext.runInMainThreadAndWait(() -> {
                Assertions.assertThatNoException()
                        .isThrownBy(() -> throwingConsumer.accept(testContext.getSlotManager()));
            });
            // Every registered metric must have been unregistered again.
            Assertions.assertThat(liveMetricCount).hasValue(0);
        });
    }

    /**
     * Checks that the task executor's free-inactive-slots callback is invoked for a job only once
     * its resource requirements are cleared.
     *
     * <p>The callback completes a future with the job id it receives. The future must stay
     * incomplete after requirements are declared and after they are replaced with empty
     * requirements, and must eventually complete with the job id after
     * {@code clearResourceRequirements}.
     */
    @Test
    void testReclaimInactiveSlotsOnClearRequirements() throws Exception {
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    final CompletableFuture<JobID> freedInactiveSlotsJob = new CompletableFuture<>();
                    final JobID jobId = new JobID();
                    final TaskExecutorConnection connection =
                            FineGrainedSlotManagerTestBase.createTaskExecutorConnection(
                                    new TestingTaskExecutorGatewayBuilder()
                                            .setFreeInactiveSlotsConsumer(freedInactiveSlotsJob::complete)
                                            .createTestingTaskExecutorGateway());

                    // Register a task manager that already reports one slot allocated for the job.
                    final CompletableFuture<SlotManager.RegistrationResult> registration = new CompletableFuture<>();
                    runInMainThread(() -> {
                        registration.complete(
                                getSlotManager().registerTaskManager(
                                        connection,
                                        new SlotReport(FineGrainedSlotManagerTestBase.createAllocatedSlotStatus(jobId, new AllocationID(), FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)),
                                        FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                        FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registration))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);

                    // Declaring requirements must not trigger the callback.
                    runInMainThreadAndWait(() -> {
                        getSlotManager().processResourceRequirements(FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 2));
                    });
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(freedInactiveSlotsJob);

                    // Replacing them with empty requirements must not trigger it either.
                    runInMainThreadAndWait(() -> {
                        getSlotManager().processResourceRequirements(ResourceRequirements.empty(jobId, "foobar"));
                    });
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(freedInactiveSlotsJob);

                    // Only clearing the requirements is expected to trigger it.
                    runInMainThreadAndWait(() -> {
                        getSlotManager().clearResourceRequirements(jobId);
                    });
                    FlinkAssertions.assertThatFuture(freedInactiveSlotsJob).eventuallySucceeds().isEqualTo(jobId);
                });
            }
        };
    }

    /**
     * Exercises {@code clearResourceRequirements} for a job whose requirements led to allocations
     * on pending task managers.
     *
     * <p>The allocation strategy requests two pending task managers with one allocation each. Once
     * a resource declaration has been observed, the job's requirements are replaced with empty
     * ones and then cleared. Any exception thrown by the clear call is captured in a future that
     * is then handed to {@code assertFutureCompleteAndReturn}.
     */
    @Test
    void testClearResourceRequirementsWithPendingTaskManager() throws Exception {
        new FineGrainedSlotManagerTestBase.Context() {
            {
                final JobID jobId = new JobID();
                final CompletableFuture<Void> resourceDeclared = new CompletableFuture<>();
                this.resourceAllocatorBuilder.setDeclareResourceNeededConsumer(declarations -> resourceDeclared.complete(null));
                final PendingTaskManager firstPending = new PendingTaskManager(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
                final PendingTaskManager secondPending = new PendingTaskManager(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
                this.resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((requirements, resourceInfoProvider) ->
                        ResourceAllocationResult.builder()
                                .addPendingTaskManagerAllocate(firstPending)
                                .addPendingTaskManagerAllocate(secondPending)
                                .addAllocationOnPendingResource(jobId, firstPending.getPendingTaskManagerId(), FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                                .addAllocationOnPendingResource(jobId, secondPending.getPendingTaskManagerId(), FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                                .build());
                runTest(() -> {
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 2));
                    });
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(resourceDeclared);
                    runInMainThreadAndWait(() -> {
                        getSlotManager().processResourceRequirements(ResourceRequirements.empty(jobId, "foobar"));
                    });
                    final CompletableFuture<Void> clearOutcome = new CompletableFuture<>();
                    runInMainThread(() -> {
                        // Complete normally only when the clear call returned; otherwise surface
                        // the exception through the future.
                        try {
                            getSlotManager().clearResourceRequirements(jobId);
                            clearOutcome.complete(null);
                        } catch (Exception e) {
                            clearOutcome.completeExceptionally(e);
                        }
                    });
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(clearOutcome);
                });
            }
        };
    }
}
