/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeTaskManagerSlot;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.LeastUtilizationSlotMatchingStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotMatchingStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotState;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActionsBuilder;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class DeclarativeSlotManagerTest
extends TestLogger {
    private static final FlinkException TEST_EXCEPTION = new FlinkException("Test exception");
    private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = new WorkerResourceSpec.Builder().setCpuCores(100.0).setTaskHeapMemoryMB(10000).setTaskOffHeapMemoryMB(10000).setNetworkMemoryMB(10000).setManagedMemoryMB(10000).build();

    @Test
    public void testCloseAfterSuspendDoesNotThrowException() throws Exception {
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().buildAndStartWithDirectExec();){
            slotManager.suspend();
        }
    }

    @Test
    public void testTaskManagerRegistration() throws Exception {
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        ResourceID resourceId = ResourceID.generate();
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, (TaskExecutorGateway)taskExecutorGateway);
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotReport slotReport = new SlotReport(Arrays.asList(DeclarativeSlotManagerTest.createFreeSlotStatus(slotId1), DeclarativeSlotManagerTest.createFreeSlotStatus(slotId2)));
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            slotManager.registerTaskManager(taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            Assert.assertThat((String)"The number registered slots does not equal the expected number.", (Object)slotManager.getNumberRegisteredSlots(), (Matcher)Matchers.is((Object)2));
            Assert.assertNotNull((Object)slotTracker.getSlot(slotId1));
            Assert.assertNotNull((Object)slotTracker.getSlot(slotId2));
        }
    }

    @Test
    public void testTaskManagerUnregistration() throws Exception {
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> new CompletableFuture()).createTestingTaskExecutorGateway();
        TaskExecutorConnection taskManagerConnection = this.createTaskExecutorConnection(taskExecutorGateway);
        ResourceID resourceId = taskManagerConnection.getResourceID();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotReport slotReport = new SlotReport(Arrays.asList(DeclarativeSlotManagerTest.createAllocatedSlotStatus(slotId1), DeclarativeSlotManagerTest.createFreeSlotStatus(slotId2)));
        ResourceRequirements resourceRequirements = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot();
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            slotManager.registerTaskManager(taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            Assert.assertEquals((String)"The number registered slots does not equal the expected number.", (long)2L, (long)slotManager.getNumberRegisteredSlots());
            slotManager.processResourceRequirements(resourceRequirements);
            slotManager.unregisterTaskManager(taskManagerConnection.getInstanceID(), (Exception)TEST_EXCEPTION);
            Assert.assertEquals((long)0L, (long)slotManager.getNumberRegisteredSlots());
        }
    }

    @Test
    public void testRequirementDeclarationWithoutFreeSlotsTriggersWorkerAllocation() throws Exception {
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        ResourceRequirements resourceRequirements = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot();
        CompletableFuture allocateResourceFuture = new CompletableFuture();
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().setAllocateResourceConsumer(allocateResourceFuture::complete).build();
        try (DeclarativeSlotManager slotManager = this.createSlotManager(resourceManagerId, resourceManagerActions);){
            slotManager.processResourceRequirements(resourceRequirements);
            allocateResourceFuture.get();
        }
    }

    @Test
    public void testRequirementDeclarationWithResourceAllocationFailure() throws Exception {
        ResourceRequirements resourceRequirements = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot();
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().setAllocateResourceFunction(value -> false).build();
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).buildAndStartWithDirectExec(ResourceManagerId.generate(), resourceManagerActions);){
            slotManager.processResourceRequirements(resourceRequirements);
            JobID jobId = resourceRequirements.getJobId();
            Assert.assertThat((Object)DeclarativeSlotManagerTest.getTotalResourceCount((Collection)resourceTracker.getMissingResources().get(jobId)), (Matcher)Matchers.is((Object)1));
        }
    }

    @Test
    public void testRequirementDeclarationWithFreeSlot() throws Exception {
        this.testRequirementDeclaration(RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION);
    }

    @Test
    public void testRequirementDeclarationWithPendingSlot() throws Exception {
        this.testRequirementDeclaration(RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION);
    }

    private void testRequirementDeclaration(RequirementDeclarationScenario scenario) throws Exception {
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        ResourceID resourceID = ResourceID.generate();
        JobID jobId = new JobID();
        SlotID slotId = new SlotID(resourceID, 0);
        String targetAddress = "localhost";
        ResourceProfile resourceProfile = ResourceProfile.fromResources((double)42.0, (int)1337);
        CompletableFuture requestFuture = new CompletableFuture();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            requestFuture.complete(Tuple6.of((Object)tuple6.f0, (Object)tuple6.f1, (Object)tuple6.f2, (Object)tuple6.f3, (Object)tuple6.f4, (Object)tuple6.f5));
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, (TaskExecutorGateway)taskExecutorGateway);
        SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
        SlotReport slotReport = new SlotReport(slotStatus);
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec(resourceManagerId, new TestingResourceActionsBuilder().build());){
            if (scenario == RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION) {
                slotManager.registerTaskManager(taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            }
            ResourceRequirements requirements = ResourceRequirements.create((JobID)jobId, (String)"localhost", Collections.singleton(ResourceRequirement.create((ResourceProfile)resourceProfile, (int)1)));
            slotManager.processResourceRequirements(requirements);
            if (scenario == RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION) {
                slotManager.registerTaskManager(taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            }
            Assert.assertThat(requestFuture.get(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)Tuple6.of((Object)slotId, (Object)jobId, (Object)((Tuple6)requestFuture.get()).f2, (Object)resourceProfile, (Object)"localhost", (Object)resourceManagerId))));
            DeclarativeTaskManagerSlot slot = slotTracker.getSlot(slotId);
            Assert.assertEquals((String)"The slot has not been allocated to the expected allocation id.", (Object)jobId, (Object)slot.getJobId());
        }
    }

    @Test
    public void testFreeSlot() throws Exception {
        TaskExecutorConnection taskExecutorConnection = this.createTaskExecutorConnection();
        ResourceID resourceID = taskExecutorConnection.getResourceID();
        SlotID slotId = new SlotID(resourceID, 0);
        SlotReport slotReport = new SlotReport(DeclarativeSlotManagerTest.createAllocatedSlotStatus(slotId));
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            slotManager.registerTaskManager(taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            DeclarativeTaskManagerSlot slot = slotTracker.getSlot(slotId);
            Assert.assertSame((Object)SlotState.ALLOCATED, (Object)slot.getState());
            slotManager.freeSlot(slotId, new AllocationID());
            Assert.assertSame((Object)SlotState.FREE, (Object)slot.getState());
            Assert.assertEquals((long)1L, (long)slotManager.getNumberFreeSlots());
        }
    }

    @Test
    public void testDuplicateResourceRequirementDeclarationAfterSuccessfulAllocation() throws Exception {
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        AtomicInteger allocateResourceCalls = new AtomicInteger(0);
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().setAllocateResourceConsumer(ignored -> allocateResourceCalls.incrementAndGet()).build();
        ResourceRequirements requirements = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        ResourceID resourceID = ResourceID.generate();
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, (TaskExecutorGateway)taskExecutorGateway);
        SlotID slotId = new SlotID(resourceID, 0);
        SlotReport slotReport = new SlotReport(DeclarativeSlotManagerTest.createFreeSlotStatus(slotId));
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions);){
            slotManager.registerTaskManager(taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.processResourceRequirements(requirements);
            DeclarativeTaskManagerSlot slot = slotTracker.getSlot(slotId);
            Assert.assertThat((Object)slot.getState(), (Matcher)Matchers.is((Object)SlotState.ALLOCATED));
            slotManager.processResourceRequirements(requirements);
        }
        Assert.assertThat((Object)allocateResourceCalls.get(), (Matcher)Matchers.is((Object)0));
    }

    @Test
    public void testSlotCanBeAllocatedForDifferentJobAfterFree() throws Exception {
        this.testSlotCanBeAllocatedForDifferentJobAfterFree(SecondRequirementDeclarationTime.BEFORE_FREE);
        this.testSlotCanBeAllocatedForDifferentJobAfterFree(SecondRequirementDeclarationTime.AFTER_FREE);
    }

    private void testSlotCanBeAllocatedForDifferentJobAfterFree(SecondRequirementDeclarationTime secondRequirementDeclarationTime) throws Exception {
        AllocationID allocationId = new AllocationID();
        ResourceRequirements resourceRequirements1 = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot();
        ResourceRequirements resourceRequirements2 = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot();
        TaskExecutorConnection taskManagerConnection = this.createTaskExecutorConnection();
        ResourceID resourceID = taskManagerConnection.getResourceID();
        SlotID slotId = new SlotID(resourceID, 0);
        SlotReport slotReport = new SlotReport(DeclarativeSlotManagerTest.createFreeSlotStatus(slotId));
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            slotManager.registerTaskManager(taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.processResourceRequirements(resourceRequirements1);
            DeclarativeTaskManagerSlot slot = slotTracker.getSlot(slotId);
            Assert.assertEquals((String)"The slot has not been allocated to the expected job id.", (Object)resourceRequirements1.getJobId(), (Object)slot.getJobId());
            if (secondRequirementDeclarationTime == SecondRequirementDeclarationTime.BEFORE_FREE) {
                slotManager.processResourceRequirements(resourceRequirements2);
            }
            slotManager.processResourceRequirements(ResourceRequirements.create((JobID)resourceRequirements1.getJobId(), (String)resourceRequirements1.getTargetAddress(), Collections.emptyList()));
            slotManager.freeSlot(slotId, allocationId);
            if (secondRequirementDeclarationTime == SecondRequirementDeclarationTime.AFTER_FREE) {
                slotManager.processResourceRequirements(resourceRequirements2);
            }
            Assert.assertEquals((String)"The slot has not been allocated to the expected job id.", (Object)resourceRequirements2.getJobId(), (Object)slot.getJobId());
        }
    }

    @Test
    public void testReceivingUnknownSlotReport() throws Exception {
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
        InstanceID unknownInstanceID = new InstanceID();
        SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0);
        SlotReport unknownSlotReport = new SlotReport(DeclarativeSlotManagerTest.createFreeSlotStatus(unknownSlotId));
        try (DeclarativeSlotManager slotManager = this.createSlotManager(resourceManagerId, resourceManagerActions);){
            Assert.assertThat((Object)slotManager.getNumberRegisteredSlots(), (Matcher)Matchers.is((Object)0));
            Assert.assertFalse((boolean)slotManager.reportSlotStatus(unknownInstanceID, unknownSlotReport));
            Assert.assertThat((Object)slotManager.getNumberRegisteredSlots(), (Matcher)Matchers.is((Object)0));
        }
    }

    @Test
    public void testUpdateSlotReport() throws Exception {
        TaskExecutorConnection taskManagerConnection = this.createTaskExecutorConnection();
        ResourceID resourceId = taskManagerConnection.getResourceID();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotStatus slotStatus1 = DeclarativeSlotManagerTest.createFreeSlotStatus(slotId1);
        SlotStatus slotStatus2 = DeclarativeSlotManagerTest.createFreeSlotStatus(slotId2);
        SlotStatus newSlotStatus2 = DeclarativeSlotManagerTest.createAllocatedSlotStatus(slotId2);
        JobID jobId = newSlotStatus2.getJobID();
        SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            Assert.assertEquals((long)0L, (long)slotManager.getNumberRegisteredSlots());
            slotManager.registerTaskManager(taskManagerConnection, slotReport1, ResourceProfile.ANY, ResourceProfile.ANY);
            DeclarativeTaskManagerSlot slot1 = slotTracker.getSlot(slotId1);
            DeclarativeTaskManagerSlot slot2 = slotTracker.getSlot(slotId2);
            Assert.assertEquals((long)2L, (long)slotManager.getNumberRegisteredSlots());
            Assert.assertSame((Object)SlotState.FREE, (Object)slot1.getState());
            Assert.assertSame((Object)SlotState.FREE, (Object)slot2.getState());
            Assert.assertTrue((boolean)slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), slotReport2));
            Assert.assertEquals((long)2L, (long)slotManager.getNumberRegisteredSlots());
            Assert.assertNotNull((Object)slotTracker.getSlot(slotId1));
            Assert.assertNotNull((Object)slotTracker.getSlot(slotId2));
            Assert.assertSame((Object)SlotState.FREE, (Object)slot1.getState());
            Assert.assertEquals((Object)jobId, (Object)slotTracker.getSlot(slotId2).getJobId());
        }
    }

    @Test
    public void testSlotAllocationTimeout() throws Exception {
        CompletableFuture secondSlotRequestFuture = new CompletableFuture();
        ArrayBlockingQueue<Supplier<CompletableFuture>> responseQueue = new ArrayBlockingQueue<Supplier<CompletableFuture>>(2);
        responseQueue.add(() -> FutureUtils.completedExceptionally((Throwable)new TimeoutException("timeout")));
        responseQueue.add(() -> {
            secondSlotRequestFuture.complete(null);
            return new CompletableFuture();
        });
        TaskExecutorConnection taskManagerConnection = this.createTaskExecutorConnection(new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(ignored -> (CompletableFuture)((Supplier)responseQueue.remove()).get()).createTestingTaskExecutorGateway());
        SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReport(taskManagerConnection.getResourceID(), 2);
        ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().build();){
            slotManager.start(ResourceManagerId.generate(), (Executor)mainThreadExecutor, (ResourceActions)new TestingResourceActionsBuilder().build());
            ((CompletableFuture)CompletableFuture.runAsync(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY), mainThreadExecutor).thenRun(() -> slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot()))).get(5L, TimeUnit.SECONDS);
            secondSlotRequestFuture.get();
        }
    }

    @Test
    public void testTaskExecutorSlotAllocationTimeoutHandling() throws Exception {
        JobID jobId = new JobID();
        ResourceRequirements resourceRequirements = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot(jobId);
        CompletableFuture slotRequestFuture1 = new CompletableFuture();
        CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<Acknowledge>();
        Iterator<CompletableFuture> slotRequestFutureIterator = Arrays.asList(slotRequestFuture1, slotRequestFuture2).iterator();
        ArrayBlockingQueue slotIds = new ArrayBlockingQueue(2);
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(FunctionUtils.uncheckedFunction(requestSlotParameters -> {
            slotIds.put(requestSlotParameters.f0);
            return (CompletableFuture)slotRequestFutureIterator.next();
        })).createTestingTaskExecutorGateway();
        ResourceID resourceId = ResourceID.generate();
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, (TaskExecutorGateway)taskExecutorGateway);
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotReport slotReport = new SlotReport(Arrays.asList(DeclarativeSlotManagerTest.createFreeSlotStatus(slotId1), DeclarativeSlotManagerTest.createFreeSlotStatus(slotId2)));
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            slotManager.registerTaskManager(taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.processResourceRequirements(resourceRequirements);
            SlotID firstSlotId = (SlotID)slotIds.take();
            Assert.assertThat(slotIds, (Matcher)Matchers.is((Matcher)Matchers.empty()));
            DeclarativeTaskManagerSlot failedSlot = slotTracker.getSlot(firstSlotId);
            slotRequestFuture1.completeExceptionally((Throwable)new SlotAllocationException("Test exception."));
            Assert.assertThat((Object)DeclarativeSlotManagerTest.getTotalResourceCount(resourceTracker.getAcquiredResources(jobId)), (Matcher)Matchers.is((Object)1));
            slotRequestFuture2.complete(Acknowledge.get());
            SlotID secondSlotId = (SlotID)slotIds.take();
            Assert.assertThat(slotIds, (Matcher)Matchers.is((Matcher)Matchers.empty()));
            DeclarativeTaskManagerSlot slot = slotTracker.getSlot(secondSlotId);
            Assert.assertThat((Object)slot.getState(), (Matcher)Matchers.is((Object)SlotState.ALLOCATED));
            Assert.assertEquals((Object)jobId, (Object)slot.getJobId());
            if (!failedSlot.getSlotId().equals((Object)slot.getSlotId())) {
                Assert.assertThat((Object)failedSlot.getState(), (Matcher)Matchers.is((Object)SlotState.FREE));
            }
        }
    }

    @Test
    public void testSlotReportWithConflictingJobIdDuringSlotAllocation() throws Exception {
        ResourceRequirements resourceRequirements = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot();
        ArrayBlockingQueue requestedSlotIds = new ArrayBlockingQueue(2);
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(FunctionUtils.uncheckedFunction(requestSlotParameters -> {
            requestedSlotIds.put(requestSlotParameters.f0);
            return new CompletableFuture();
        })).createTestingTaskExecutorGateway();
        TaskExecutorConnection taskExecutorConnection = this.createTaskExecutorConnection(taskExecutorGateway);
        ResourceID resourceId = taskExecutorConnection.getResourceID();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotReport slotReport = new SlotReport(Arrays.asList(DeclarativeSlotManagerTest.createFreeSlotStatus(slotId1), DeclarativeSlotManagerTest.createFreeSlotStatus(slotId2)));
        ManuallyTriggeredScheduledExecutor mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setScheduledExecutor((ScheduledExecutor)mainThreadExecutor).build();){
            slotManager.start(ResourceManagerId.generate(), (Executor)mainThreadExecutor, (ResourceActions)new TestingResourceActionsBuilder().build());
            slotManager.registerTaskManager(taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.processResourceRequirements(resourceRequirements);
            SlotID firstRequestedSlotId = (SlotID)requestedSlotIds.take();
            SlotID freeSlotId = firstRequestedSlotId.equals((Object)slotId1) ? slotId2 : slotId1;
            SlotReport newSlotReport = new SlotReport(Arrays.asList(DeclarativeSlotManagerTest.createAllocatedSlotStatus(firstRequestedSlotId), DeclarativeSlotManagerTest.createFreeSlotStatus(freeSlotId)));
            slotManager.reportSlotStatus(taskExecutorConnection.getInstanceID(), newSlotReport);
            SlotID secondRequestedSlotId = (SlotID)requestedSlotIds.take();
            Assert.assertEquals((Object)freeSlotId, (Object)secondRequestedSlotId);
        }
    }

    @Test
    public void testReportAllocatedSlot() throws Exception {
        ResourceID taskManagerId = ResourceID.generate();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, (TaskExecutorGateway)taskExecutorGateway);
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            SlotID slotId = new SlotID(taskManagerId, 0);
            SlotReport initialSlotReport = new SlotReport(DeclarativeSlotManagerTest.createFreeSlotStatus(slotId));
            slotManager.registerTaskManager(taskExecutorConnection, initialSlotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            Assert.assertThat((Object)slotManager.getNumberRegisteredSlots(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)1)));
            SlotStatus slotStatus = DeclarativeSlotManagerTest.createAllocatedSlotStatus(slotId);
            SlotReport slotReport = new SlotReport(slotStatus);
            slotManager.reportSlotStatus(taskExecutorConnection.getInstanceID(), slotReport);
            JobID jobId = new JobID();
            ResourceRequirements requirements = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot(jobId);
            slotManager.processResourceRequirements(requirements);
            Assert.assertThat((Object)slotTracker.getSlot(slotId).getJobId(), (Matcher)Matchers.is((Object)slotStatus.getJobID()));
            Assert.assertThat((Object)DeclarativeSlotManagerTest.getTotalResourceCount((Collection)resourceTracker.getMissingResources().get(jobId)), (Matcher)Matchers.is((Object)1));
        }
    }

    @Test
    public void testSlotRequestFailure() throws Exception {
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            ResourceRequirements requirements = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot();
            slotManager.processResourceRequirements(requirements);
            ArrayBlockingQueue requestSlotQueue = new ArrayBlockingQueue(1);
            ArrayBlockingQueue responseQueue = new ArrayBlockingQueue(2);
            CompletableFuture firstManualSlotRequestResponse = new CompletableFuture();
            responseQueue.offer(firstManualSlotRequestResponse);
            CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<Acknowledge>();
            responseQueue.offer(secondManualSlotRequestResponse);
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple6 -> {
                requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple6);
                try {
                    return (CompletableFuture)responseQueue.take();
                }
                catch (InterruptedException ignored) {
                    return FutureUtils.completedExceptionally((Throwable)new FlinkException("Response queue was interrupted."));
                }
            }).createTestingTaskExecutorGateway();
            ResourceID taskExecutorResourceId = ResourceID.generate();
            TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, (TaskExecutorGateway)testingTaskExecutorGateway);
            SlotReport slotReport = new SlotReport(DeclarativeSlotManagerTest.createFreeSlotStatus(new SlotID(taskExecutorResourceId, 0)));
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            Tuple6 firstRequest = (Tuple6)requestSlotQueue.take();
            firstManualSlotRequestResponse.completeExceptionally((Throwable)new SlotAllocationException("Test exception"));
            Tuple6 secondRequest = (Tuple6)requestSlotQueue.take();
            Assert.assertThat((Object)secondRequest.f1, (Matcher)Matchers.equalTo((Object)firstRequest.f1));
            Assert.assertThat((Object)secondRequest.f0, (Matcher)Matchers.equalTo((Object)firstRequest.f0));
            secondManualSlotRequestResponse.complete(Acknowledge.get());
            DeclarativeTaskManagerSlot slot = slotTracker.getSlot((SlotID)secondRequest.f0);
            Assert.assertThat((Object)slot.getState(), (Matcher)Matchers.equalTo((Object)SlotState.ALLOCATED));
            Assert.assertThat((Object)slot.getJobId(), (Matcher)Matchers.equalTo((Object)secondRequest.f1));
        }
    }

    @Test
    public void testSlotRequestRemovedIfTMReportsAllocation() throws Exception {
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            JobID jobID = new JobID();
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot(jobID));
            ArrayBlockingQueue requestSlotQueue = new ArrayBlockingQueue(1);
            ArrayBlockingQueue responseQueue = new ArrayBlockingQueue(2);
            CompletableFuture firstManualSlotRequestResponse = new CompletableFuture();
            responseQueue.offer(firstManualSlotRequestResponse);
            CompletableFuture secondManualSlotRequestResponse = new CompletableFuture();
            responseQueue.offer(secondManualSlotRequestResponse);
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple6 -> {
                requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple6);
                try {
                    return (CompletableFuture)responseQueue.take();
                }
                catch (InterruptedException ignored) {
                    return FutureUtils.completedExceptionally((Throwable)new FlinkException("Response queue was interrupted."));
                }
            }).createTestingTaskExecutorGateway();
            ResourceID taskExecutorResourceId = ResourceID.generate();
            TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, (TaskExecutorGateway)testingTaskExecutorGateway);
            SlotReport slotReport = new SlotReport(DeclarativeSlotManagerTest.createFreeSlotStatus(new SlotID(taskExecutorResourceId, 0)));
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            Tuple6 firstRequest = (Tuple6)requestSlotQueue.take();
            firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
            Tuple6 secondRequest = (Tuple6)requestSlotQueue.take();
            secondManualSlotRequestResponse.completeExceptionally((Throwable)new SlotOccupiedException("Test exception", new AllocationID(), jobID));
            Assert.assertThat((Object)firstRequest.f1, (Matcher)Matchers.equalTo((Object)jobID));
            Assert.assertThat((Object)secondRequest.f1, (Matcher)Matchers.equalTo((Object)jobID));
            Assert.assertThat((Object)secondRequest.f0, (Matcher)Matchers.equalTo((Object)firstRequest.f0));
            DeclarativeTaskManagerSlot slot = slotTracker.getSlot((SlotID)secondRequest.f0);
            Assert.assertThat((Object)slot.getState(), (Matcher)Matchers.equalTo((Object)SlotState.ALLOCATED));
            Assert.assertThat((Object)slot.getJobId(), (Matcher)Matchers.equalTo((Object)firstRequest.f1));
            Assert.assertThat((Object)slotManager.getNumberRegisteredSlots(), (Matcher)Matchers.is((Object)1));
            Assert.assertThat((Object)DeclarativeSlotManagerTest.getTotalResourceCount(resourceTracker.getAcquiredResources(jobID)), (Matcher)Matchers.is((Object)1));
        }
    }

    @Test
    public void testTaskExecutorFailedHandling() throws Exception {
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).buildAndStartWithDirectExec();){
            JobID jobId = new JobID();
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 2));
            TaskExecutorConnection taskExecutionConnection1 = this.createTaskExecutorConnection();
            SlotReport slotReport1 = DeclarativeSlotManagerTest.createSlotReport(taskExecutionConnection1.getResourceID(), 2);
            slotManager.registerTaskManager(taskExecutionConnection1, slotReport1, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID(), (Exception)TEST_EXCEPTION);
            Assert.assertThat((Object)DeclarativeSlotManagerTest.getTotalResourceCount((Collection)resourceTracker.getMissingResources().get(jobId)), (Matcher)Matchers.is((Object)2));
        }
    }

    @Test
    public void testRequestNewResources() throws Exception {
        int numberSlots = 2;
        AtomicInteger resourceRequests = new AtomicInteger(0);
        TestingResourceActions testingResourceActions = new TestingResourceActionsBuilder().setAllocateResourceFunction(ignored -> {
            resourceRequests.incrementAndGet();
            return true;
        }).build();
        try (DeclarativeSlotManager slotManager = this.createSlotManager(ResourceManagerId.generate(), testingResourceActions, 2);){
            JobID jobId = new JobID();
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 1));
            Assert.assertThat((Object)resourceRequests.get(), (Matcher)Matchers.is((Object)1));
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 2));
            Assert.assertThat((Object)resourceRequests.get(), (Matcher)Matchers.is((Object)1));
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 3));
            Assert.assertThat((Object)resourceRequests.get(), (Matcher)Matchers.is((Object)2));
        }
    }

    private TaskExecutorConnection createTaskExecutorConnection() {
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        return this.createTaskExecutorConnection(taskExecutorGateway);
    }

    private TaskExecutorConnection createTaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
        return new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
    }

    @Test
    public void testSpreadOutSlotAllocationStrategy() throws Exception {
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotMatchingStrategy((SlotMatchingStrategy)LeastUtilizationSlotMatchingStrategy.INSTANCE).buildAndStartWithDirectExec();){
            ArrayList<CompletableFuture<JobID>> requestSlotFutures = new ArrayList<CompletableFuture<JobID>>();
            int numberTaskExecutors = 5;
            for (int i = 0; i < 5; ++i) {
                CompletableFuture<JobID> requestSlotFuture = new CompletableFuture<JobID>();
                requestSlotFutures.add(requestSlotFuture);
                this.registerTaskExecutorWithTwoSlots(slotManager, requestSlotFuture);
            }
            JobID jobId = new JobID();
            ResourceRequirements resourceRequirements = DeclarativeSlotManagerTest.createResourceRequirements(jobId, 5);
            slotManager.processResourceRequirements(resourceRequirements);
            HashSet jobIds = new HashSet((Collection)FutureUtils.combineAll(requestSlotFutures).get(10L, TimeUnit.SECONDS));
            Assert.assertThat(jobIds, (Matcher)Matchers.hasSize((int)1));
            Assert.assertThat(jobIds, (Matcher)Matchers.containsInAnyOrder((Object[])new JobID[]{jobId}));
        }
    }

    private void registerTaskExecutorWithTwoSlots(DeclarativeSlotManager slotManager, CompletableFuture<JobID> firstRequestSlotFuture) {
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple6 -> {
            firstRequestSlotFuture.complete((JobID)slotIDJobIDAllocationIDStringResourceManagerIdTuple6.f1);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();
        TaskExecutorConnection firstTaskExecutorConnection = this.createTaskExecutorConnection(taskExecutorGateway);
        SlotReport firstSlotReport = DeclarativeSlotManagerTest.createSlotReport(firstTaskExecutorConnection.getResourceID(), 2);
        slotManager.registerTaskManager(firstTaskExecutorConnection, firstSlotReport, ResourceProfile.ANY, ResourceProfile.ANY);
    }

    @Test
    public void testNotificationAboutNotEnoughResources() throws Exception {
        DeclarativeSlotManagerTest.testNotificationAboutNotEnoughResources(false);
    }

    @Test
    public void testGracePeriodForNotificationAboutNotEnoughResources() throws Exception {
        DeclarativeSlotManagerTest.testNotificationAboutNotEnoughResources(true);
    }

    private static void testNotificationAboutNotEnoughResources(boolean withNotificationGracePeriod) throws Exception {
        JobID jobId = new JobID();
        int numRequiredSlots = 3;
        boolean numExistingSlots = true;
        ArrayList notEnoughResourceNotifications = new ArrayList();
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().setAllocateResourceFunction(ignored -> false).setNotEnoughResourcesConsumer((jobId1, acquiredResources) -> notEnoughResourceNotifications.add(Tuple2.of((Object)jobId1, (Object)acquiredResources))).build();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().buildAndStart(ResourceManagerId.generate(), (Executor)new ManuallyTriggeredScheduledExecutor(), resourceManagerActions);){
            if (withNotificationGracePeriod) {
                slotManager.setFailUnfulfillableRequest(false);
            }
            ResourceID taskExecutorResourceId = ResourceID.generate();
            TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, (TaskExecutorGateway)new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReport(taskExecutorResourceId, 1);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            ResourceRequirements resourceRequirements = DeclarativeSlotManagerTest.createResourceRequirements(jobId, 3);
            slotManager.processResourceRequirements(resourceRequirements);
            if (withNotificationGracePeriod) {
                Assert.assertThat(notEnoughResourceNotifications, (Matcher)Matchers.empty());
                slotManager.setFailUnfulfillableRequest(true);
            }
            Assert.assertThat(notEnoughResourceNotifications, (Matcher)Matchers.hasSize((int)1));
            Tuple2 notification = (Tuple2)notEnoughResourceNotifications.get(0);
            Assert.assertThat((Object)notification.f0, (Matcher)Matchers.is((Object)jobId));
            Assert.assertThat((Object)notification.f1, (Matcher)CoreMatchers.hasItem((Object)ResourceRequirement.create((ResourceProfile)ResourceProfile.ANY, (int)1)));
            slotManager.reportSlotStatus(taskExecutionConnection.getInstanceID(), slotReport);
            Assert.assertThat(notEnoughResourceNotifications, (Matcher)Matchers.hasSize((int)1));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAllocationUpdatesIgnoredIfTaskExecutorUnregistered() throws Exception {
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(ignored -> CompletableFuture.completedFuture(Acknowledge.get())).createTestingTaskExecutorGateway();
        SystemExitTrackingSecurityManager trackingSecurityManager = new SystemExitTrackingSecurityManager();
        System.setSecurityManager(trackingSecurityManager);
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).buildAndStart(ResourceManagerId.generate(), (Executor)executor, new TestingResourceActionsBuilder().build());){
            JobID jobId = new JobID();
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 1));
            TaskExecutorConnection taskExecutionConnection = this.createTaskExecutorConnection(taskExecutorGateway);
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReport(taskExecutionConnection.getResourceID(), 1);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.unregisterTaskManager(taskExecutionConnection.getInstanceID(), (Exception)TEST_EXCEPTION);
            executor.triggerAll();
            Assert.assertThat((Object)trackingSecurityManager.getSystemExitFuture().isDone(), (Matcher)Matchers.is((Object)false));
        }
        finally {
            System.setSecurityManager(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport() throws Exception {
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(ignored -> CompletableFuture.completedFuture(Acknowledge.get())).createTestingTaskExecutorGateway();
        SystemExitTrackingSecurityManager trackingSecurityManager = new SystemExitTrackingSecurityManager();
        System.setSecurityManager(trackingSecurityManager);
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).buildAndStart(ResourceManagerId.generate(), (Executor)executor, new TestingResourceActionsBuilder().build());){
            JobID jobId = new JobID();
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 1));
            TaskExecutorConnection taskExecutionConnection = this.createTaskExecutorConnection(taskExecutorGateway);
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReport(taskExecutionConnection.getResourceID(), 1);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.reportSlotStatus(taskExecutionConnection.getInstanceID(), DeclarativeSlotManagerTest.createSlotReportWithAllocatedSlots(taskExecutionConnection.getResourceID(), jobId, 1));
            executor.triggerAll();
            Assert.assertThat((Object)trackingSecurityManager.getSystemExitFuture().isDone(), (Matcher)Matchers.is((Object)false));
        }
        finally {
            System.setSecurityManager(null);
        }
    }

    @Test
    public void testAllocationUpdatesIgnoredIfSlotMarkedAsPendingForOtherJob() throws Exception {
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        CompletableFuture firstSlotAllocationIdFuture = new CompletableFuture();
        CompletableFuture<Acknowledge> firstSlotRequestAcknowledgeFuture = new CompletableFuture<Acknowledge>();
        Iterator<CompletableFuture> slotRequestAcknowledgeFutures = Arrays.asList(firstSlotRequestAcknowledgeFuture, new CompletableFuture()).iterator();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(requestSlotParameters -> {
            firstSlotAllocationIdFuture.complete(requestSlotParameters.f2);
            return (CompletableFuture)slotRequestAcknowledgeFutures.next();
        }).createTestingTaskExecutorGateway();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStart(ResourceManagerId.generate(), (Executor)ComponentMainThreadExecutorServiceAdapter.forMainThread(), new TestingResourceActionsBuilder().build());){
            TaskExecutorConnection taskExecutionConnection = this.createTaskExecutorConnection(taskExecutorGateway);
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReport(taskExecutionConnection.getResourceID(), 1);
            SlotID slotId = ((SlotStatus)Iterators.getOnlyElement((Iterator)slotReport.iterator())).getSlotID();
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.reportSlotStatus(taskExecutionConnection.getInstanceID(), DeclarativeSlotManagerTest.createSlotReport(taskExecutionConnection.getResourceID(), 1));
            JobID firstJobId = new JobID();
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(firstJobId, 1));
            slotManager.processResourceRequirements(ResourceRequirements.empty((JobID)firstJobId, (String)"foobar"));
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(new JobID(), 1));
            slotManager.freeSlot(slotId, (AllocationID)firstSlotAllocationIdFuture.get());
            firstSlotRequestAcknowledgeFuture.complete(Acknowledge.get());
            Assert.assertThat((Object)slotTracker.getSlot(slotId).getJobId(), (Matcher)Matchers.is((Matcher)CoreMatchers.not((Object)firstJobId)));
        }
    }

    @Test
    public void testReclaimInactiveSlotsOnClearRequirements() throws Exception {
        CompletableFuture freeInactiveSlotsJobIdFuture = new CompletableFuture();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeInactiveSlotsConsumer(freeInactiveSlotsJobIdFuture::complete).createTestingTaskExecutorGateway();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().buildAndStart(ResourceManagerId.generate(), (Executor)ComponentMainThreadExecutorServiceAdapter.forMainThread(), new TestingResourceActionsBuilder().build());){
            JobID jobId = new JobID();
            TaskExecutorConnection taskExecutionConnection = this.createTaskExecutorConnection(taskExecutorGateway);
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReportWithAllocatedSlots(taskExecutionConnection.getResourceID(), jobId, 1);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 2));
            Assert.assertThat((Object)freeInactiveSlotsJobIdFuture.isDone(), (Matcher)Matchers.is((Object)false));
            slotManager.processResourceRequirements(ResourceRequirements.empty((JobID)jobId, (String)"foobar"));
            Assert.assertThat((Object)freeInactiveSlotsJobIdFuture.isDone(), (Matcher)Matchers.is((Object)false));
            slotManager.clearResourceRequirements(jobId);
            Assert.assertThat(freeInactiveSlotsJobIdFuture.get(), (Matcher)Matchers.is((Object)jobId));
        }
    }

    @Test
    public void testClearRequirementsClearsResourceTracker() throws Exception {
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        CompletableFuture freeInactiveSlotsJobIdFuture = new CompletableFuture();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeInactiveSlotsConsumer(freeInactiveSlotsJobIdFuture::complete).createTestingTaskExecutorGateway();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).buildAndStart(ResourceManagerId.generate(), (Executor)ComponentMainThreadExecutorServiceAdapter.forMainThread(), new TestingResourceActionsBuilder().build());){
            JobID jobId = new JobID();
            TaskExecutorConnection taskExecutionConnection = this.createTaskExecutorConnection(taskExecutorGateway);
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReportWithAllocatedSlots(taskExecutionConnection.getResourceID(), jobId, 1);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 2));
            slotManager.clearResourceRequirements(jobId);
            Assert.assertThat(resourceTracker.getMissingResources().keySet(), (Matcher)Matchers.empty());
        }
    }

    @Test
    public void testMetricsUnregisteredWhenSuspending() throws Exception {
        this.testAccessMetricValueDuringItsUnregister((ThrowingConsumer<SlotManager, Exception>)((ThrowingConsumer)SlotManager::suspend));
    }

    @Test
    public void testMetricsUnregisteredWhenClosing() throws Exception {
        this.testAccessMetricValueDuringItsUnregister((ThrowingConsumer<SlotManager, Exception>)((ThrowingConsumer)AutoCloseable::close));
    }

    private void testAccessMetricValueDuringItsUnregister(ThrowingConsumer<SlotManager, Exception> closeFn) throws Exception {
        AtomicInteger registeredMetrics = new AtomicInteger();
        TestingMetricRegistry metricRegistry = TestingMetricRegistry.builder().setRegisterConsumer((a, b, c) -> registeredMetrics.incrementAndGet()).setUnregisterConsumer((a, b, c) -> registeredMetrics.decrementAndGet()).build();
        DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotManagerMetricGroup(SlotManagerMetricGroup.create((MetricRegistry)metricRegistry, (String)"localhost")).buildAndStartWithDirectExec();
        Assert.assertThat((Object)registeredMetrics.get(), (Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0)));
        closeFn.accept((Object)slotManager);
        Assert.assertThat((Object)registeredMetrics.get(), (Matcher)Matchers.is((Object)0));
    }

    private static SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
        HashSet<SlotStatus> slotStatusSet = new HashSet<SlotStatus>(numberSlots);
        for (int i = 0; i < numberSlots; ++i) {
            slotStatusSet.add(DeclarativeSlotManagerTest.createFreeSlotStatus(new SlotID(taskExecutorResourceId, i)));
        }
        return new SlotReport(slotStatusSet);
    }

    private static SlotReport createSlotReportWithAllocatedSlots(ResourceID taskExecutorResourceId, JobID jobId, int numberSlots) {
        HashSet<SlotStatus> slotStatusSet = new HashSet<SlotStatus>(numberSlots);
        for (int i = 0; i < numberSlots; ++i) {
            slotStatusSet.add(DeclarativeSlotManagerTest.createAllocatedSlotStatus(new SlotID(taskExecutorResourceId, i), jobId));
        }
        return new SlotReport(slotStatusSet);
    }

    private static SlotStatus createFreeSlotStatus(SlotID slotId) {
        return new SlotStatus(slotId, ResourceProfile.ANY);
    }

    private static SlotStatus createAllocatedSlotStatus(SlotID slotId) {
        return DeclarativeSlotManagerTest.createAllocatedSlotStatus(slotId, JobID.generate());
    }

    private static SlotStatus createAllocatedSlotStatus(SlotID slotId, JobID jobId) {
        return new SlotStatus(slotId, ResourceProfile.ANY, jobId, new AllocationID());
    }

    private DeclarativeSlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
        return this.createSlotManager(resourceManagerId, resourceManagerActions, 1);
    }

    private DeclarativeSlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions, int numSlotsPerWorker) {
        return DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setNumSlotsPerWorker(numSlotsPerWorker).setRedundantTaskManagerNum(0).buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions);
    }

    private static DeclarativeSlotManagerBuilder createDeclarativeSlotManagerBuilder() {
        return DeclarativeSlotManagerBuilder.newBuilder().setDefaultWorkerResourceSpec(WORKER_RESOURCE_SPEC);
    }

    private static ResourceRequirements createResourceRequirementsForSingleSlot() {
        return DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot(new JobID());
    }

    private static ResourceRequirements createResourceRequirementsForSingleSlot(JobID jobId) {
        return DeclarativeSlotManagerTest.createResourceRequirements(jobId, 1);
    }

    private static ResourceRequirements createResourceRequirements(JobID jobId, int numRequiredSlots) {
        return ResourceRequirements.create((JobID)jobId, (String)"foobar", Collections.singleton(ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)numRequiredSlots)));
    }

    private static int getTotalResourceCount(Collection<ResourceRequirement> resources) {
        if (resources == null) {
            return 0;
        }
        return resources.stream().map(ResourceRequirement::getNumberOfRequiredSlots).reduce(0, Integer::sum);
    }

    private static enum SecondRequirementDeclarationTime {
        BEFORE_FREE,
        AFTER_FREE;

    }

    private static enum RequirementDeclarationScenario {
        TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION,
        TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION;

    }
}

