/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.AbstractDeclarativeSlotPoolBridgeTest;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotTracker;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.QuadFunction;
import org.apache.flink.util.function.TriFunction;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for {@link DeclarativeSlotPoolBridge}.
 *
 * <p>The class runs under {@link ParameterizedTestExtension}; every {@code @TestTemplate} method
 * creates a bridge, starts it and closes it again through try-with-resources.
 *
 * <p>NOTE(review): {@code JOB_MASTER_ID}, {@code RPC_TIMEOUT}, {@code componentMainThreadExecutor},
 * {@code slotBatchAllocatable}, {@code createAllocatedSlot} and the single-argument {@code
 * createDeclarativeSlotPoolBridge} are not declared in this file; presumably they come from
 * {@link AbstractDeclarativeSlotPoolBridgeTest} - confirm their exact semantics there.
 */
@ExtendWith(ParameterizedTestExtension.class)
class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTest {

    /**
     * Requests a new slot and then announces, via {@code newSlotsAreAvailable}, the same slot that
     * the stubbed free-slot tracker reports; the request future is expected to complete.
     */
    @TestTemplate
    void testSlotOffer() throws Exception {
        final SlotRequestId slotRequestId = new SlotRequestId();
        final AllocationID expectedAllocationId = new AllocationID();
        final PhysicalSlot allocatedSlot = createAllocatedSlot(expectedAllocationId);

        final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(
                        TestingDeclarativeSlotPool.builder()
                                .setGetFreeSlotTrackerSupplier(
                                        () ->
                                                TestingFreeSlotTracker.newBuilder()
                                                        .setGetFreeSlotsInformationSupplier(
                                                                () -> Collections.singleton(allocatedSlot))
                                                        .setGetAvailableSlotsSupplier(
                                                                () ->
                                                                        Collections.singleton(
                                                                                allocatedSlot.getAllocationId()))
                                                        .build()));

        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final CompletableFuture<PhysicalSlot> slotAllocationFuture =
                    declarativeSlotPoolBridge.requestNewAllocatedSlot(
                            slotRequestId, ResourceProfile.UNKNOWN, null);
            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);

            declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(allocatedSlot));

            // Blocks until the request has been served; an exceptional completion fails the test.
            slotAllocationFuture.join();
        }
    }

    /**
     * Issues a slot request and then calls {@code notifyNotEnoughResourcesAvailable}; the pending
     * request future must fail within ten seconds.
     */
    @TestTemplate
    void testNotEnoughResourcesAvailableFailsPendingRequests() throws Exception {
        final SlotRequestId slotRequestId = new SlotRequestId();
        final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());

        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

            // The request is submitted through componentMainThreadExecutor; get() unwraps the
            // outer future so that only the slot future itself remains.
            final CompletableFuture<PhysicalSlot> slotAllocationFuture =
                    CompletableFuture.supplyAsync(
                                    () ->
                                            declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                    slotRequestId,
                                                    ResourceProfile.UNKNOWN,
                                                    Duration.ofMinutes(5L)),
                                    (Executor) componentMainThreadExecutor)
                            .get();
            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);

            componentMainThreadExecutor.execute(
                    () ->
                            declarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(
                                    Collections.emptyList()));

            FlinkAssertions.assertThatFuture(slotAllocationFuture)
                    .failsWithin(Duration.ofSeconds(10L));
        }
    }

    /**
     * Allocates an available slot and releases it again; the stubbed {@code freeReservedSlot}
     * function must be invoked with the very same {@link AllocationID} instance.
     */
    @TestTemplate
    void testReleasingAllocatedSlot() throws Exception {
        final CompletableFuture<AllocationID> releaseSlotFuture = new CompletableFuture<>();
        final AllocationID expectedAllocationId = new AllocationID();
        final PhysicalSlot allocatedSlot = createAllocatedSlot(expectedAllocationId);

        final TestingDeclarativeSlotPoolBuilder builder =
                TestingDeclarativeSlotPool.builder()
                        .setReserveFreeSlotFunction(
                                (allocationId, resourceProfile) -> {
                                    Assertions.assertThat(allocationId)
                                            .isSameAs(expectedAllocationId);
                                    return allocatedSlot;
                                })
                        .setFreeReservedSlotFunction(
                                (allocationID, throwable, aLong) -> {
                                    // Record which allocation was freed for the assertion below.
                                    releaseSlotFuture.complete(allocationID);
                                    return ResourceCounter.empty();
                                });
        final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(builder);

        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final SlotRequestId slotRequestId = new SlotRequestId();
            declarativeSlotPoolBridge.allocateAvailableSlot(
                    slotRequestId, expectedAllocationId, allocatedSlot.getResourceProfile());
            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);

            declarativeSlotPoolBridge.releaseSlot(slotRequestId, null);

            Assertions.assertThat(releaseSlotFuture.join()).isSameAs(expectedAllocationId);
        }
    }

    /**
     * Closes the bridge while two slot requests are pending, each of which releases its slot from
     * its own failure callback; waiting for all request futures must throw an {@link
     * ExecutionException}.
     */
    @TestTemplate
    void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws Exception {
        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                createDeclarativeSlotPoolBridge(new DefaultDeclarativeSlotPoolFactory())) {
            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final List<SlotRequestId> slotRequestIds =
                    Arrays.asList(new SlotRequestId(), new SlotRequestId());

            final List<CompletableFuture<PhysicalSlot>> slotFutures =
                    slotRequestIds.stream()
                            .map(
                                    slotRequestId -> {
                                        final CompletableFuture<PhysicalSlot> slotFuture =
                                                declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                        slotRequestId,
                                                        ResourceProfile.UNKNOWN,
                                                        RPC_TIMEOUT);
                                        // Release the slot from within the failure callback, i.e.
                                        // while the bridge is handling the failed request.
                                        slotFuture.whenComplete(
                                                (physicalSlot, throwable) -> {
                                                    if (throwable != null) {
                                                        declarativeSlotPoolBridge.releaseSlot(
                                                                slotRequestId, throwable);
                                                    }
                                                });
                                        return slotFuture;
                                    })
                            .collect(Collectors.toList());
            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);

            // Explicit close is the action under test; try-with-resources will call close() a
            // second time when this block exits.
            declarativeSlotPoolBridge.close();

            Assertions.assertThatThrownBy(() -> FutureUtils.waitForAll(slotFutures).get())
                    .as("The slot futures should be completed exceptionally.")
                    .isInstanceOf(ExecutionException.class);
        }
    }

    /**
     * Offers a slot from a registered task manager without any resource manager interaction in
     * the test; the pending request must be completed with the offered allocation.
     */
    @TestTemplate
    void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws Exception {
        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                createDeclarativeSlotPoolBridge(new DefaultDeclarativeSlotPoolFactory())) {
            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final CompletableFuture<PhysicalSlot> slotFuture =
                    declarativeSlotPoolBridge.requestNewAllocatedSlot(
                            new SlotRequestId(), ResourceProfile.UNKNOWN, RPC_TIMEOUT);
            tryWaitSlotRequestIsDone(declarativeSlotPoolBridge);

            final LocalTaskManagerLocation localTaskManagerLocation =
                    new LocalTaskManagerLocation();
            declarativeSlotPoolBridge.registerTaskManager(localTaskManagerLocation.getResourceID());

            final AllocationID allocationId = new AllocationID();
            declarativeSlotPoolBridge.offerSlots(
                    localTaskManagerLocation,
                    new SimpleAckingTaskManagerGateway(),
                    Collections.singleton(new SlotOffer(allocationId, 0, ResourceProfile.ANY)));

            Assertions.assertThat(slotFuture.join().getAllocationId()).isSameAs(allocationId);
        }
    }

    /**
     * With {@code setIsJobRestarting(true)}, offering a slot must reach the stubbed {@code
     * registerSlots} function of the underlying pool.
     */
    @TestTemplate
    void testIfJobIsRestartingAllOfferedSlotsWillBeRegistered() throws Exception {
        final CompletableFuture<Void> registerSlotsCalledFuture = new CompletableFuture<>();

        final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(
                        TestingDeclarativeSlotPool.builder()
                                .setRegisterSlotsFunction(
                                        (slotOffers,
                                                taskManagerLocation,
                                                taskManagerGateway,
                                                aLong) -> {
                                            registerSlotsCalledFuture.complete(null);
                                            return new ArrayList<>(slotOffers);
                                        }));

        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
            declarativeSlotPoolBridge.setIsJobRestarting(true);

            final LocalTaskManagerLocation localTaskManagerLocation =
                    new LocalTaskManagerLocation();
            declarativeSlotPoolBridge.registerTaskManager(localTaskManagerLocation.getResourceID());
            declarativeSlotPoolBridge.offerSlots(
                    localTaskManagerLocation,
                    new SimpleAckingTaskManagerGateway(),
                    Collections.singleton(
                            new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY)));

            // Completes only once the stubbed registerSlots function has been called.
            registerSlotsCalledFuture.join();
        }
    }

    /** Runs the batch-allocation scenario for several request counts. */
    @TestTemplate
    void testSlotsBatchAllocatableLogic() throws Exception {
        testSlotsBatchAllocatableLogic(1);
        testSlotsBatchAllocatableLogic(2);
        testSlotsBatchAllocatableLogic(4);
        testSlotsBatchAllocatableLogic(7);
        testSlotsBatchAllocatableLogic(10);
        testSlotsBatchAllocatableLogic(32);
    }

    /**
     * Issues {@code requestSlotNum} slot requests and then makes slots available one at a time.
     *
     * <p>When {@code slotBatchAllocatable} is set, the requests are checked as a batch (see {@link
     * #checkForSlotBatchAllocating}); otherwise the i-th request must complete right after the
     * i-th slot became available.
     *
     * @param requestSlotNum number of slot requests to issue and of slots to provide
     */
    private void testSlotsBatchAllocatableLogic(int requestSlotNum) throws Exception {
        final Set<AllocationID> availableSlotsIds = new HashSet<>();
        final Set<PhysicalSlot> freeSlotsInformation = new HashSet<>();

        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(freeSlotsInformation, availableSlotsIds)) {
            slotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final List<CompletableFuture<PhysicalSlot>> futures = new ArrayList<>(requestSlotNum);
            for (int i = 0; i < requestSlotNum; i++) {
                futures.add(
                        slotPoolBridge.requestNewAllocatedSlot(
                                new SlotRequestId(), ResourceProfile.UNKNOWN, null));
            }
            tryWaitSlotRequestIsDone(slotPoolBridge);

            for (int i = 0; i < requestSlotNum; i++) {
                final PhysicalSlot slot = createAllocatedSlot(new AllocationID());
                newSlotsAreAvailable(slotPoolBridge, freeSlotsInformation, availableSlotsIds, slot);

                if (slotBatchAllocatable) {
                    checkForSlotBatchAllocating(requestSlotNum, i, futures);
                } else {
                    futures.get(i).join();
                }
            }
        }
    }

    /**
     * Verifies the batch behaviour after the slot with index {@code requestIndex} was provided:
     * before the last slot no request may have completed; after the last one all must complete.
     *
     * @param requestSlotNum total number of requested slots
     * @param requestIndex zero-based index of the slot that has just been made available
     * @param futures the futures of all issued slot requests
     */
    private void checkForSlotBatchAllocating(
            int requestSlotNum, int requestIndex, List<CompletableFuture<PhysicalSlot>> futures) {
        final boolean isLastSlot = requestIndex >= requestSlotNum - 1;
        if (isLastSlot) {
            FutureUtils.waitForAll(futures).join();
        } else {
            Assertions.assertThat(FutureUtils.waitForAll(futures).getNumFuturesCompleted())
                    .isZero();
        }
    }

    /**
     * Adds {@code slot} to the two sets backing the stubbed free-slot tracker and then notifies
     * the bridge that this slot is available.
     */
    private void newSlotsAreAvailable(
            DeclarativeSlotPoolBridge declarativeSlotPoolBridge,
            Set<PhysicalSlot> freeSlotsInformation,
            Set<AllocationID> availableSlotsIds,
            PhysicalSlot slot) {
        freeSlotsInformation.add(slot);
        availableSlotsIds.add(slot.getAllocationId());
        declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(slot));
    }

    /**
     * Calls {@code tryWaitSlotRequestIsDone()} on the bridge's pool if, and only if, that pool is
     * a {@link DefaultDeclarativeSlotPool}; for any other pool implementation this is a no-op.
     */
    private void tryWaitSlotRequestIsDone(DeclarativeSlotPoolBridge declarativeSlotPoolBridge) {
        if (declarativeSlotPoolBridge.getDeclarativeSlotPool()
                instanceof DefaultDeclarativeSlotPool) {
            final DefaultDeclarativeSlotPool slotPool =
                    (DefaultDeclarativeSlotPool) declarativeSlotPoolBridge.getDeclarativeSlotPool();
            slotPool.tryWaitSlotRequestIsDone();
        }
    }

    /**
     * Creates a bridge on top of a testing pool whose free-slot tracker is served from the given
     * sets. The sets are captured by reference, so later additions are seen by the tracker.
     *
     * @param freeSlotsInformation backing set returned as the tracker's free slot information
     * @param availableSlotsIds backing set returned as the tracker's available slot ids
     * @return the bridge, not yet started
     */
    private DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
            Set<PhysicalSlot> freeSlotsInformation, Set<AllocationID> availableSlotsIds) {
        final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(
                        TestingDeclarativeSlotPool.builder()
                                .setGetFreeSlotTrackerSupplier(
                                        () ->
                                                TestingFreeSlotTracker.newBuilder()
                                                        .setGetFreeSlotsInformationSupplier(
                                                                () -> freeSlotsInformation)
                                                        .setGetAvailableSlotsSupplier(
                                                                () -> availableSlotsIds)
                                                        .build()));
        return createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory);
    }
}

