package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.class */
/**
 * Tests for batch slot requests issued against a {@link DeclarativeSlotPoolBridge}: when such
 * requests time out, and when they must stay pending.
 *
 * <p>All tests share one single-threaded scheduled executor, which also backs the component main
 * thread executor handed to the slot pool.
 */
class SlotPoolBatchSlotRequestTest {
    // NOTE(review): the second argument borrows a constant from an unrelated test class; this looks
    // like a constant-inlining artifact. Confirm the intended literal before replacing it.
    private static final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0d, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE);

    /** Shared zero-length array used as the type token for {@code List.toArray}. */
    public static final CompletableFuture[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture[0];

    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;

    /** Creates the shared executor and the main thread executor wrapping it. */
    @BeforeAll
    static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
    }

    /** Shuts the shared executor down, if it was created. */
    @AfterAll
    static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    /**
     * A batch request for {@link ResourceProfile#UNKNOWN} against a pool that never receives a
     * slot must fail with a {@link TimeoutException} as root cause (batch timeout: 2 ms).
     */
    @Test
    void testPendingBatchSlotRequestTimeout() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = createAndSetUpSlotPool(mainThreadExecutor, null, Time.milliseconds(2L))) {
            final CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);

            Assertions.assertThatThrownBy(slotFuture::get)
                    .withFailMessage("Expected that slot future times out.")
                    .isInstanceOf(ExecutionException.class)
                    .hasRootCauseInstanceOf(TimeoutException.class);
        }
    }

    /**
     * After one slot has been requested and offered, further pending batch requests must still be
     * incomplete once the manual clock has moved past the batch timeout and the timeout check ran.
     */
    @Test
    void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() throws Exception {
        final Time batchSlotTimeout = Time.milliseconds(2L);
        final ManualClock clock = new ManualClock();

        try (DeclarativeSlotPoolBridge slotPool = createAndSetUpSlotPool(mainThreadExecutor, null, batchSlotTimeout, clock)) {
            // Request one slot and then offer a slot with the same profile to the pool.
            SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, resourceProfile);
            SlotPoolUtils.offerSlots(slotPool, mainThreadExecutor, Arrays.asList(resourceProfile));

            final List<CompletableFuture<PhysicalSlot>> slotFutures =
                    Arrays.asList(
                            SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN),
                            SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, resourceProfile));

            advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, mainThreadExecutor, clock, batchSlotTimeout);

            for (CompletableFuture<PhysicalSlot> slotFuture : slotFutures) {
                FlinkAssertions.assertThatFuture(slotFuture).isNotDone();
            }
        }
    }

    /**
     * With a resource manager gateway whose resource declaration always fails, a batch request
     * must not complete (neither normally nor exceptionally) within 50 ms.
     */
    @Test
    void testPendingBatchSlotRequestDoesNotFailIfResourceDeclaringFails() throws Exception {
        final TestingResourceManagerGateway failingGateway = new TestingResourceManagerGateway();
        failingGateway.setDeclareRequiredResourcesFunction(
                (jobMasterId, resourceRequirements) ->
                        FutureUtils.completedExceptionally(new FlinkException("Failed request")));

        try (DeclarativeSlotPoolBridge slotPool = createAndSetUpSlotPool(mainThreadExecutor, failingGateway, Time.milliseconds(1000L))) {
            final CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, resourceProfile);

            FlinkAssertions.assertThatFuture(slotFuture).willNotCompleteWithin(Duration.ofMillis(50L));
        }
    }

    /**
     * Pending batch requests stay incomplete while the offered slot's task manager is registered,
     * and fail with a {@link TimeoutException} root cause once that task manager has been released
     * and the batch timeout has elapsed again.
     */
    @Test
    void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exception {
        final ManualClock clock = new ManualClock();
        final Time batchSlotTimeout = Time.milliseconds(10000L);

        try (DeclarativeSlotPoolBridge slotPool = createAndSetUpSlotPool(mainThreadExecutor, null, batchSlotTimeout, clock)) {
            SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, resourceProfile);
            final ResourceID taskManagerResourceId =
                    SlotPoolUtils.offerSlots(slotPool, mainThreadExecutor, Arrays.asList(resourceProfile));

            final List<CompletableFuture<PhysicalSlot>> slotFutures =
                    Arrays.asList(
                            SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN),
                            SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, resourceProfile));

            // First pass: the slot is still present, so none of the requests may be done.
            advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, mainThreadExecutor, clock, batchSlotTimeout);
            FlinkAssertions.assertThatFuture(CompletableFuture.anyOf(slotFutures.toArray(COMPLETABLE_FUTURES_EMPTY_ARRAY)))
                    .isNotDone();

            // Second pass: after releasing the task manager every request has to time out.
            SlotPoolUtils.releaseTaskManager(slotPool, mainThreadExecutor, taskManagerResourceId);
            advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, mainThreadExecutor, clock, batchSlotTimeout);

            for (CompletableFuture<PhysicalSlot> slotFuture : slotFutures) {
                FlinkAssertions.assertThatFuture(slotFuture).isCompletedExceptionally();
                Assertions.assertThatThrownBy(slotFuture::get)
                        .withFailMessage("Expected that the slot future times out.")
                        .isInstanceOf(ExecutionException.class)
                        .hasRootCauseInstanceOf(TimeoutException.class);
            }
        }
    }

    /**
     * Runs the batch slot timeout check, advances the manual clock by the given timeout plus one
     * millisecond, and runs the check a second time.
     *
     * @param declarativeSlotPoolBridge slot pool whose timeout check is triggered
     * @param componentMainThreadExecutor executor on which the check is run
     * @param manualClock clock to advance
     * @param time batch slot timeout to step past
     */
    private void advanceTimeAndTriggerCheckBatchSlotTimeout(DeclarativeSlotPoolBridge declarativeSlotPoolBridge, ComponentMainThreadExecutor componentMainThreadExecutor, ManualClock manualClock, Time time) {
        // NOTE(review): presumably the first check lets the pool register currently unfulfillable
        // requests before time moves on; confirm against DeclarativeSlotPoolBridge.
        runBatchSlotTimeoutCheck(declarativeSlotPoolBridge, componentMainThreadExecutor);
        manualClock.advanceTime(time.toMilliseconds() + 1, TimeUnit.MILLISECONDS);
        runBatchSlotTimeoutCheck(declarativeSlotPoolBridge, componentMainThreadExecutor);
    }

    /**
     * Executes {@code checkBatchSlotTimeout} on the given executor and blocks until it has run.
     *
     * @param declarativeSlotPoolBridge slot pool to check
     * @param componentMainThreadExecutor executor on which the check is run
     */
    private void runBatchSlotTimeoutCheck(DeclarativeSlotPoolBridge declarativeSlotPoolBridge, ComponentMainThreadExecutor componentMainThreadExecutor) {
        CompletableFuture.runAsync(declarativeSlotPoolBridge::checkBatchSlotTimeout, componentMainThreadExecutor).join();
    }

    /**
     * Builds and starts a slot pool with the given batch slot timeout, leaving the clock at the
     * builder's default.
     *
     * @param componentMainThreadExecutor main thread executor the pool is started with
     * @param resourceManagerGateway gateway handed to the builder, may be {@code null}
     * @param time batch slot timeout
     * @return the started slot pool; the caller is responsible for closing it
     */
    private DeclarativeSlotPoolBridge createAndSetUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor, @Nullable ResourceManagerGateway resourceManagerGateway, Time time) throws Exception {
        return new DeclarativeSlotPoolBridgeBuilder()
                .setResourceManagerGateway(resourceManagerGateway)
                .setBatchSlotTimeout(time)
                .buildAndStart(componentMainThreadExecutor);
    }

    /**
     * Builds and starts a slot pool with the given batch slot timeout and clock.
     *
     * @param componentMainThreadExecutor main thread executor the pool is started with
     * @param resourceManagerGateway gateway handed to the builder, may be {@code null}
     * @param time batch slot timeout
     * @param clock clock handed to the builder
     * @return the started slot pool; the caller is responsible for closing it
     */
    private DeclarativeSlotPoolBridge createAndSetUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor, @Nullable ResourceManagerGateway resourceManagerGateway, Time time, Clock clock) throws Exception {
        return new DeclarativeSlotPoolBridgeBuilder()
                .setResourceManagerGateway(resourceManagerGateway)
                .setBatchSlotTimeout(time)
                .setClock(clock)
                .buildAndStart(componentMainThreadExecutor);
    }
}
