/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class SlotPoolBatchSlotRequestTest {
    private static final ResourceProfile resourceProfile = ResourceProfile.fromResources((double)1.0, (int)1024);
    public static final CompletableFuture[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture[0];
    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;

    SlotPoolBatchSlotRequestTest() {
    }

    @BeforeAll
    private static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
    }

    @AfterAll
    private static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    @Test
    void testPendingBatchSlotRequestTimeout() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = this.createAndSetUpSlotPool(mainThreadExecutor, null, Duration.ofMillis(2L));){
            CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);
            ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(slotFuture::get).withFailMessage("Expected that slot future times out.", new Object[0])).isInstanceOf(ExecutionException.class)).hasRootCauseInstanceOf(TimeoutException.class);
        }
    }

    @Test
    void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() throws Exception {
        Duration batchSlotTimeout = Duration.ofMillis(2L);
        ManualClock clock = new ManualClock();
        try (DeclarativeSlotPoolBridge slotPool = this.createAndSetUpSlotPool(mainThreadExecutor, null, batchSlotTimeout, (Clock)clock);){
            SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            SlotPoolUtils.offerSlots((SlotPool)slotPool, mainThreadExecutor, Arrays.asList(resourceProfile));
            CompletableFuture<PhysicalSlot> firstPendingSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);
            CompletableFuture<PhysicalSlot> secondPendingSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            List<CompletableFuture> slotFutures = Arrays.asList(firstPendingSlotFuture, secondPendingSlotFuture);
            this.advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, mainThreadExecutor, clock, batchSlotTimeout);
            for (CompletableFuture slotFuture : slotFutures) {
                FlinkAssertions.assertThatFuture((CompletableFuture)slotFuture).isNotDone();
            }
        }
    }

    @Test
    void testPendingBatchSlotRequestDoesNotFailIfResourceDeclaringFails() throws Exception {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setDeclareRequiredResourcesFunction((jobMasterId, resourceRequirements) -> FutureUtils.completedExceptionally((Throwable)new FlinkException("Failed request")));
        Duration batchSlotTimeout = Duration.ofMillis(1000L);
        try (DeclarativeSlotPoolBridge slotPool = this.createAndSetUpSlotPool(mainThreadExecutor, testingResourceManagerGateway, batchSlotTimeout);){
            CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            FlinkAssertions.assertThatFuture(slotFuture).willNotCompleteWithin(Duration.ofMillis(50L));
        }
    }

    @Test
    void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exception {
        ManualClock clock = new ManualClock();
        Duration batchSlotTimeout = Duration.ofMillis(10000L);
        try (DeclarativeSlotPoolBridge slotPool = this.createAndSetUpSlotPool(mainThreadExecutor, null, batchSlotTimeout, (Clock)clock);){
            SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            ResourceID taskManagerResourceId = SlotPoolUtils.offerSlots((SlotPool)slotPool, mainThreadExecutor, Arrays.asList(resourceProfile));
            CompletableFuture<PhysicalSlot> firstPendingSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);
            CompletableFuture<PhysicalSlot> secondPendingSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            List<CompletableFuture> slotFutures = Arrays.asList(firstPendingSlotFuture, secondPendingSlotFuture);
            this.advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, mainThreadExecutor, clock, batchSlotTimeout);
            FlinkAssertions.assertThatFuture(CompletableFuture.anyOf(slotFutures.toArray(COMPLETABLE_FUTURES_EMPTY_ARRAY))).isNotDone();
            SlotPoolUtils.releaseTaskManager((SlotPool)slotPool, mainThreadExecutor, taskManagerResourceId);
            this.advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, mainThreadExecutor, clock, batchSlotTimeout);
            for (CompletableFuture slotFuture : slotFutures) {
                FlinkAssertions.assertThatFuture((CompletableFuture)slotFuture).isCompletedExceptionally();
                ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(slotFuture::get).withFailMessage("Expected that the slot future times out.", new Object[0])).isInstanceOf(ExecutionException.class)).hasRootCauseInstanceOf(TimeoutException.class);
            }
        }
    }

    private void advanceTimeAndTriggerCheckBatchSlotTimeout(DeclarativeSlotPoolBridge slotPool, ComponentMainThreadExecutor componentMainThreadExecutor, ManualClock clock, Duration batchSlotTimeout) {
        this.runBatchSlotTimeoutCheck(slotPool, componentMainThreadExecutor);
        clock.advanceTime(batchSlotTimeout.toMillis() + 1L, TimeUnit.MILLISECONDS);
        this.runBatchSlotTimeoutCheck(slotPool, componentMainThreadExecutor);
    }

    private void runBatchSlotTimeoutCheck(DeclarativeSlotPoolBridge slotPool, ComponentMainThreadExecutor componentMainThreadExecutor) {
        CompletableFuture.runAsync(() -> ((DeclarativeSlotPoolBridge)slotPool).checkBatchSlotTimeout(), (Executor)componentMainThreadExecutor).join();
    }

    private DeclarativeSlotPoolBridge createAndSetUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor, @Nullable ResourceManagerGateway resourceManagerGateway, Duration batchSlotTimeout) throws Exception {
        return new DeclarativeSlotPoolBridgeBuilder().setResourceManagerGateway(resourceManagerGateway).setBatchSlotTimeout(batchSlotTimeout).setMainThreadExecutor(componentMainThreadExecutor).buildAndStart();
    }

    private DeclarativeSlotPoolBridge createAndSetUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor, @Nullable ResourceManagerGateway resourceManagerGateway, Duration batchSlotTimeout, Clock clock) throws Exception {
        return new DeclarativeSlotPoolBridgeBuilder().setResourceManagerGateway(resourceManagerGateway).setBatchSlotTimeout(batchSlotTimeout).setClock(clock).setMainThreadExecutor(componentMainThreadExecutor).buildAndStart();
    }
}

