/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest;
import org.apache.flink.runtime.jobmaster.slotpool.DummyPayload;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SlotPoolImplTest
extends TestLogger {
    /** Timeout passed to slot allocation requests and used for the bounded waits on futures and queues in these tests. */
    private final Time timeout = Time.seconds((long)10L);
    /** Job id handed to the slot pool under test; recreated in {@link #setUp()}. */
    private JobID jobId;
    /** Location of the task manager that the tests register and offer slots from; recreated in {@link #setUp()}. */
    private TaskManagerLocation taskManagerLocation;
    /** Task manager gateway passed along with slot offers; individual tests install a free-slot function on it. */
    private SimpleAckingTaskManagerGateway taskManagerGateway;
    /** Testing resource manager gateway; tests install consumers on it to observe slot requests and cancellations. */
    private TestingResourceManagerGateway resourceManagerGateway;
    /** Executor handed to the slot pool and scheduler on start, obtained via {@code forMainThread()}. */
    private ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

    /** Re-creates the job id, the task manager location and both testing gateways before every test. */
    @Before
    public void setUp() throws Exception {
        jobId = new JobID();
        taskManagerLocation = new LocalTaskManagerLocation();
        taskManagerGateway = new SimpleAckingTaskManagerGateway();
        resourceManagerGateway = new TestingResourceManagerGateway();
    }

    /**
     * Requests one slot, answers the request seen by the resource manager gateway with a matching
     * slot offer, and checks that the resulting logical slot is alive and reports the offering
     * task manager's location.
     */
    @Test
    public void testAllocateSimpleSlot() throws Exception {
        final CompletableFuture<SlotRequest> rmRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(rmRequestFuture::complete);

        try (SlotPoolImpl pool = createSlotPoolImpl()) {
            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);
            final Scheduler scheduler = setupScheduler(pool, mainThreadExecutor);
            pool.registerTaskManager(taskManagerLocation.getResourceID());

            final SlotRequestId requestId = new SlotRequestId();
            final CompletableFuture<LogicalSlot> slotFuture = scheduler.allocateSlot(
                    requestId,
                    new DummyScheduledUnit(),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    timeout);
            // Nothing has been offered yet, so the request has to be pending.
            Assert.assertFalse(slotFuture.isDone());

            // Offer a slot carrying the allocation id of the request the resource manager received.
            final SlotRequest rmRequest = rmRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer offer = new SlotOffer(rmRequest.getAllocationId(), 0, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(pool.offerSlot(taskManagerLocation, taskManagerGateway, offer));

            final LogicalSlot logicalSlot = slotFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(slotFuture.isDone());
            Assert.assertTrue(logicalSlot.isAlive());
            Assert.assertEquals(taskManagerLocation, logicalSlot.getTaskManagerLocation());
        }
    }

    /** Builds a {@link TestingSlotPoolImpl} for the current job id via its single-argument constructor. */
    @Nonnull
    private SlotPoolImpl createSlotPoolImpl() {
        final SlotPoolImpl pool = new TestingSlotPoolImpl(jobId);
        return pool;
    }

    /**
     * Issues two slot requests but offers only one slot. The first request is fulfilled by the
     * offer; after that slot is released, the second request must be completed with a slot that
     * has the same location, physical slot number and allocation id.
     */
    @Test
    public void testAllocationFulfilledByReturnedSlot() throws Exception {
        final ArrayBlockingQueue<SlotRequest> rmRequests = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setRequestSlotConsumer(request -> {
            // Retry until the bounded queue accepts the request.
            while (!rmRequests.offer(request)) {
            }
        });

        try (SlotPoolImpl pool = createSlotPoolImpl()) {
            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);
            final Scheduler scheduler = setupScheduler(pool, mainThreadExecutor);
            pool.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> firstFuture = scheduler.allocateSlot(
                    new SlotRequestId(),
                    new DummyScheduledUnit(),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    timeout);
            final CompletableFuture<LogicalSlot> secondFuture = scheduler.allocateSlot(
                    new SlotRequestId(),
                    new DummyScheduledUnit(),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    timeout);
            Assert.assertFalse(firstFuture.isDone());
            Assert.assertFalse(secondFuture.isDone());

            // Collect both requests that reached the resource manager gateway.
            final List<SlotRequest> received = new ArrayList<>(2);
            while (received.size() < 2) {
                received.add(rmRequests.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            }

            // Only the first request gets an offer.
            final SlotOffer offer = new SlotOffer(received.get(0).getAllocationId(), 0, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(pool.offerSlot(taskManagerLocation, taskManagerGateway, offer));

            final LogicalSlot firstSlot = firstFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(firstFuture.isDone());
            Assert.assertFalse(secondFuture.isDone());

            // Releasing the first slot is expected to complete the still-pending second request.
            firstSlot.releaseSlot();
            final LogicalSlot secondSlot = secondFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(secondFuture.isDone());

            Assert.assertNotEquals(firstSlot, secondSlot);
            Assert.assertFalse(firstSlot.isAlive());
            Assert.assertTrue(secondSlot.isAlive());
            Assert.assertEquals(firstSlot.getTaskManagerLocation(), secondSlot.getTaskManagerLocation());
            Assert.assertEquals((long) firstSlot.getPhysicalSlotNumber(), (long) secondSlot.getPhysicalSlotNumber());
            Assert.assertEquals(firstSlot.getAllocationId(), secondSlot.getAllocationId());
        }
    }

    /**
     * Allocates a slot, releases it, and then allocates again. The second allocation must
     * complete with a distinct, alive logical slot that has the same task manager location and
     * physical slot number as the released one.
     */
    @Test
    public void testAllocateWithFreeSlot() throws Exception {
        final CompletableFuture<SlotRequest> rmRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(rmRequestFuture::complete);

        try (SlotPoolImpl pool = createSlotPoolImpl()) {
            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);
            final Scheduler scheduler = setupScheduler(pool, mainThreadExecutor);
            pool.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> firstFuture = scheduler.allocateSlot(
                    new SlotRequestId(),
                    new DummyScheduledUnit(),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    timeout);
            Assert.assertFalse(firstFuture.isDone());

            final SlotRequest rmRequest = rmRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer offer = new SlotOffer(rmRequest.getAllocationId(), 0, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(pool.offerSlot(taskManagerLocation, taskManagerGateway, offer));

            final LogicalSlot firstSlot = firstFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(firstFuture.isDone());

            // Hand the slot back before issuing the second request.
            firstSlot.releaseSlot();

            final CompletableFuture<LogicalSlot> secondFuture = scheduler.allocateSlot(
                    new SlotRequestId(),
                    new DummyScheduledUnit(),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    timeout);
            final LogicalSlot secondSlot = secondFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(secondFuture.isDone());

            Assert.assertNotEquals(firstSlot, secondSlot);
            Assert.assertFalse(firstSlot.isAlive());
            Assert.assertTrue(secondSlot.isAlive());
            Assert.assertEquals(firstSlot.getTaskManagerLocation(), secondSlot.getTaskManagerLocation());
            Assert.assertEquals((long) firstSlot.getPhysicalSlotNumber(), (long) secondSlot.getPhysicalSlotNumber());
        }
    }

    /**
     * Walks through a series of slot offers and checks which ones the pool accepts: offers from a
     * task manager that was not registered, offers for allocation ids that were not requested,
     * duplicate offers, and offers that reuse an allocation id with a different slot index or a
     * different task manager, both while the slot is in use and after it was released.
     */
    @Test
    public void testOfferSlot() throws Exception {
        final CompletableFuture<SlotRequest> rmRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(rmRequestFuture::complete);

        try (SlotPoolImpl pool = createSlotPoolImpl()) {
            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);
            final Scheduler scheduler = setupScheduler(pool, mainThreadExecutor);
            pool.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> slotFuture = scheduler.allocateSlot(
                    new SlotRequestId(),
                    new DummyScheduledUnit(),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    timeout);
            Assert.assertFalse(slotFuture.isDone());

            final SlotRequest rmRequest = rmRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer requestedOffer = new SlotOffer(rmRequest.getAllocationId(), 0, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);

            // An offer coming from a location that was never registered is rejected.
            final TaskManagerLocation unregisteredLocation = new LocalTaskManagerLocation();
            Assert.assertFalse(pool.offerSlot(unregisteredLocation, taskManagerGateway, requestedOffer));

            // An offer with an allocation id nobody asked for is still accepted.
            final SlotOffer unrequestedOffer = new SlotOffer(new AllocationID(), 0, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(pool.offerSlot(taskManagerLocation, taskManagerGateway, unrequestedOffer));

            // The offer matching the pending request is accepted and fulfils it.
            Assert.assertTrue(pool.offerSlot(taskManagerLocation, taskManagerGateway, requestedOffer));
            final LogicalSlot logicalSlot = slotFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertTrue(logicalSlot.isAlive());

            // Repeating the very same offer is accepted and leaves the slot alive.
            Assert.assertTrue(pool.offerSlot(taskManagerLocation, taskManagerGateway, requestedOffer));
            Assert.assertTrue(logicalSlot.isAlive());

            // Same allocation id but a different slot index is rejected.
            final SlotOffer sameIdOtherIndexOffer = new SlotOffer(rmRequest.getAllocationId(), 1, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertFalse(pool.offerSlot(taskManagerLocation, taskManagerGateway, sameIdOtherIndexOffer));

            // Same offer from another (unregistered) location is rejected.
            final TaskManagerLocation otherLocation = new LocalTaskManagerLocation();
            Assert.assertFalse(pool.offerSlot(otherLocation, taskManagerGateway, requestedOffer));

            // The same three expectations must hold once the logical slot has been released.
            logicalSlot.releaseSlot();
            Assert.assertTrue(pool.offerSlot(taskManagerLocation, taskManagerGateway, requestedOffer));
            Assert.assertFalse(pool.offerSlot(taskManagerLocation, taskManagerGateway, sameIdOtherIndexOffer));
            Assert.assertFalse(pool.offerSlot(otherLocation, taskManagerGateway, requestedOffer));
        }
    }

    /**
     * Fulfils one of two pending requests, attaches a payload to the resulting slot and then
     * releases the task manager. The payload's future must complete, the slot must no longer be
     * alive, and the second request must remain unfulfilled.
     */
    @Test
    public void testReleaseResource() throws Exception {
        final CompletableFuture<SlotRequest> rmRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(rmRequestFuture::complete);

        try (SlotPoolImpl pool = createSlotPoolImpl()) {
            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);
            final Scheduler scheduler = setupScheduler(pool, mainThreadExecutor);
            pool.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> firstFuture = scheduler.allocateSlot(
                    new SlotRequestId(),
                    new DummyScheduledUnit(),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    timeout);
            final SlotRequest rmRequest = rmRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final CompletableFuture<LogicalSlot> secondFuture = scheduler.allocateSlot(
                    new SlotRequestId(),
                    new DummyScheduledUnit(),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    timeout);

            final SlotOffer offer = new SlotOffer(rmRequest.getAllocationId(), 0, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(pool.offerSlot(taskManagerLocation, taskManagerGateway, offer));

            final LogicalSlot firstSlot = firstFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(firstFuture.isDone());
            Assert.assertFalse(secondFuture.isDone());

            // Left raw on purpose: the type parameter DummyPayload expects is not visible in this file.
            final CompletableFuture payloadReleased = new CompletableFuture();
            final DummyPayload payload = new DummyPayload(payloadReleased);
            firstSlot.tryAssignPayload(payload);

            pool.releaseTaskManager(taskManagerLocation.getResourceID(), null);

            // Blocks until the future handed to the payload has been completed.
            payloadReleased.get();
            Assert.assertFalse(firstSlot.isAlive());

            // Short grace period before checking that the second request is still pending.
            Thread.sleep(10L);
            Assert.assertFalse(secondFuture.isDone());
        }
    }

    /**
     * Issues two requests and cancels the first one. A slot later offered under the first
     * request's allocation id must be used to fulfil the second request, and both allocation ids
     * must show up at the resource manager gateway's cancel consumer.
     */
    @Test
    public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
        try (SlotPoolImpl pool = createSlotPoolImpl()) {
            final ArrayBlockingQueue<AllocationID> requestedAllocations = new ArrayBlockingQueue<>(2);
            resourceManagerGateway.setRequestSlotConsumer(
                    request -> requestedAllocations.offer(request.getAllocationId()));

            final ArrayBlockingQueue<AllocationID> canceledAllocations = new ArrayBlockingQueue<>(2);
            resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::offer);

            final SlotRequestId firstRequestId = new SlotRequestId();
            final SlotRequestId secondRequestId = new SlotRequestId();

            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);
            final Scheduler scheduler = setupScheduler(pool, mainThreadExecutor);
            final ScheduledUnit scheduledUnit = new ScheduledUnit(new JobVertexID(), null, null);

            final CompletableFuture<LogicalSlot> firstFuture =
                    scheduler.allocateSlot(firstRequestId, scheduledUnit, SlotProfile.noRequirements(), timeout);
            final AllocationID firstAllocation = requestedAllocations.take();

            final CompletableFuture<LogicalSlot> secondFuture =
                    scheduler.allocateSlot(secondRequestId, scheduledUnit, SlotProfile.noRequirements(), timeout);
            final AllocationID secondAllocation = requestedAllocations.take();

            // Cancel the first request; its future has to fail with a FlinkException.
            pool.releaseSlot(firstRequestId, null);
            try {
                firstFuture.get();
                Assert.fail("The first slot future should have failed because it was cancelled.");
            }
            catch (ExecutionException expected) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(expected) instanceof FlinkException);
            }
            Assert.assertEquals((Object) firstAllocation, canceledAllocations.take());

            // Offer a slot under the allocation id of the cancelled request.
            final SlotOffer offer = new SlotOffer(firstAllocation, 0, ResourceProfile.ANY);
            pool.registerTaskManager(taskManagerLocation.getResourceID());
            Assert.assertTrue(pool.offerSlot(taskManagerLocation, taskManagerGateway, offer));

            // The second request is served by that slot, and its own allocation is cancelled.
            Assert.assertEquals(firstAllocation, secondFuture.get().getAllocationId());
            Assert.assertEquals((Object) secondAllocation, canceledAllocations.take());
        }
    }

    /**
     * Offers two slots to the pool, closes the pool, and verifies that the task manager gateway's
     * free-slot function was invoked for exactly the allocation ids of the offered slots.
     *
     * <p>Freed allocation ids are collected with a bounded {@code poll} so that a slot which is
     * never freed fails the test with a clear message instead of spinning forever.
     */
    @Test
    public void testShutdownReleasesAllSlots() throws Exception {
        try (SlotPoolImpl slotPool = this.createSlotPoolImpl()) {
            SlotPoolImplTest.setupSlotPool(slotPool, this.resourceManagerGateway, this.mainThreadExecutor);
            slotPool.registerTaskManager(this.taskManagerLocation.getResourceID());

            final int numSlotOffers = 2;
            final List<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
            for (int i = 0; i < numSlotOffers; ++i) {
                slotOffers.add(new SlotOffer(new AllocationID(), i, ResourceProfile.ANY));
            }

            // Record every allocation id the pool asks the task manager to free.
            final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
            this.taskManagerGateway.setFreeSlotFunction((allocationID, cause) -> {
                try {
                    freedSlotQueue.put(allocationID);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever runs this callback.
                    Thread.currentThread().interrupt();
                    return FutureUtils.completedExceptionally((Throwable)e);
                }
            });

            final Collection<SlotOffer> acceptedSlotOffers = slotPool.offerSlots(this.taskManagerLocation, (TaskManagerGateway)this.taskManagerGateway, slotOffers);
            MatcherAssert.assertThat((Object)acceptedSlotOffers, (Matcher)Matchers.equalTo(slotOffers));

            // Closing the pool is expected to free all of its slots.
            slotPool.close();

            final List<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
            for (int i = 0; i < numSlotOffers; ++i) {
                final AllocationID freedSlot = freedSlotQueue.poll(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.assertNotNull("Timed out waiting for slot " + (i + 1) + " of " + numSlotOffers + " to be freed.", freedSlot);
                freedSlots.add(freedSlot);
            }
            MatcherAssert.assertThat(freedSlots, (Matcher)Matchers.containsInAnyOrder((Object[])slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
        }
    }

    /**
     * Offers one slot, advances the manual clock by the timeout, offers a second slot, advances
     * the clock by one more millisecond and triggers the idle-slot check. Only the first slot's
     * allocation id may be passed to the gateway's free-slot function.
     */
    @Test
    public void testCheckIdleSlot() throws Exception {
        final ManualClock manualClock = new ManualClock();

        try (TestingSlotPoolImpl pool = createSlotPoolImpl(manualClock)) {
            final ArrayBlockingQueue<AllocationID> freedAllocations = new ArrayBlockingQueue<>(1);
            taskManagerGateway.setFreeSlotFunction((allocationId, cause) -> {
                try {
                    freedAllocations.put(allocationId);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                }
                catch (InterruptedException e) {
                    return FutureUtils.completedExceptionally((Throwable) e);
                }
            });

            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);

            final AllocationID expiredId = new AllocationID();
            final AllocationID freshId = new AllocationID();
            final SlotOffer expiringOffer = new SlotOffer(expiredId, 0, ResourceProfile.ANY);
            final SlotOffer freshOffer = new SlotOffer(freshId, 1, ResourceProfile.ANY);

            MatcherAssert.assertThat(pool.registerTaskManager(taskManagerLocation.getResourceID()), Matchers.is(true));
            MatcherAssert.assertThat(pool.offerSlot(taskManagerLocation, taskManagerGateway, expiringOffer), Matchers.is(true));

            // Age the first slot by the full timeout before the second one arrives.
            manualClock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(pool.offerSlot(taskManagerLocation, taskManagerGateway, freshOffer), Matchers.is(true));

            // One more millisecond pushes only the first slot past the timeout.
            manualClock.advanceTime(1L, TimeUnit.MILLISECONDS);
            pool.triggerCheckIdleSlot();

            final AllocationID freed = freedAllocations.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(freed, Matchers.is(expiredId));
            MatcherAssert.assertThat(freedAllocations.isEmpty(), Matchers.is(true));
        }
    }

    /**
     * Builds a {@link TestingSlotPoolImpl} driven by the given manual clock. The test's
     * {@code timeout} is passed as the fourth constructor argument; the third and fifth
     * arguments are {@code TestingUtils.infiniteTime()}.
     */
    @Nonnull
    private TestingSlotPoolImpl createSlotPoolImpl(ManualClock clock) {
        final TestingSlotPoolImpl pool = new TestingSlotPoolImpl(
                jobId,
                clock,
                TestingUtils.infiniteTime(),
                timeout,
                TestingUtils.infiniteTime());
        return pool;
    }

    /**
     * Lets the gateway's free-slot function fail with a {@link TimeoutException} while an idle
     * slot is being released. A subsequent allocation must not be served by that slot: the
     * request times out and the pool reports no available slots.
     */
    @Test
    public void testDiscardIdleSlotIfReleasingFailed() throws Exception {
        final ManualClock manualClock = new ManualClock();

        try (TestingSlotPoolImpl pool = createSlotPoolImpl(manualClock)) {
            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);
            final Scheduler scheduler = setupScheduler(pool, mainThreadExecutor);

            final AllocationID expiredId = new AllocationID();
            final SlotOffer expiringOffer = new SlotOffer(expiredId, 0, ResourceProfile.ANY);

            // Signal the free-slot call through the latch, then report failure to the caller.
            final OneShotLatch freeSlotCalled = new OneShotLatch();
            taskManagerGateway.setFreeSlotFunction((allocationId, cause) -> {
                freeSlotCalled.trigger();
                return FutureUtils.completedExceptionally((Throwable) new TimeoutException("Test failure"));
            });

            MatcherAssert.assertThat(pool.registerTaskManager(taskManagerLocation.getResourceID()), Matchers.is(true));
            MatcherAssert.assertThat(pool.offerSlot(taskManagerLocation, taskManagerGateway, expiringOffer), Matchers.is(true));

            // Move past the timeout and run the idle check.
            manualClock.advanceTime(timeout.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
            pool.triggerCheckIdleSlot();
            freeSlotCalled.await();

            final CompletableFuture<LogicalSlot> slotFuture = allocateSlot(scheduler, new SlotRequestId());
            try {
                slotFuture.get(10L, TimeUnit.MILLISECONDS);
                Assert.fail("Expected to fail with a timeout.");
            }
            catch (TimeoutException ignored) {
                // Expected: the slot must not have been put back among the available slots.
                Assert.assertEquals(0L, (long) pool.getAvailableSlots().size());
            }
        }
    }

    /**
     * Fulfils five slot requests from a single task manager and then fails the allocations one
     * by one. Each failed allocation must be passed to the gateway's free-slot function;
     * {@code failAllocation} must return an empty optional for the first four and the task
     * manager's resource id for the last one.
     */
    @Test
    public void testFreeFailedSlots() throws Exception {
        try (SlotPoolImpl pool = createSlotPoolImpl()) {
            final int parallelism = 5;
            final ArrayBlockingQueue<AllocationID> requestedAllocations = new ArrayBlockingQueue<>(parallelism);
            resourceManagerGateway.setRequestSlotConsumer(
                    request -> requestedAllocations.offer(request.getAllocationId()));

            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);
            final Scheduler scheduler = setupScheduler(pool, mainThreadExecutor);

            final HashMap<SlotRequestId, CompletableFuture<LogicalSlot>> pendingRequests = new HashMap<>(parallelism);
            for (int i = 0; i < parallelism; ++i) {
                final SlotRequestId requestId = new SlotRequestId();
                pendingRequests.put(requestId, allocateSlot(scheduler, requestId));
            }

            // One offer per requested allocation id, with ascending slot indices.
            final List<SlotOffer> offers = new ArrayList<>(parallelism);
            for (int slotIndex = 0; slotIndex < parallelism; ++slotIndex) {
                offers.add(new SlotOffer(requestedAllocations.take(), slotIndex, ResourceProfile.ANY));
            }

            pool.registerTaskManager(taskManagerLocation.getResourceID());
            pool.offerSlots(taskManagerLocation, taskManagerGateway, offers);

            // Wait until every request has been fulfilled.
            FutureUtils.waitForAll(pendingRequests.values()).get();

            final ArrayBlockingQueue<AllocationID> freedAllocations = new ArrayBlockingQueue<>(1);
            taskManagerGateway.setFreeSlotFunction((allocationID, throwable) -> {
                freedAllocations.offer(allocationID);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });

            final FlinkException failure = new FlinkException("Test fail exception");

            // All but the last allocation: nothing is returned, but the slot is freed.
            for (int i = 0; i < parallelism - 1; ++i) {
                final SlotOffer failedOffer = offers.get(i);
                final Optional<?> result = pool.failAllocation(failedOffer.getAllocationId(), failure);
                MatcherAssert.assertThat(result.isPresent(), Matchers.is(false));
                MatcherAssert.assertThat(freedAllocations.take(), Matchers.is(Matchers.equalTo(failedOffer.getAllocationId())));
            }

            // The last allocation: the task manager's resource id is returned.
            final SlotOffer lastOffer = offers.get(parallelism - 1);
            final Optional<?> lastResult = pool.failAllocation(lastOffer.getAllocationId(), failure);
            MatcherAssert.assertThat(lastResult.get(), (Matcher) Matchers.is((Matcher) Matchers.equalTo((Object) taskManagerLocation.getResourceID())));
            MatcherAssert.assertThat(freedAllocations.take(), Matchers.is(Matchers.equalTo(lastOffer.getAllocationId())));
        }
    }

    /**
     * Offers two slots, one for a pending request and one unrequested, and checks that the
     * allocated slot report created for the task manager carries the test's job id and an entry
     * for each of the two slots.
     */
    @Test
    public void testCreateAllocatedSlotReport() throws Exception {
        try (SlotPoolImpl pool = createSlotPoolImpl()) {
            final ArrayBlockingQueue<AllocationID> requestedAllocations = new ArrayBlockingQueue<>(1);
            resourceManagerGateway.setRequestSlotConsumer(
                    request -> requestedAllocations.offer(request.getAllocationId()));

            setupSlotPool(pool, resourceManagerGateway, mainThreadExecutor);
            final Scheduler scheduler = setupScheduler(pool, mainThreadExecutor);

            final SlotRequestId requestId = new SlotRequestId();
            final CompletableFuture<LogicalSlot> slotFuture = allocateSlot(scheduler, requestId);

            final List<AllocatedSlotInfo> expectedInfos = new ArrayList<>(2);
            final List<SlotOffer> offers = new ArrayList<>(2);

            // Slot 0 answers the pending request.
            final AllocationID requestedId = requestedAllocations.take();
            offers.add(new SlotOffer(requestedId, 0, ResourceProfile.ANY));
            expectedInfos.add(new AllocatedSlotInfo(0, requestedId));

            // Slot 1 uses a fresh allocation id that no request asked for.
            final AllocationID unrequestedId = new AllocationID();
            offers.add(new SlotOffer(unrequestedId, 1, ResourceProfile.ANY));
            expectedInfos.add(new AllocatedSlotInfo(1, unrequestedId));

            pool.registerTaskManager(taskManagerLocation.getResourceID());
            pool.offerSlots(taskManagerLocation, taskManagerGateway, offers);
            slotFuture.get();

            final AllocatedSlotReport report = pool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
            MatcherAssert.assertThat(jobId, Matchers.is(report.getJobId()));
            MatcherAssert.assertThat((Object) report.getAllocatedSlotInfos(), (Matcher) Matchers.containsInAnyOrder(isEachEqual(expectedInfos)));
        }
    }

    /**
     * Registers two task managers with four slots each, allocates two slots on the first and one
     * on the second, and checks that every remaining available slot reports the utilization of
     * its task manager (0.5 and 0.25 respectively, within a tolerance of 0.1).
     *
     * <p>The available-slot collection and the expectation map are fully typed: iterating a raw
     * {@code Collection} with a {@code SlotInfoWithUtilization} loop variable does not compile.
     */
    @Test
    public void testCalculationOfTaskExecutorUtilization() throws Exception {
        try (SlotPoolImpl slotPool = this.createSlotPoolImpl()) {
            SlotPoolImplTest.setupSlotPool(slotPool, this.resourceManagerGateway, this.mainThreadExecutor);

            final TaskManagerLocation firstTaskManagerLocation = new LocalTaskManagerLocation();
            final TaskManagerLocation secondTaskManagerLocation = new LocalTaskManagerLocation();
            final List<AllocationID> firstTaskManagersSlots = this.registerAndOfferSlots(firstTaskManagerLocation, slotPool, 4);
            final List<AllocationID> secondTaskManagersSlots = this.registerAndOfferSlots(secondTaskManagerLocation, slotPool, 4);

            // 2 of 4 slots taken on the first task manager, 1 of 4 on the second.
            slotPool.allocateAvailableSlot(new SlotRequestId(), firstTaskManagersSlots.get(0));
            slotPool.allocateAvailableSlot(new SlotRequestId(), firstTaskManagersSlots.get(1));
            slotPool.allocateAvailableSlot(new SlotRequestId(), secondTaskManagersSlots.get(3));

            final Collection<SlotInfoWithUtilization> availableSlotsInformation = slotPool.getAvailableSlotsInformation();
            final ImmutableMap<TaskManagerLocation, Double> utilizationPerTaskExecutor =
                    ImmutableMap.of(firstTaskManagerLocation, 0.5, secondTaskManagerLocation, 0.25);

            for (SlotInfoWithUtilization slotInfoWithUtilization : availableSlotsInformation) {
                final Double expectedTaskExecutorUtilization =
                        utilizationPerTaskExecutor.get(slotInfoWithUtilization.getTaskManagerLocation());
                // Fail with a readable message instead of an unboxing NullPointerException.
                Assert.assertNotNull("Available slot belongs to an unexpected task manager.", expectedTaskExecutorUtilization);
                MatcherAssert.assertThat(
                        slotInfoWithUtilization.getTaskExecutorUtilization(),
                        Matchers.is(Matchers.closeTo(expectedTaskExecutorUtilization.doubleValue(), 0.1)));
            }
        }
    }

    /**
     * Registers the given task manager at the slot pool and offers it
     * {@code numberOfSlotsToRegister} slots with fresh allocation ids, slot indices
     * {@code 0..n-1} and {@code ResourceProfile.ANY}, using a new
     * {@link SimpleAckingTaskManagerGateway}.
     *
     * @param taskManagerLocation location of the task manager to register and offer from
     * @param slotPool pool that receives the registration and the offers
     * @param numberOfSlotsToRegister number of slots to offer
     * @return the allocation ids of the offered slots, ordered by slot index
     */
    private List<AllocationID> registerAndOfferSlots(TaskManagerLocation taskManagerLocation, SlotPoolImpl slotPool, int numberOfSlotsToRegister) {
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final List<AllocationID> allocationIds = new ArrayList<>();
        final List<SlotOffer> offers = new ArrayList<>();
        for (int slotIndex = 0; slotIndex < numberOfSlotsToRegister; ++slotIndex) {
            final AllocationID allocationId = new AllocationID();
            allocationIds.add(allocationId);
            offers.add(new SlotOffer(allocationId, slotIndex, ResourceProfile.ANY));
        }

        slotPool.offerSlots(taskManagerLocation, new SimpleAckingTaskManagerGateway(), offers);
        return allocationIds;
    }

    /**
     * Wraps every given {@link AllocatedSlotInfo} in a matcher built by
     * {@code isEqualAllocatedSlotInfo}, keeping the iteration order of the input.
     *
     * @param allocatedSlotInfos expected slot infos
     * @return one matcher per expected slot info
     */
    private static Collection<Matcher<? super AllocatedSlotInfo>> isEachEqual(Collection<AllocatedSlotInfo> allocatedSlotInfos) {
        final List<Matcher<? super AllocatedSlotInfo>> matchers = new ArrayList<>();
        for (AllocatedSlotInfo expected : allocatedSlotInfos) {
            matchers.add(isEqualAllocatedSlotInfo(expected));
        }
        return matchers;
    }

    /**
     * Creates a matcher that accepts an {@link AllocatedSlotInfo} when both its allocation id
     * and its slot index equal those of the expected instance. On mismatch, the actual and
     * expected values are appended to the mismatch description via {@code toString()}.
     *
     * @param expectedAllocatedSlotInfo the slot info to compare against
     * @return a diagnosing matcher for the expected slot info
     */
    private static Matcher<AllocatedSlotInfo> isEqualAllocatedSlotInfo(final AllocatedSlotInfo expectedAllocatedSlotInfo) {
        return new TypeSafeDiagnosingMatcher<AllocatedSlotInfo>(){

            public void describeTo(Description description) {
                description.appendText(this.render(expectedAllocatedSlotInfo));
            }

            /** Textual form used in both the description and the mismatch message. */
            private String render(AllocatedSlotInfo slotInfo) {
                return slotInfo.toString();
            }

            protected boolean matchesSafely(AllocatedSlotInfo item, Description mismatchDescription) {
                // The slot index is only compared when the allocation ids already agree.
                if (item.getAllocationId().equals(expectedAllocatedSlotInfo.getAllocationId())
                        && item.getSlotIndex() == expectedAllocatedSlotInfo.getSlotIndex()) {
                    return true;
                }
                mismatchDescription
                        .appendText("Actual value ")
                        .appendText(this.render(item))
                        .appendText(" differs from expected value ")
                        .appendText(this.render(expectedAllocatedSlotInfo));
                return false;
            }
        };
    }

    /**
     * Requests a slot from the scheduler for a new {@link DummyScheduledUnit} with
     * {@code SlotProfile.noRequirements()} and the test's timeout.
     *
     * @param scheduler scheduler to request the slot from
     * @param slotRequestId id under which the request is issued
     * @return the future returned by the scheduler
     */
    private CompletableFuture<LogicalSlot> allocateSlot(Scheduler scheduler, SlotRequestId slotRequestId) {
        final ScheduledUnit unit = new DummyScheduledUnit();
        final SlotProfile profile = SlotProfile.noRequirements();
        return scheduler.allocateSlot(slotRequestId, unit, profile, timeout);
    }

    /**
     * Starts the slot pool with a generated {@link JobMasterId} and the address
     * {@code "foobar"}, then connects it to the given resource manager gateway.
     *
     * @param slotPool pool to start
     * @param resourceManagerGateway gateway the pool is connected to
     * @param mainThreadExecutable executor passed to {@code start}
     */
    private static void setupSlotPool(SlotPoolImpl slotPool, ResourceManagerGateway resourceManagerGateway, ComponentMainThreadExecutor mainThreadExecutable) throws Exception {
        final String jobManagerAddress = "foobar";
        final JobMasterId jobMasterId = JobMasterId.generate();
        slotPool.start(jobMasterId, jobManagerAddress, mainThreadExecutable);
        slotPool.connectToResourceManager(resourceManagerGateway);
    }

    /**
     * Creates a {@link SchedulerImpl} on top of the given slot pool, using
     * {@code LocationPreferenceSlotSelectionStrategy.createDefault()}, and starts it.
     *
     * @param slotPool pool the scheduler allocates from
     * @param mainThreadExecutable executor passed to {@code start}
     * @return the started scheduler
     */
    private static Scheduler setupScheduler(SlotPool slotPool, ComponentMainThreadExecutor mainThreadExecutable) {
        final SlotSelectionStrategy selectionStrategy = LocationPreferenceSlotSelectionStrategy.createDefault();
        final SchedulerImpl schedulerImpl = new SchedulerImpl(selectionStrategy, slotPool);
        schedulerImpl.start(mainThreadExecutable);
        return schedulerImpl;
    }
}

