package org.apache.flink.runtime.jobmaster.slotpool;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.resources.GPUResource;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.class */
public class SlotSharingManagerTest extends TestLogger {
    private static final SlotSharingGroupId SLOT_SHARING_GROUP_ID = new SlotSharingGroupId();
    private static final DummySlotOwner SLOT_OWNER = new DummySlotOwner();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest$SlotSharingResourceTestContext.class */
    public class SlotSharingResourceTestContext {
        final SlotSharingManager slotSharingManager;
        final SlotSharingManager.MultiTaskSlot coLocationTaskSlot;
        final List<SlotSharingManager.SingleTaskSlot> singleTaskSlotsInOrder;

        SlotSharingResourceTestContext(@Nonnull SlotSharingManager slotSharingManager, @Nonnull SlotSharingManager.MultiTaskSlot multiTaskSlot, @Nonnull List<SlotSharingManager.SingleTaskSlot> list) {
            this.slotSharingManager = slotSharingManager;
            this.coLocationTaskSlot = multiTaskSlot;
            this.singleTaskSlotsInOrder = list;
        }
    }

    @Test
    public void testRootSlotCreation() {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        SlotRequestId slotRequestId = new SlotRequestId();
        Assert.assertEquals(slotRequestId, createTestingSlotSharingManager.createRootSlot(slotRequestId, new CompletableFuture(), new SlotRequestId()).getSlotRequestId());
        Assert.assertNotNull(createTestingSlotSharingManager.getTaskSlot(slotRequestId));
    }

    @Test
    public void testRootSlotRelease() throws ExecutionException, InterruptedException {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingAllocatedSlotActions testingAllocatedSlotActions = new TestingAllocatedSlotActions();
        testingAllocatedSlotActions.setReleaseSlotConsumer(tuple2 -> {
            completableFuture.complete(tuple2.f0);
        });
        SlotSharingManager slotSharingManager = new SlotSharingManager(SLOT_SHARING_GROUP_ID, testingAllocatedSlotActions, SLOT_OWNER);
        SlotRequestId slotRequestId = new SlotRequestId();
        SlotRequestId slotRequestId2 = new SlotRequestId();
        SlotSharingManager.MultiTaskSlot createRootSlot = slotSharingManager.createRootSlot(slotRequestId, new CompletableFuture(), slotRequestId2);
        Assert.assertTrue(slotSharingManager.contains(slotRequestId));
        createRootSlot.release(new FlinkException("Test exception"));
        Assert.assertEquals(slotRequestId2, completableFuture.get());
        Assert.assertFalse(slotSharingManager.contains(slotRequestId));
    }

