package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.clock.ManualClock;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImplTest.class */
class PhysicalSlotRequestBulkCheckerImplTest {
    private static final Duration TIMEOUT = Duration.ofMillis(50);
    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;
    private final ManualClock clock = new ManualClock();
    private PhysicalSlotRequestBulkCheckerImpl bulkChecker;
    private Set<PhysicalSlot> slots;
    private Supplier<Set<SlotInfo>> slotsRetriever;

    PhysicalSlotRequestBulkCheckerImplTest() {
    }

    @BeforeAll
    private static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
    }

    @AfterAll
    private static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    @BeforeEach
    private void setup() {
        this.slots = new HashSet();
        this.slotsRetriever = () -> {
            return new HashSet(this.slots);
        };
        this.bulkChecker = new PhysicalSlotRequestBulkCheckerImpl(this.slotsRetriever, this.clock);
        this.bulkChecker.start(mainThreadExecutor);
    }

    @Test
    void testPendingBulkIsNotCancelled() throws InterruptedException, ExecutionException {
        CompletableFuture<ExecutionVertexID> completableFuture = new CompletableFuture<>();
        this.bulkChecker.schedulePendingRequestBulkTimeoutCheck(createPhysicalSlotRequestBulkWithCancellationFuture(completableFuture, new ExecutionVertexID(new JobVertexID(), 0)), TIMEOUT);
        checkNotCancelledAfter(completableFuture, 2 * TIMEOUT.toMillis());
    }

    @Test
    void testFulfilledBulkIsNotCancelled() throws InterruptedException, ExecutionException {
        CompletableFuture<ExecutionVertexID> completableFuture = new CompletableFuture<>();
        this.bulkChecker.schedulePendingRequestBulkTimeoutCheck(createPhysicalSlotRequestBulkWithCancellationFuture(completableFuture, new ExecutionVertexID(new JobVertexID(), 0)), TIMEOUT);
        checkNotCancelledAfter(completableFuture, 2 * TIMEOUT.toMillis());
    }

    private static void checkNotCancelledAfter(CompletableFuture<?> completableFuture, long j) throws ExecutionException, InterruptedException {
        mainThreadExecutor.schedule(() -> {
        }, j, TimeUnit.MILLISECONDS).get();
        Assertions.assertThatThrownBy(() -> {
            FlinkAssertions.assertThatFuture(completableFuture).isNotDone();
            completableFuture.get(j, TimeUnit.MILLISECONDS);
        }).withFailMessage("The future must not have been cancelled", new Object[0]).isInstanceOf(TimeoutException.class);
        FlinkAssertions.assertThatFuture(completableFuture).isNotDone();
    }

    @Test
    void testUnfulfillableBulkIsCancelled() {
        CompletableFuture<ExecutionVertexID> completableFuture = new CompletableFuture<>();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        this.bulkChecker.schedulePendingRequestBulkTimeoutCheck(createPhysicalSlotRequestBulkWithCancellationFuture(completableFuture, executionVertexID), TIMEOUT);
        this.clock.advanceTime(TIMEOUT.toMillis() + 1, TimeUnit.MILLISECONDS);
        Assertions.assertThat(completableFuture.join()).isEqualTo(executionVertexID);
    }

    @Test
    void testBulkFulfilledOnCheck() {
        ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(new SlotSharingGroup());
        SharingPhysicalSlotRequestBulk createPhysicalSlotRequestBulk = createPhysicalSlotRequestBulk(executionSlotSharingGroup);
        createPhysicalSlotRequestBulk.markFulfilled(executionSlotSharingGroup, new AllocationID());
        Assertions.assertThat(checkBulkTimeout(new PhysicalSlotRequestBulkWithTimestamp(createPhysicalSlotRequestBulk))).isEqualTo(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.FULFILLED);
    }

    @Test
    void testBulkTimeoutOnCheck() {
        PhysicalSlotRequestBulkWithTimestamp createPhysicalSlotRequestBulkWithTimestamp = createPhysicalSlotRequestBulkWithTimestamp(new ExecutionSlotSharingGroup(new SlotSharingGroup()));
        this.clock.advanceTime(TIMEOUT.toMillis() + 1, TimeUnit.MILLISECONDS);
        Assertions.assertThat(checkBulkTimeout(createPhysicalSlotRequestBulkWithTimestamp)).isEqualTo(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.TIMEOUT);
    }

    @Test
    void testBulkPendingOnCheckIfFulfillable() {
        PhysicalSlotRequestBulkWithTimestamp createPhysicalSlotRequestBulkWithTimestamp = createPhysicalSlotRequestBulkWithTimestamp(new ExecutionSlotSharingGroup(new SlotSharingGroup()));
        PhysicalSlotTestUtils.occupyPhysicalSlot(addOneSlot(), false);
        Assertions.assertThat(checkBulkTimeout(createPhysicalSlotRequestBulkWithTimestamp)).isEqualTo(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING);
    }

    @Test
    void testBulkPendingOnCheckIfUnfulfillableButNotTimedOut() {
        Assertions.assertThat(checkBulkTimeout(createPhysicalSlotRequestBulkWithTimestamp(new ExecutionSlotSharingGroup(new SlotSharingGroup())))).isEqualTo(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING);
    }

    @Test
    void testBulkFulfillable() {
        SharingPhysicalSlotRequestBulk createPhysicalSlotRequestBulk = createPhysicalSlotRequestBulk(new ExecutionSlotSharingGroup(new SlotSharingGroup()));
        addOneSlot();
        Assertions.assertThat(isFulfillable(createPhysicalSlotRequestBulk)).isTrue();
    }

    @Test
    void testBulkUnfulfillableWithInsufficientSlots() {
        SharingPhysicalSlotRequestBulk createPhysicalSlotRequestBulk = createPhysicalSlotRequestBulk(new ExecutionSlotSharingGroup(new SlotSharingGroup()), new ExecutionSlotSharingGroup(new SlotSharingGroup()));
        addOneSlot();
        Assertions.assertThat(isFulfillable(createPhysicalSlotRequestBulk)).isFalse();
    }

    @Test
    void testBulkUnfulfillableWithSlotAlreadyAssignedToBulk() {
        ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(new SlotSharingGroup());
        SharingPhysicalSlotRequestBulk createPhysicalSlotRequestBulk = createPhysicalSlotRequestBulk(executionSlotSharingGroup, new ExecutionSlotSharingGroup(new SlotSharingGroup()));
        createPhysicalSlotRequestBulk.markFulfilled(executionSlotSharingGroup, addOneSlot().getAllocationId());
        Assertions.assertThat(isFulfillable(createPhysicalSlotRequestBulk)).isFalse();
    }

    @Test
    void testBulkUnfulfillableWithSlotOccupiedIndefinitely() {
        SharingPhysicalSlotRequestBulk createPhysicalSlotRequestBulk = createPhysicalSlotRequestBulk(new ExecutionSlotSharingGroup(new SlotSharingGroup()), new ExecutionSlotSharingGroup(new SlotSharingGroup()));
        PhysicalSlot addOneSlot = addOneSlot();
        addOneSlot();
        PhysicalSlotTestUtils.occupyPhysicalSlot(addOneSlot, true);
        Assertions.assertThat(isFulfillable(createPhysicalSlotRequestBulk)).isFalse();
    }

    @Test
    void testBulkFulfillableWithSlotOccupiedTemporarily() {
        SharingPhysicalSlotRequestBulk createPhysicalSlotRequestBulk = createPhysicalSlotRequestBulk(new ExecutionSlotSharingGroup(new SlotSharingGroup()), new ExecutionSlotSharingGroup(new SlotSharingGroup()));
        PhysicalSlot addOneSlot = addOneSlot();
        addOneSlot();
        PhysicalSlotTestUtils.occupyPhysicalSlot(addOneSlot, false);
        Assertions.assertThat(isFulfillable(createPhysicalSlotRequestBulk)).isTrue();
    }

    private PhysicalSlotRequestBulkWithTimestamp createPhysicalSlotRequestBulkWithTimestamp(ExecutionSlotSharingGroup... executionSlotSharingGroupArr) {
        PhysicalSlotRequestBulkWithTimestamp physicalSlotRequestBulkWithTimestamp = new PhysicalSlotRequestBulkWithTimestamp(createPhysicalSlotRequestBulk(executionSlotSharingGroupArr));
        physicalSlotRequestBulkWithTimestamp.markUnfulfillable(this.clock.relativeTimeMillis());
        return physicalSlotRequestBulkWithTimestamp;
    }

    private static SharingPhysicalSlotRequestBulk createPhysicalSlotRequestBulk(ExecutionSlotSharingGroup... executionSlotSharingGroupArr) {
        TestingPhysicalSlotRequestBulkBuilder newBuilder = TestingPhysicalSlotRequestBulkBuilder.newBuilder();
        for (ExecutionSlotSharingGroup executionSlotSharingGroup : executionSlotSharingGroupArr) {
            newBuilder.addPendingRequest(executionSlotSharingGroup, ResourceProfile.UNKNOWN);
        }
        return newBuilder.buildSharingPhysicalSlotRequestBulk();
    }

    private PhysicalSlotRequestBulk createPhysicalSlotRequestBulkWithCancellationFuture(CompletableFuture<ExecutionVertexID> completableFuture, ExecutionVertexID executionVertexID) {
        ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(new SlotSharingGroup());
        executionSlotSharingGroup.addVertex(executionVertexID);
        return TestingPhysicalSlotRequestBulkBuilder.newBuilder().addPendingRequest(executionSlotSharingGroup, ResourceProfile.UNKNOWN).setCanceller((executionVertexID2, th) -> {
            completableFuture.complete(executionVertexID2);
        }).buildSharingPhysicalSlotRequestBulk();
    }

    private PhysicalSlot addOneSlot() {
        PhysicalSlot createPhysicalSlot = PhysicalSlotTestUtils.createPhysicalSlot();
        CompletableFuture.runAsync(() -> {
            this.slots.add(createPhysicalSlot);
        }, mainThreadExecutor).join();
        return createPhysicalSlot;
    }

    private PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult checkBulkTimeout(PhysicalSlotRequestBulkWithTimestamp physicalSlotRequestBulkWithTimestamp) {
        return this.bulkChecker.checkPhysicalSlotRequestBulkTimeout(physicalSlotRequestBulkWithTimestamp, TIMEOUT);
    }

    private boolean isFulfillable(PhysicalSlotRequestBulk physicalSlotRequestBulk) {
        return PhysicalSlotRequestBulkCheckerImpl.isSlotRequestBulkFulfillable(physicalSlotRequestBulk, this.slotsRetriever);
    }
}
