package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for the {@link DeclarativeSlotPoolBridge}.
 *
 * <p>Every test creates its bridge through a {@code createDeclarativeSlotPoolBridge} factory
 * method, starts it with {@code JOB_MASTER_ID}, and relies on try-with-resources to close it again
 * even when an assertion fails.
 */
@ExtendWith({ParameterizedTestExtension.class})
class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTest {

    /**
     * Requests a new slot and then announces a matching slot as available; the request future must
     * complete.
     */
    @TestTemplate
    void testSlotOffer() throws Exception {
        final SlotRequestId slotRequestId = new SlotRequestId();
        final PhysicalSlot allocatedSlot = createAllocatedSlot(new AllocationID());

        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(
                        new TestingDeclarativeSlotPoolFactory(
                                TestingDeclarativeSlotPool.builder()
                                        .setGetFreeSlotTrackerSupplier(
                                                () ->
                                                        TestingFreeSlotTracker.newBuilder()
                                                                .setGetFreeSlotsInformationSupplier(
                                                                        () ->
                                                                                Collections
                                                                                        .singleton(
                                                                                                allocatedSlot))
                                                                .setGetAvailableSlotsSupplier(
                                                                        () ->
                                                                                Collections
                                                                                        .singleton(
                                                                                                allocatedSlot
                                                                                                        .getAllocationId()))
                                                                .build())))) {
            slotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final CompletableFuture<PhysicalSlot> slotFuture =
                    slotPoolBridge.requestNewAllocatedSlot(
                            slotRequestId, ResourceProfile.UNKNOWN, (Duration) null);
            tryWaitSlotRequestIsDone(slotPoolBridge);

            slotPoolBridge.newSlotsAreAvailable(Collections.singleton(allocatedSlot));

            // Blocks until the request is fulfilled; throws if it completed exceptionally.
            slotFuture.join();
        }
    }

    /**
     * Issues a slot request from the component main thread executor and then signals that not
     * enough resources are available; the pending request must fail within ten seconds.
     */
    @TestTemplate
    void testNotEnoughResourcesAvailableFailsPendingRequests() throws Exception {
        final SlotRequestId slotRequestId = new SlotRequestId();

        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(
                        new TestingDeclarativeSlotPoolFactory(
                                TestingDeclarativeSlotPool.builder()))) {
            slotPoolBridge.start(JOB_MASTER_ID, "localhost");

            // The outer future only carries the request future back from the executor thread.
            final CompletableFuture<PhysicalSlot> slotFuture =
                    CompletableFuture.supplyAsync(
                                    () ->
                                            slotPoolBridge.requestNewAllocatedSlot(
                                                    slotRequestId,
                                                    ResourceProfile.UNKNOWN,
                                                    Duration.ofMinutes(5L)),
                                    this.componentMainThreadExecutor)
                            .get();
            tryWaitSlotRequestIsDone(slotPoolBridge);

            this.componentMainThreadExecutor.execute(
                    () ->
                            slotPoolBridge.notifyNotEnoughResourcesAvailable(
                                    Collections.emptyList()));

            FlinkAssertions.assertThatFuture(slotFuture).failsWithin(Duration.ofSeconds(10L));
        }
    }

    /**
     * Allocates an available slot and releases it again; the free-reserved-slot callback of the
     * underlying pool must be invoked with the very same {@link AllocationID} instance.
     */
    @TestTemplate
    void testReleasingAllocatedSlot() throws Exception {
        final CompletableFuture<AllocationID> releaseSlotFuture = new CompletableFuture<>();
        final AllocationID expectedAllocationId = new AllocationID();
        final PhysicalSlot allocatedSlot = createAllocatedSlot(expectedAllocationId);

        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(
                        new TestingDeclarativeSlotPoolFactory(
                                TestingDeclarativeSlotPool.builder()
                                        .setReserveFreeSlotFunction(
                                                (allocationId, resourceProfile) -> {
                                                    Assertions.assertThat(allocationId)
                                                            .isSameAs(expectedAllocationId);
                                                    return allocatedSlot;
                                                })
                                        .setFreeReservedSlotFunction(
                                                (allocationId, cause, timestamp) -> {
                                                    releaseSlotFuture.complete(allocationId);
                                                    return ResourceCounter.empty();
                                                })))) {
            slotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final SlotRequestId slotRequestId = new SlotRequestId();
            slotPoolBridge.allocateAvailableSlot(
                    slotRequestId, expectedAllocationId, allocatedSlot.getResourceProfile());
            tryWaitSlotRequestIsDone(slotPoolBridge);

            slotPoolBridge.releaseSlot(slotRequestId, (Throwable) null);

            Assertions.assertThat(releaseSlotFuture.join()).isSameAs(expectedAllocationId);
        }
    }

    /**
     * Closes the bridge while two slot requests are pending. Each request releases its slot from
     * its own failure callback, so closing must cope with slots being released while it is failing
     * the pending requests. The combined future must complete exceptionally.
     */
    @TestTemplate
    void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws Exception {
        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(new DefaultDeclarativeSlotPoolFactory())) {
            slotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final List<SlotRequestId> slotRequestIds =
                    Arrays.asList(new SlotRequestId(), new SlotRequestId());

            final List<CompletableFuture<PhysicalSlot>> slotFutures =
                    slotRequestIds.stream()
                            .map(
                                    slotRequestId -> {
                                        final CompletableFuture<PhysicalSlot> slotFuture =
                                                slotPoolBridge.requestNewAllocatedSlot(
                                                        slotRequestId,
                                                        ResourceProfile.UNKNOWN,
                                                        RPC_TIMEOUT);
                                        slotFuture.whenComplete(
                                                (physicalSlot, failure) -> {
                                                    if (failure != null) {
                                                        slotPoolBridge.releaseSlot(
                                                                slotRequestId, failure);
                                                    }
                                                });
                                        return slotFuture;
                                    })
                            .collect(Collectors.toList());
            tryWaitSlotRequestIsDone(slotPoolBridge);

            // Deliberate early close: this is the action under test. The bridge is closed a
            // second time when the try-with-resources block exits, as in the original test.
            slotPoolBridge.close();

            Assertions.assertThatThrownBy(() -> FutureUtils.waitForAll(slotFutures).get())
                    .as("The slot futures should be completed exceptionally.")
                    .isInstanceOf(ExecutionException.class);
        }
    }

    /**
     * Offers a slot from a registered task manager; the test never connects a resource manager.
     * The pending request must be fulfilled with the offered allocation.
     */
    @TestTemplate
    void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws Exception {
        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(new DefaultDeclarativeSlotPoolFactory())) {
            slotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final CompletableFuture<PhysicalSlot> slotFuture =
                    slotPoolBridge.requestNewAllocatedSlot(
                            new SlotRequestId(), ResourceProfile.UNKNOWN, RPC_TIMEOUT);
            tryWaitSlotRequestIsDone(slotPoolBridge);

            final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            slotPoolBridge.registerTaskManager(taskManagerLocation.getResourceID());

            final AllocationID offeredAllocationId = new AllocationID();
            slotPoolBridge.offerSlots(
                    taskManagerLocation,
                    new SimpleAckingTaskManagerGateway(),
                    Collections.singleton(
                            new SlotOffer(offeredAllocationId, 0, ResourceProfile.ANY)));

            Assertions.assertThat(slotFuture.join().getAllocationId())
                    .isSameAs(offeredAllocationId);
        }
    }

    /**
     * Marks the job as restarting and offers a slot; the register-slots function of the underlying
     * pool must be invoked.
     */
    @TestTemplate
    void testIfJobIsRestartingAllOfferedSlotsWillBeRegistered() throws Exception {
        final CompletableFuture<Void> registerSlotsCalledFuture = new CompletableFuture<>();

        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(
                        new TestingDeclarativeSlotPoolFactory(
                                TestingDeclarativeSlotPool.builder()
                                        .setRegisterSlotsFunction(
                                                (slots,
                                                        taskManagerLocation,
                                                        taskManagerGateway,
                                                        timestamp) -> {
                                                    registerSlotsCalledFuture.complete(null);
                                                    return new ArrayList<>(slots);
                                                })))) {
            slotPoolBridge.start(JOB_MASTER_ID, "localhost");
            slotPoolBridge.setIsJobRestarting(true);

            final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            slotPoolBridge.registerTaskManager(taskManagerLocation.getResourceID());
            slotPoolBridge.offerSlots(
                    taskManagerLocation,
                    new SimpleAckingTaskManagerGateway(),
                    Collections.singleton(
                            new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY)));

            // Blocks until the register-slots function has been called.
            registerSlotsCalledFuture.join();
        }
    }

    /** Runs the batch-allocation scenario for several numbers of requested slots. */
    @TestTemplate
    void testSlotsBatchAllocatableLogic() throws Exception {
        testSlotsBatchAllocatableLogic(1);
        testSlotsBatchAllocatableLogic(2);
        testSlotsBatchAllocatableLogic(4);
        testSlotsBatchAllocatableLogic(7);
        testSlotsBatchAllocatableLogic(10);
        testSlotsBatchAllocatableLogic(32);
    }

    /**
     * Requests {@code i} slots and then makes slots available one at a time.
     *
     * <p>When {@code slotBatchAllocatable} is set, no request may complete before the last slot
     * has arrived; otherwise the n-th request must complete once the n-th slot has arrived.
     *
     * @param i number of slots to request and subsequently offer
     */
    private void testSlotsBatchAllocatableLogic(int i) throws Exception {
        final Set<AllocationID> availableAllocationIds = new HashSet<>();
        final Set<PhysicalSlot> freeSlots = new HashSet<>();

        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(freeSlots, availableAllocationIds)) {
            slotPoolBridge.start(JOB_MASTER_ID, "localhost");

            final List<CompletableFuture<PhysicalSlot>> slotFutures = new ArrayList<>(i);
            for (int requestIndex = 0; requestIndex < i; requestIndex++) {
                slotFutures.add(
                        slotPoolBridge.requestNewAllocatedSlot(
                                new SlotRequestId(), ResourceProfile.UNKNOWN, (Duration) null));
            }
            tryWaitSlotRequestIsDone(slotPoolBridge);

            for (int offerIndex = 0; offerIndex < i; offerIndex++) {
                newSlotsAreAvailable(
                        slotPoolBridge,
                        freeSlots,
                        availableAllocationIds,
                        createAllocatedSlot(new AllocationID()));
                if (this.slotBatchAllocatable) {
                    checkForSlotBatchAllocating(i, offerIndex, slotFutures);
                } else {
                    slotFutures.get(offerIndex).join();
                }
            }
        }
    }

    /**
     * Verifies the batch-allocation invariant after the slot with index {@code offerIndex} has been
     * offered: before the last offer no future may be completed, after the last offer all futures
     * must complete.
     *
     * @param totalRequests total number of slot requests
     * @param offerIndex zero-based index of the slot that has just been offered
     * @param slotFutures futures of all slot requests
     */
    private void checkForSlotBatchAllocating(
            int totalRequests, int offerIndex, List<CompletableFuture<PhysicalSlot>> slotFutures) {
        if (offerIndex < totalRequests - 1) {
            Assertions.assertThat(FutureUtils.waitForAll(slotFutures).getNumFuturesCompleted())
                    .isZero();
        } else {
            FutureUtils.waitForAll(slotFutures).join();
        }
    }

    /**
     * Records the slot in the given sets and then notifies the bridge that the slot is available.
     *
     * @param declarativeSlotPoolBridge bridge to notify
     * @param freeSlots set the slot is added to
     * @param availableAllocationIds set the slot's allocation id is added to
     * @param physicalSlot slot that became available
     */
    private void newSlotsAreAvailable(
            DeclarativeSlotPoolBridge declarativeSlotPoolBridge,
            Set<PhysicalSlot> freeSlots,
            Set<AllocationID> availableAllocationIds,
            PhysicalSlot physicalSlot) {
        freeSlots.add(physicalSlot);
        availableAllocationIds.add(physicalSlot.getAllocationId());
        declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(physicalSlot));
    }

    /**
     * Calls {@code tryWaitSlotRequestIsDone()} on the bridge's slot pool if, and only if, that pool
     * is a {@link DefaultDeclarativeSlotPool}; for any other pool type this is a no-op.
     *
     * @param declarativeSlotPoolBridge bridge whose slot pool is inspected
     */
    private void tryWaitSlotRequestIsDone(DeclarativeSlotPoolBridge declarativeSlotPoolBridge) {
        if (declarativeSlotPoolBridge.getDeclarativeSlotPool()
                instanceof DefaultDeclarativeSlotPool) {
            // The cast is required: the method is invoked only behind this instanceof guard.
            final DefaultDeclarativeSlotPool slotPool =
                    (DefaultDeclarativeSlotPool) declarativeSlotPoolBridge.getDeclarativeSlotPool();
            slotPool.tryWaitSlotRequestIsDone();
        }
    }

    /**
     * Creates a bridge on top of a testing slot pool whose free slot tracker is backed by the given
     * sets. The sets are handed out by reference, so later modifications are visible to the
     * tracker.
     *
     * @param freeSlots slots returned as free-slot information
     * @param availableAllocationIds allocation ids returned as available slots
     * @return the newly created bridge
     */
    private DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
            Set<PhysicalSlot> freeSlots, Set<AllocationID> availableAllocationIds) {
        return createDeclarativeSlotPoolBridge(
                new TestingDeclarativeSlotPoolFactory(
                        TestingDeclarativeSlotPool.builder()
                                .setGetFreeSlotTrackerSupplier(
                                        () ->
                                                TestingFreeSlotTracker.newBuilder()
                                                        .setGetFreeSlotsInformationSupplier(
                                                                () -> freeSlots)
                                                        .setGetAvailableSlotsSupplier(
                                                                () -> availableAllocationIds)
                                                        .build())));
    }
}