    @Test
    public void testNestedSlotCreation() {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        SlotSharingManager.MultiTaskSlot createRootSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), new CompletableFuture(), new SlotRequestId());
        AbstractID abstractID = new AbstractID();
        SlotRequestId slotRequestId = new SlotRequestId();
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot = createRootSlot.allocateSingleTaskSlot(slotRequestId, ResourceProfile.UNKNOWN, abstractID, Locality.LOCAL);
        AbstractID abstractID2 = new AbstractID();
        SlotRequestId slotRequestId2 = new SlotRequestId();
        SlotSharingManager.MultiTaskSlot allocateMultiTaskSlot = createRootSlot.allocateMultiTaskSlot(slotRequestId2, abstractID2);
        Assert.assertTrue(Objects.equals(slotRequestId, allocateSingleTaskSlot.getSlotRequestId()));
        Assert.assertTrue(Objects.equals(slotRequestId2, allocateMultiTaskSlot.getSlotRequestId()));
        Assert.assertTrue(createRootSlot.contains(abstractID));
        Assert.assertTrue(createRootSlot.contains(abstractID2));
        Assert.assertTrue(createTestingSlotSharingManager.contains(slotRequestId));
        Assert.assertTrue(createTestingSlotSharingManager.contains(slotRequestId2));
    }

    @Test
    public void testNestedSlotRelease() throws Exception {
        TestingAllocatedSlotActions testingAllocatedSlotActions = new TestingAllocatedSlotActions();
        CompletableFuture completableFuture = new CompletableFuture();
        testingAllocatedSlotActions.setReleaseSlotConsumer(tuple2 -> {
            completableFuture.complete(tuple2.f0);
        });
        SlotSharingManager slotSharingManager = new SlotSharingManager(SLOT_SHARING_GROUP_ID, testingAllocatedSlotActions, SLOT_OWNER);
        SlotRequestId slotRequestId = new SlotRequestId();
        SlotRequestId slotRequestId2 = new SlotRequestId();
        SlotSharingManager.MultiTaskSlot createRootSlot = slotSharingManager.createRootSlot(slotRequestId, new CompletableFuture(), slotRequestId2);
        SlotRequestId slotRequestId3 = new SlotRequestId();
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot = createRootSlot.allocateSingleTaskSlot(slotRequestId3, ResourceProfile.UNKNOWN, new AbstractID(), Locality.LOCAL);
        SlotRequestId slotRequestId4 = new SlotRequestId();
        SlotSharingManager.MultiTaskSlot allocateMultiTaskSlot = createRootSlot.allocateMultiTaskSlot(slotRequestId4, new AbstractID());
        CompletableFuture logicalSlotFuture = allocateSingleTaskSlot.getLogicalSlotFuture();
        Assert.assertTrue(slotSharingManager.contains(slotRequestId));
        Assert.assertTrue(slotSharingManager.contains(slotRequestId3));
        Assert.assertFalse(logicalSlotFuture.isDone());
        FlinkException flinkException = new FlinkException("Test exception");
        allocateSingleTaskSlot.release(flinkException);
        Assert.assertTrue(logicalSlotFuture.isCompletedExceptionally());
        Assert.assertFalse(slotSharingManager.contains(slotRequestId3));
        Assert.assertTrue(slotSharingManager.contains(slotRequestId));
        allocateMultiTaskSlot.release(flinkException);
        Assert.assertEquals(slotRequestId2, completableFuture.get());
        Assert.assertFalse(slotSharingManager.contains(slotRequestId));
        Assert.assertFalse(slotSharingManager.contains(slotRequestId4));
        Assert.assertTrue(slotSharingManager.isEmpty());
    }

    @Test
    public void testInnerSlotRelease() {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        SlotSharingManager.MultiTaskSlot allocateMultiTaskSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), new CompletableFuture(), new SlotRequestId()).allocateMultiTaskSlot(new SlotRequestId(), new AbstractID());
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot = allocateMultiTaskSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, new AbstractID(), Locality.LOCAL);
        SlotSharingManager.MultiTaskSlot allocateMultiTaskSlot2 = allocateMultiTaskSlot.allocateMultiTaskSlot(new SlotRequestId(), new AbstractID());
        Assert.assertTrue(createTestingSlotSharingManager.contains(allocateMultiTaskSlot2.getSlotRequestId()));
        Assert.assertTrue(createTestingSlotSharingManager.contains(allocateSingleTaskSlot.getSlotRequestId()));
        Assert.assertTrue(createTestingSlotSharingManager.contains(allocateMultiTaskSlot.getSlotRequestId()));
        allocateMultiTaskSlot.release(new FlinkException("Test exception"));
        Assert.assertFalse(createTestingSlotSharingManager.contains(allocateMultiTaskSlot2.getSlotRequestId()));
        Assert.assertFalse(createTestingSlotSharingManager.contains(allocateSingleTaskSlot.getSlotRequestId()));
        Assert.assertFalse(createTestingSlotSharingManager.contains(allocateMultiTaskSlot.getSlotRequestId()));
        Assert.assertTrue(allocateSingleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally());
    }

    @Test
    public void testSlotContextFutureCompletion() throws Exception {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        SimpleSlotContext createSimpleSlotContext = createSimpleSlotContext();
        CompletableFuture completableFuture = new CompletableFuture();
        SlotSharingManager.MultiTaskSlot createRootSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), completableFuture, new SlotRequestId());
        Locality locality = Locality.LOCAL;
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot = createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, new AbstractID(), locality);
        Locality locality2 = Locality.HOST_LOCAL;
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot2 = createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, new AbstractID(), locality2);
        CompletableFuture logicalSlotFuture = allocateSingleTaskSlot.getLogicalSlotFuture();
        CompletableFuture logicalSlotFuture2 = allocateSingleTaskSlot2.getLogicalSlotFuture();
        Assert.assertFalse(logicalSlotFuture.isDone());
        Assert.assertFalse(logicalSlotFuture2.isDone());
        completableFuture.complete(createSimpleSlotContext);
        Assert.assertTrue(logicalSlotFuture.isDone());
        Assert.assertTrue(logicalSlotFuture2.isDone());
        LogicalSlot logicalSlot = (LogicalSlot) logicalSlotFuture.get();
        LogicalSlot logicalSlot2 = (LogicalSlot) logicalSlotFuture2.get();
        Assert.assertEquals(logicalSlot.getAllocationId(), createSimpleSlotContext.getAllocationId());
        Assert.assertEquals(logicalSlot2.getAllocationId(), createSimpleSlotContext.getAllocationId());
        Assert.assertEquals(locality, logicalSlot.getLocality());
        Assert.assertEquals(locality2, logicalSlot2.getLocality());
        Locality locality3 = Locality.NON_LOCAL;
        CompletableFuture logicalSlotFuture3 = createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, new AbstractID(), locality3).getLogicalSlotFuture();
        Assert.assertTrue(logicalSlotFuture3.isDone());
        LogicalSlot logicalSlot3 = (LogicalSlot) logicalSlotFuture3.get();
        Assert.assertEquals(locality3, logicalSlot3.getLocality());
        Assert.assertEquals(createSimpleSlotContext.getAllocationId(), logicalSlot3.getAllocationId());
    }

    private SimpleSlotContext createSimpleSlotContext() {
        return new SimpleSlotContext(new AllocationID(), new LocalTaskManagerLocation(), 0, new SimpleAckingTaskManagerGateway());
    }

    @Test
    public void testSlotContextFutureFailure() {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        CompletableFuture completableFuture = new CompletableFuture();
        Assert.assertTrue(createTestingSlotSharingManager.isEmpty());
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), completableFuture, new SlotRequestId()).allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, new AbstractID(), Locality.LOCAL);
        completableFuture.completeExceptionally(new FlinkException("Test exception"));
        Assert.assertTrue(allocateSingleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally());
        Assert.assertTrue(createTestingSlotSharingManager.isEmpty());
        Assert.assertTrue(createTestingSlotSharingManager.getResolvedRootSlots().isEmpty());
        Assert.assertTrue(createTestingSlotSharingManager.getUnresolvedRootSlots().isEmpty());
    }

    @Test
    public void testRootSlotTransition() {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        CompletableFuture completableFuture = new CompletableFuture();
        SlotSharingManager.MultiTaskSlot createRootSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), completableFuture, new SlotRequestId());
        Assert.assertTrue(createTestingSlotSharingManager.getUnresolvedRootSlots().contains(createRootSlot));
        Assert.assertFalse(createTestingSlotSharingManager.getResolvedRootSlots().contains(createRootSlot));
        completableFuture.complete(createSimpleSlotContext());
        Assert.assertFalse(createTestingSlotSharingManager.getUnresolvedRootSlots().contains(createRootSlot));
        Assert.assertTrue(createTestingSlotSharingManager.getResolvedRootSlots().contains(createRootSlot));
    }

    @Test
    public void testGetResolvedSlot() {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        SlotSharingManager.MultiTaskSlot createRootSlot = createRootSlot(new LocalTaskManagerLocation(), createTestingSlotSharingManager);
        AbstractID abstractID = new AbstractID();
        Collection listResolvedRootSlotInfo = createTestingSlotSharingManager.listResolvedRootSlotInfo(abstractID);
        Assert.assertEquals(1L, listResolvedRootSlotInfo.size());
        SlotSharingManager.MultiTaskSlot resolvedRootSlot = createTestingSlotSharingManager.getResolvedRootSlot(((SlotSelectionStrategy.SlotInfoAndResources) listResolvedRootSlotInfo.iterator().next()).getSlotInfo());
        SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality = (SlotSelectionStrategy.SlotInfoAndLocality) LocationPreferenceSlotSelectionStrategy.createDefault().selectBestSlotForProfile(listResolvedRootSlotInfo, SlotProfile.noRequirements()).get();
        Assert.assertNotNull(resolvedRootSlot);
        Assert.assertEquals(Locality.UNCONSTRAINED, slotInfoAndLocality.getLocality());
        Assert.assertEquals(createRootSlot.getSlotRequestId(), resolvedRootSlot.getSlotRequestId());
        resolvedRootSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, abstractID, Locality.UNCONSTRAINED);
        Assert.assertTrue(createTestingSlotSharingManager.listResolvedRootSlotInfo(abstractID).isEmpty());
    }

    @Test
    public void testGetResolvedSlotWithLocationPreferences() {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        SlotSharingManager.MultiTaskSlot createRootSlot = createRootSlot(new LocalTaskManagerLocation(), createTestingSlotSharingManager);
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        SlotSharingManager.MultiTaskSlot createRootSlot2 = createRootSlot(localTaskManagerLocation, createTestingSlotSharingManager);
        AbstractID abstractID = new AbstractID();
        SlotProfile preferredLocality = SlotProfile.preferredLocality(ResourceProfile.UNKNOWN, Collections.singleton(localTaskManagerLocation));
        Collection listResolvedRootSlotInfo = createTestingSlotSharingManager.listResolvedRootSlotInfo(abstractID);
        LocationPreferenceSlotSelectionStrategy createDefault = LocationPreferenceSlotSelectionStrategy.createDefault();
        SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality = (SlotSelectionStrategy.SlotInfoAndLocality) createDefault.selectBestSlotForProfile(listResolvedRootSlotInfo, preferredLocality).get();
        SlotSharingManager.MultiTaskSlot resolvedRootSlot = createTestingSlotSharingManager.getResolvedRootSlot(slotInfoAndLocality.getSlotInfo());
        Assert.assertNotNull(resolvedRootSlot);
        Assert.assertEquals(Locality.LOCAL, slotInfoAndLocality.getLocality());
        Assert.assertEquals(createRootSlot2.getSlotRequestId(), resolvedRootSlot.getSlotRequestId());
        resolvedRootSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, abstractID, slotInfoAndLocality.getLocality());
        SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality2 = (SlotSelectionStrategy.SlotInfoAndLocality) createDefault.selectBestSlotForProfile(createTestingSlotSharingManager.listResolvedRootSlotInfo(abstractID), preferredLocality).get();
        SlotSharingManager.MultiTaskSlot resolvedRootSlot2 = createTestingSlotSharingManager.getResolvedRootSlot(slotInfoAndLocality2.getSlotInfo());
        Assert.assertNotNull(resolvedRootSlot2);
        Assert.assertNotSame(Locality.LOCAL, slotInfoAndLocality2.getLocality());
        Assert.assertEquals(createRootSlot.getSlotRequestId(), resolvedRootSlot2.getSlotRequestId());
    }

    @Test
    public void testResolvedSlotInReleasingIsNotAvailable() throws Exception {
        final SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        SlotSharingManager.MultiTaskSlot createRootSlot = createRootSlot(new LocalTaskManagerLocation(), createTestingSlotSharingManager);
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot = createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, new AbstractID(), Locality.UNCONSTRAINED);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AbstractID abstractID = new AbstractID();
        ((LogicalSlot) allocateSingleTaskSlot.getLogicalSlotFuture().get()).tryAssignPayload(new LogicalSlot.Payload() { // from class: org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManagerTest.1
            public void fail(Throwable th) {
                Assert.assertEquals(0L, createTestingSlotSharingManager.listResolvedRootSlotInfo(abstractID).size());
                atomicBoolean.set(true);
            }

            public CompletableFuture<?> getTerminalStateFuture() {
                return null;
            }
        });
        Assert.assertEquals(1L, createTestingSlotSharingManager.listResolvedRootSlotInfo(abstractID).size());
        createRootSlot.release(new Exception("test exception"));
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void testGetUnresolvedSlot() {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        SlotSharingManager.MultiTaskSlot createRootSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), new CompletableFuture(), new SlotRequestId());
        AbstractID abstractID = new AbstractID();
        SlotSharingManager.MultiTaskSlot unresolvedRootSlot = createTestingSlotSharingManager.getUnresolvedRootSlot(abstractID);
        Assert.assertNotNull(unresolvedRootSlot);
        Assert.assertEquals(createRootSlot.getSlotRequestId(), unresolvedRootSlot.getSlotRequestId());
        unresolvedRootSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, abstractID, Locality.UNKNOWN);
        Assert.assertNull(createTestingSlotSharingManager.getUnresolvedRootSlot(abstractID));
    }

    @Test
    public void testResourceCalculationOnSlotAllocatingAndReleasing() {
        ResourceProfile build = ResourceProfile.newBuilder().setCpuCores(1.0d).setTaskHeapMemoryMB(100).setTaskOffHeapMemoryMB(100).setManagedMemoryMB(100).setNetworkMemoryMB(100).build();
        ResourceProfile build2 = ResourceProfile.newBuilder().setCpuCores(2.0d).setTaskHeapMemoryMB(200).setTaskOffHeapMemoryMB(200).setManagedMemoryMB(200).setNetworkMemoryMB(200).addExtendedResource("gpu", new GPUResource(2.0d)).build();
        ResourceProfile build3 = ResourceProfile.newBuilder().setCpuCores(3.0d).setTaskHeapMemoryMB(300).setTaskOffHeapMemoryMB(300).setManagedMemoryMB(300).setNetworkMemoryMB(300).addExtendedResource("gpu", new GPUResource(3.0d)).build();
        SlotSharingManager.MultiTaskSlot createRootSlot = createTestingSlotSharingManager().createRootSlot(new SlotRequestId(), new CompletableFuture(), new SlotRequestId());
        SlotSharingManager.MultiTaskSlot allocateMultiTaskSlot = createRootSlot.allocateMultiTaskSlot(new SlotRequestId(), new SlotSharingGroupId());
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot = allocateMultiTaskSlot.allocateSingleTaskSlot(new SlotRequestId(), build, new SlotSharingGroupId(), Locality.LOCAL);
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot2 = allocateMultiTaskSlot.allocateSingleTaskSlot(new SlotRequestId(), build2, new SlotSharingGroupId(), Locality.LOCAL);
        Assert.assertEquals(build, allocateSingleTaskSlot.getReservedResources());
        Assert.assertEquals(build2, allocateSingleTaskSlot2.getReservedResources());
        Assert.assertEquals(build.merge(build2), allocateMultiTaskSlot.getReservedResources());
        Assert.assertEquals(build.merge(build2), createRootSlot.getReservedResources());
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot3 = createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), build3, new SlotSharingGroupId(), Locality.LOCAL);
        Assert.assertEquals(build3, allocateSingleTaskSlot3.getReservedResources());
        Assert.assertEquals(build.merge(build2).merge(build3), createRootSlot.getReservedResources());
        allocateSingleTaskSlot2.release(new Throwable("Release for testing"));
        Assert.assertEquals(build, allocateMultiTaskSlot.getReservedResources());
        Assert.assertEquals(build.merge(build3), createRootSlot.getReservedResources());
        allocateSingleTaskSlot3.release(new Throwable("Release for testing"));
        Assert.assertEquals(build, createRootSlot.getReservedResources());
        allocateSingleTaskSlot.release(new Throwable("Release for testing"));
        Assert.assertEquals(ResourceProfile.ZERO, createRootSlot.getReservedResources());
    }

    @Test
    public void testGetResolvedSlotWithResourceConfigured() {
        ResourceProfile fromResources = ResourceProfile.fromResources(1.0d, 100);
        ResourceProfile fromResources2 = ResourceProfile.fromResources(2.0d, 200);
        ResourceProfile fromResources3 = ResourceProfile.fromResources(5.0d, 500);
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        SlotSharingManager.MultiTaskSlot createRootSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), CompletableFuture.completedFuture(new SimpleSlotContext(new AllocationID(), new LocalTaskManagerLocation(), 0, new SimpleAckingTaskManagerGateway(), fromResources3)), new SlotRequestId());
        createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), fromResources, new SlotSharingGroupId(), Locality.LOCAL);
        Collection listResolvedRootSlotInfo = createTestingSlotSharingManager.listResolvedRootSlotInfo(new AbstractID());
        Assert.assertEquals(1L, listResolvedRootSlotInfo.size());
        Assert.assertEquals(fromResources3.subtract(fromResources), ((SlotSelectionStrategy.SlotInfoAndResources) listResolvedRootSlotInfo.iterator().next()).getRemainingResources());
        createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), fromResources2, new SlotSharingGroupId(), Locality.LOCAL);
        Collection listResolvedRootSlotInfo2 = createTestingSlotSharingManager.listResolvedRootSlotInfo(new AbstractID());
        Assert.assertEquals(1L, listResolvedRootSlotInfo2.size());
        Assert.assertEquals(fromResources3.subtract(fromResources).subtract(fromResources2), ((SlotSelectionStrategy.SlotInfoAndResources) listResolvedRootSlotInfo2.iterator().next()).getRemainingResources());
    }

    @Test
    public void testHashEnoughResourceOfMultiTaskSlot() {
        ResourceProfile fromResources = ResourceProfile.fromResources(1.0d, 100);
        ResourceProfile fromResources2 = ResourceProfile.fromResources(2.0d, 200);
        ResourceProfile fromResources3 = ResourceProfile.fromResources(2.0d, 200);
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        CompletableFuture completableFuture = new CompletableFuture();
        SlotSharingManager.MultiTaskSlot allocateMultiTaskSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), completableFuture, new SlotRequestId()).allocateMultiTaskSlot(new SlotRequestId(), new SlotSharingGroupId());
        allocateMultiTaskSlot.allocateSingleTaskSlot(new SlotRequestId(), fromResources, new SlotSharingGroupId(), Locality.LOCAL);
        Assert.assertThat(Boolean.valueOf(allocateMultiTaskSlot.mayHaveEnoughResourcesToFulfill(fromResources)), Is.is(true));
        Assert.assertThat(Boolean.valueOf(allocateMultiTaskSlot.mayHaveEnoughResourcesToFulfill(fromResources2)), Is.is(true));
        Assert.assertThat(Boolean.valueOf(allocateMultiTaskSlot.mayHaveEnoughResourcesToFulfill(ResourceProfile.UNKNOWN)), Is.is(true));
        completableFuture.complete(new AllocatedSlot(new AllocationID(), new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46), 0, fromResources3, (TaskManagerGateway) Mockito.mock(TaskManagerGateway.class)));
        Assert.assertThat(Boolean.valueOf(allocateMultiTaskSlot.mayHaveEnoughResourcesToFulfill(fromResources)), Is.is(true));
        Assert.assertThat(Boolean.valueOf(allocateMultiTaskSlot.mayHaveEnoughResourcesToFulfill(fromResources2)), Is.is(false));
        Assert.assertThat(Boolean.valueOf(allocateMultiTaskSlot.mayHaveEnoughResourcesToFulfill(ResourceProfile.UNKNOWN)), Is.is(true));
    }

    @Test
    public void testSlotAllocatedWithEnoughResource() {
        SlotSharingResourceTestContext createResourceTestContext = createResourceTestContext(ResourceProfile.fromResources(16.0d, 1600));
        for (SlotSharingManager.SingleTaskSlot singleTaskSlot : createResourceTestContext.singleTaskSlotsInOrder) {
            Assert.assertThat(Boolean.valueOf(singleTaskSlot.getLogicalSlotFuture().isDone()), Is.is(true));
            Assert.assertThat(Boolean.valueOf(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally()), Is.is(false));
        }
        Assert.assertThat(createResourceTestContext.slotSharingManager.getTaskSlot(createResourceTestContext.coLocationTaskSlot.getSlotRequestId()), CoreMatchers.notNullValue());
    }

    @Test
    public void testSlotOverAllocatedAndTaskSlotsReleased() {
        SlotSharingResourceTestContext createResourceTestContext = createResourceTestContext(ResourceProfile.fromResources(7.0d, 700));
        for (int i = 0; i < createResourceTestContext.singleTaskSlotsInOrder.size(); i++) {
            SlotSharingManager.SingleTaskSlot singleTaskSlot = createResourceTestContext.singleTaskSlotsInOrder.get(i);
            Assert.assertThat(Boolean.valueOf(singleTaskSlot.getLogicalSlotFuture().isDone()), Is.is(true));
            Assert.assertThat(Boolean.valueOf(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally()), Is.is(true));
            singleTaskSlot.getLogicalSlotFuture().whenComplete((logicalSlot, th) -> {
                Assert.assertThat(th, Matchers.instanceOf(IllegalStateException.class));
            });
        }
        Assert.assertThat(Boolean.valueOf(createResourceTestContext.slotSharingManager.isEmpty()), Is.is(true));
    }

    private SlotSharingResourceTestContext createResourceTestContext(ResourceProfile resourceProfile) {
        ResourceProfile fromResources = ResourceProfile.fromResources(2.0d, 200);
        ResourceProfile fromResources2 = ResourceProfile.fromResources(3.0d, 300);
        ResourceProfile fromResources3 = ResourceProfile.fromResources(9.0d, 900);
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        CompletableFuture completableFuture = new CompletableFuture();
        SlotSharingManager.MultiTaskSlot createRootSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), completableFuture, new SlotRequestId());
        SlotSharingManager.MultiTaskSlot allocateMultiTaskSlot = createRootSlot.allocateMultiTaskSlot(new SlotRequestId(), new SlotSharingGroupId());
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot = allocateMultiTaskSlot.allocateSingleTaskSlot(new SlotRequestId(), fromResources, new SlotSharingGroupId(), Locality.LOCAL);
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot2 = allocateMultiTaskSlot.allocateSingleTaskSlot(new SlotRequestId(), fromResources, new SlotSharingGroupId(), Locality.LOCAL);
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot3 = createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), fromResources2, new SlotSharingGroupId(), Locality.LOCAL);
        SlotSharingManager.SingleTaskSlot allocateSingleTaskSlot4 = createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), fromResources3, new SlotSharingGroupId(), Locality.LOCAL);
        completableFuture.complete(new AllocatedSlot(new AllocationID(), new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46), 0, resourceProfile, (TaskManagerGateway) Mockito.mock(TaskManagerGateway.class)));
        return new SlotSharingResourceTestContext(createTestingSlotSharingManager, allocateMultiTaskSlot, Arrays.asList(allocateSingleTaskSlot, allocateSingleTaskSlot2, allocateSingleTaskSlot3, allocateSingleTaskSlot4));
    }

    private SlotSharingManager createTestingSlotSharingManager() {
        return new SlotSharingManager(SLOT_SHARING_GROUP_ID, new TestingAllocatedSlotActions(), SLOT_OWNER);
    }

    @Test
    public void testTaskExecutorUtilizationCalculation() {
        TestingAllocatedSlotActions testingAllocatedSlotActions = new TestingAllocatedSlotActions();
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        LocalTaskManagerLocation localTaskManagerLocation2 = new LocalTaskManagerLocation();
        SlotSharingManager slotSharingManager = new SlotSharingManager(SLOT_SHARING_GROUP_ID, testingAllocatedSlotActions, SLOT_OWNER);
        SlotSharingManager.MultiTaskSlot createRootSlot = createRootSlot(localTaskManagerLocation, slotSharingManager);
        createRootSlot(localTaskManagerLocation, slotSharingManager);
        createRootSlot(localTaskManagerLocation2, slotSharingManager);
        AbstractID abstractID = new AbstractID();
        createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, abstractID, Locality.UNCONSTRAINED);
        Collection listResolvedRootSlotInfo = slotSharingManager.listResolvedRootSlotInfo(abstractID);
        Assert.assertThat(listResolvedRootSlotInfo, Matchers.hasSize(2));
        Map map = (Map) listResolvedRootSlotInfo.stream().collect(Collectors.toMap(slotInfoAndResources -> {
            return slotInfoAndResources.getSlotInfo().getTaskManagerLocation();
        }, (v0) -> {
            return v0.getTaskExecutorUtilization();
        }));
        Assert.assertThat(map.get(localTaskManagerLocation), Is.is(Matchers.closeTo(0.5d, 0.1d)));
        Assert.assertThat(map.get(localTaskManagerLocation2), Is.is(Matchers.closeTo(0.0d, 0.1d)));
    }

    @Test
    public void shouldResolveRootSlotBeforeCompletingChildSlots() {
        SlotSharingManager createTestingSlotSharingManager = createTestingSlotSharingManager();
        CompletableFuture completableFuture = new CompletableFuture();
        SlotSharingManager.MultiTaskSlot createRootSlot = createTestingSlotSharingManager.createRootSlot(new SlotRequestId(), completableFuture.thenApply(Function.identity()), new SlotRequestId());
        CompletableFuture<Void> thenRun = createRootSlot.allocateSingleTaskSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, new AbstractID(), Locality.UNCONSTRAINED).getLogicalSlotFuture().thenRun(() -> {
            Assert.assertThat(createTestingSlotSharingManager.getResolvedRootSlots(), Matchers.contains(new SlotSharingManager.MultiTaskSlot[]{createRootSlot}));
        });
        completableFuture.complete(createSimpleSlotContext());
        thenRun.join();
    }

    private SlotSharingManager.MultiTaskSlot createRootSlot(TaskManagerLocation taskManagerLocation, SlotSharingManager slotSharingManager) {
        return slotSharingManager.createRootSlot(new SlotRequestId(), CompletableFuture.completedFuture(new SimpleSlotContext(new AllocationID(), taskManagerLocation, 0, new SimpleAckingTaskManagerGateway())), new SlotRequestId());
    }
}
