/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestUtils;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.CheckedSupplier;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class DeclarativeSlotPoolBridgeRequestCompletionTest {
    private static final Duration TIMEOUT = SlotPoolUtils.TIMEOUT;
    private TestingResourceManagerGateway resourceManagerGateway;
    @Parameter
    private boolean deferSlotAllocation;

    DeclarativeSlotPoolBridgeRequestCompletionTest() {
    }

    @Parameters(name="deferSlotAllocation: {0}")
    public static List<Boolean> getDeferSlotAllocationParams() {
        return Lists.newArrayList((Object[])new Boolean[]{false, true});
    }

    @BeforeEach
    void setUp() {
        this.resourceManagerGateway = new TestingResourceManagerGateway();
    }

    @TestTemplate
    void testRequestsAreCompletedInRequestOrder() {
        this.runSlotRequestCompletionTest(CheckedSupplier.unchecked(() -> this.createAndSetUpSlotPool(this.deferSlotAllocation)), slotPool -> {});
    }

    @TestTemplate
    void testStashOrderMaintainsRequestOrder() {
        this.runSlotRequestCompletionTest(CheckedSupplier.unchecked(this::createAndSetUpSlotPoolWithoutResourceManager), this::connectToResourceManager);
    }

    private void runSlotRequestCompletionTest(Supplier<SlotPool> slotPoolSupplier, Consumer<SlotPool> actionAfterSlotRequest) {
        try (SlotPool slotPool = slotPoolSupplier.get();){
            int requestNum = 10;
            List slotRequestIds = IntStream.range(0, 10).mapToObj(ignored -> new SlotRequestId()).collect(Collectors.toList());
            List slotRequests = slotRequestIds.stream().map(slotRequestId -> slotPool.requestNewAllocatedSlot(PhysicalSlotRequestUtils.normalRequest(slotRequestId, ResourceProfile.UNKNOWN), TIMEOUT)).collect(Collectors.toList());
            actionAfterSlotRequest.accept(slotPool);
            LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            slotPool.registerTaskManager(taskManagerLocation.getResourceID());
            SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
            Collection acceptedSlots = slotPool.offerSlots((TaskManagerLocation)taskManagerLocation, (TaskManagerGateway)new SimpleAckingTaskManagerGateway(), Collections.singleton(slotOffer));
            Assertions.assertThat((Collection)acceptedSlots).contains((Object[])new SlotOffer[]{slotOffer});
            FlinkException testingReleaseException = new FlinkException("Testing release exception");
            DeclarativeSlotPoolBridge slotPoolBridge = (DeclarativeSlotPoolBridge)slotPool;
            for (int i = 0; i < slotRequestIds.size(); ++i) {
                CompletableFuture slotRequestFuture = (CompletableFuture)slotRequests.get(i);
                if (this.deferSlotAllocation) {
                    Assertions.assertThat((Object)slotRequestFuture.getNow(null)).isNull();
                    Assertions.assertThat((Collection)slotPoolBridge.getFreeSlotsInformation()).hasSize(1);
                } else {
                    Assertions.assertThat((Object)slotRequestFuture.getNow(null)).isNotNull();
                }
                slotPool.releaseSlot((SlotRequestId)slotRequestIds.get(i), (Throwable)testingReleaseException);
            }
        }
    }

    private SlotPool createAndSetUpSlotPool(boolean deferSlotAllocation) throws Exception {
        return new DeclarativeSlotPoolBridgeBuilder().setDeferSlotAllocation(deferSlotAllocation).setResourceManagerGateway(this.resourceManagerGateway).buildAndStart();
    }

    private void connectToResourceManager(SlotPool slotPool) {
        slotPool.connectToResourceManager((ResourceManagerGateway)this.resourceManagerGateway);
    }

    private SlotPool createAndSetUpSlotPoolWithoutResourceManager() throws Exception {
        return new DeclarativeSlotPoolBridgeBuilder().setDeferSlotAllocation(this.deferSlotAllocation).setResourceManagerGateway(null).buildAndStart();
    }
}

