package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests that observe how calls on a {@link DeclarativeSlotPoolBridge} change the resource
 * requirements reported through its slot pool's increase/decrease callbacks.
 *
 * <p>The bridge under test is created from a {@link TestingDeclarativeSlotPoolFactory} whose
 * builder routes the increase/decrease callbacks into a {@link RequirementListener}. Each test
 * then asserts on the count the listener recorded for {@link ResourceProfile#UNKNOWN}.
 *
 * <p>NOTE(review): {@code componentMainThreadExecutor}, {@code slotRequestMaxInterval}, {@code
 * JOB_MASTER_ID}, {@code createAllocatedSlot} and {@code createDeclarativeSlotPoolBridge} are not
 * declared in this file; presumably they are inherited from {@link
 * AbstractDeclarativeSlotPoolBridgeTest} - confirm there.
 */
@ExtendWith({ParameterizedTestExtension.class})
class DeclarativeSlotPoolBridgeResourceDeclarationTest extends AbstractDeclarativeSlotPoolBridgeTest {
    /** Records the requirement changes issued by the bridge under test. */
    private RequirementListener requirementListener;

    /** The bridge under test; created in {@link #setup()} and closed in {@link #teardown()}. */
    private DeclarativeSlotPoolBridge declarativeSlotPoolBridge;

    /**
     * Test double that keeps the net resource requirements reported through the increase/decrease
     * callbacks it is registered for.
     *
     * <p>With a positive {@code slotRequestMaxInterval}, an increase is not applied immediately:
     * it is scheduled on the given executor to run after that interval, and a still-pending
     * scheduled increase is cancelled when a new one arrives. With a non-positive interval,
     * increases are applied synchronously. Decreases are always applied synchronously.
     *
     * <p>NOTE(review): the counter carried by a cancelled pending task is dropped rather than
     * merged into the replacement task. Presumably that is fine because each test issues a single
     * request - confirm before reusing this listener for multi-request scenarios.
     */
    public static final class RequirementListener {
        /** Executor on which delayed increases are scheduled. */
        final ComponentMainThreadExecutor componentMainThreadExecutor;

        /** Delay applied to increases; a non-positive value disables the delay. */
        final Duration slotRequestMaxInterval;

        /** Handle of the most recently scheduled delayed increase, or {@code null} if none. */
        ScheduledFuture<?> slotRequestFuture;

        /** Net requirements recorded so far. */
        private ResourceCounter requirements = ResourceCounter.empty();

        /**
         * @param componentMainThreadExecutor executor used to schedule delayed increases
         * @param duration delay before an increase is applied; must not be {@code null}
         * @throws NullPointerException if {@code duration} is {@code null}
         */
        RequirementListener(ComponentMainThreadExecutor componentMainThreadExecutor, @Nonnull Duration duration) {
            this.componentMainThreadExecutor = componentMainThreadExecutor;
            // Fail fast on a violated @Nonnull contract instead of later inside a callback.
            this.slotRequestMaxInterval = Objects.requireNonNull(duration, "slotRequestMaxInterval must not be null");
        }

        /**
         * Adds {@code resourceCounter} to the recorded requirements, either immediately or after
         * {@code slotRequestMaxInterval} when that interval is positive.
         */
        private void increaseRequirements(ResourceCounter resourceCounter) {
            final long delayMillis = this.slotRequestMaxInterval.toMillis();
            if (delayMillis <= 0) {
                this.requirements = this.requirements.add(resourceCounter);
                return;
            }
            // Only one delayed increase is kept in flight: a pending one is cancelled first.
            if (!slotSlotRequestFutureAssignable()) {
                this.slotRequestFuture.cancel(true);
            }
            this.slotRequestFuture =
                    this.componentMainThreadExecutor.schedule(
                            () -> checkSlotRequestMaxInterval(resourceCounter),
                            delayMillis,
                            TimeUnit.MILLISECONDS);
        }

        /** Subtracts {@code resourceCounter} from the recorded requirements immediately. */
        private void decreaseRequirements(ResourceCounter resourceCounter) {
            this.requirements = this.requirements.subtract(resourceCounter);
        }

        /** Returns the requirements recorded so far. */
        public ResourceCounter getRequirements() {
            return this.requirements;
        }

        /**
         * Blocks until the most recently scheduled delayed increase has completed; returns
         * immediately when none was ever scheduled.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} or {@link
         *     ExecutionException} raised while waiting; on interruption the thread's interrupt
         *     flag is restored before throwing
         */
        public void tryWaitSlotRequestIsDone() {
            if (this.slotRequestFuture == null) {
                return;
            }
            try {
                this.slotRequestFuture.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        /** Returns whether no delayed increase is currently pending. */
        private boolean slotSlotRequestFutureAssignable() {
            return this.slotRequestFuture == null
                    || this.slotRequestFuture.isDone()
                    || this.slotRequestFuture.isCancelled();
        }

        /** Body of the delayed task: applies the increase if the delay is (still) positive. */
        private void checkSlotRequestMaxInterval(ResourceCounter resourceCounter) {
            if (this.slotRequestMaxInterval.toMillis() <= 0) {
                return;
            }
            this.requirements = this.requirements.add(resourceCounter);
        }
    }

    DeclarativeSlotPoolBridgeResourceDeclarationTest() {
    }

    /** Creates a fresh listener and a bridge wired to it before every test. */
    @BeforeEach
    void setup() {
        this.requirementListener = new RequirementListener(this.componentMainThreadExecutor, this.slotRequestMaxInterval);
        constructDeclarativeSlotPoolBridge(this.componentMainThreadExecutor);
    }

    /**
     * Builds the bridge under test on the given executor and stores it in {@link
     * #declarativeSlotPoolBridge}.
     *
     * <p>The slot pool's increase/decrease callbacks are bound to the current {@link
     * #requirementListener}; reserving a free slot yields a freshly created allocated slot, and
     * freeing or releasing a slot reports one {@link ResourceProfile#UNKNOWN} resource.
     *
     * @param componentMainThreadExecutor executor handed to the bridge factory method
     */
    private void constructDeclarativeSlotPoolBridge(ComponentMainThreadExecutor componentMainThreadExecutor) {
        // A bound method reference null-checks its receiver when it is evaluated, so no explicit
        // Objects.requireNonNull on the listener is needed here.
        final TestingDeclarativeSlotPoolFactory slotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(
                        TestingDeclarativeSlotPool.builder()
                                .setIncreaseResourceRequirementsByConsumer(
                                        this.requirementListener::increaseRequirements)
                                .setDecreaseResourceRequirementsByConsumer(
                                        this.requirementListener::decreaseRequirements)
                                .setReserveFreeSlotFunction(
                                        (allocationId, resourceProfile) -> createAllocatedSlot(allocationId))
                                .setFreeReservedSlotFunction(
                                        (allocationId, cause, timestamp) ->
                                                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1))
                                .setReleaseSlotFunction(
                                        (allocationId, cause) ->
                                                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)));
        this.declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge(slotPoolFactory, componentMainThreadExecutor);
    }

    /** Closes the bridge under test, if one was created. */
    @AfterEach
    void teardown() {
        if (this.declarativeSlotPoolBridge != null) {
            this.declarativeSlotPoolBridge.close();
        }
    }

    /** Requesting a new allocated slot must raise the recorded requirement to one. */
    @TestTemplate
    void testRequirementsIncreasedOnNewAllocation() throws Exception {
        this.declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

        this.declarativeSlotPoolBridge.requestNewAllocatedSlot(
                new SlotRequestId(), ResourceProfile.UNKNOWN, Duration.ofMinutes(5L));
        // The increase may be applied with a delay; wait for it before asserting.
        this.requirementListener.tryWaitSlotRequestIsDone();

        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
                .isOne();
    }

    /**
     * A slot request that times out must leave the recorded requirement at zero.
     *
     * <p>This test replaces the listener and bridge from {@link #setup()} with instances bound to
     * a dedicated single-threaded executor, which is shut down when the test finishes.
     *
     * <p>NOTE(review): the bridge created in {@link #setup()} is overwritten here without being
     * closed - confirm whether that instance holds anything that needs releasing.
     */
    @TestTemplate
    void testRequirementsDecreasedOnAllocationTimeout() throws Exception {
        final ScheduledExecutorService singleThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            final ComponentMainThreadExecutor mainThreadExecutor =
                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadExecutor);
            this.requirementListener = new RequirementListener(mainThreadExecutor, this.slotRequestMaxInterval);
            constructDeclarativeSlotPoolBridge(mainThreadExecutor);
            this.declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

            // Issue the request on the bridge's executor; the request timeout is twice the
            // configured slot request interval.
            final CompletableFuture<?> allocationFuture =
                    CompletableFuture.supplyAsync(
                                    () ->
                                            this.declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                    new SlotRequestId(),
                                                    ResourceProfile.UNKNOWN,
                                                    Duration.ofMillis(this.slotRequestMaxInterval.toMillis() * 2)),
                                    mainThreadExecutor)
                            .get();
            this.requirementListener.tryWaitSlotRequestIsDone();

            FlinkAssertions.assertThatFuture(allocationFuture).failsWithin(Duration.ofMinutes(1L));
            this.requirementListener.tryWaitSlotRequestIsDone();

            // Read the listener state on the same executor that runs the bridge's callbacks.
            CompletableFuture.runAsync(
                            () ->
                                    Assertions.assertThat(
                                                    this.requirementListener
                                                            .getRequirements()
                                                            .getResourceCount(ResourceProfile.UNKNOWN))
                                            .isZero(),
                            mainThreadExecutor)
                    .join();
        } finally {
            singleThreadExecutor.shutdown();
        }
    }

    /** Merely offering new slots to the bridge must not change the recorded requirement. */
    @TestTemplate
    void testRequirementsUnchangedOnNewSlotsNotification() throws Exception {
        this.declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

        this.declarativeSlotPoolBridge.newSlotsAreAvailable(
                Collections.singleton(createAllocatedSlot(new AllocationID())));

        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
                .isZero();
    }

    /** Allocating an available slot must raise the recorded requirement to one. */
    @TestTemplate
    void testRequirementsIncreasedOnSlotReservation() throws Exception {
        this.declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
        final PhysicalSlot availableSlot = createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(availableSlot));

        this.declarativeSlotPoolBridge.allocateAvailableSlot(
                new SlotRequestId(), availableSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        this.requirementListener.tryWaitSlotRequestIsDone();

        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
                .isOne();
    }

    /** Releasing a previously allocated slot must bring the recorded requirement back to zero. */
    @TestTemplate
    void testRequirementsDecreasedOnSlotFreeing() throws Exception {
        this.declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
        final PhysicalSlot availableSlot = createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(availableSlot));
        final SlotRequestId slotRequestId = new SlotRequestId();
        this.declarativeSlotPoolBridge.allocateAvailableSlot(
                slotRequestId, availableSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        this.requirementListener.tryWaitSlotRequestIsDone();

        this.declarativeSlotPoolBridge.releaseSlot(slotRequestId, new RuntimeException("Test exception"));

        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
                .isZero();
    }

    /** Failing the allocation of an allocated slot must bring the recorded requirement to zero. */
    @TestTemplate
    void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
        this.declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
        final PhysicalSlot availableSlot = createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(availableSlot));
        this.declarativeSlotPoolBridge.allocateAvailableSlot(
                new SlotRequestId(), availableSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        this.requirementListener.tryWaitSlotRequestIsDone();

        this.declarativeSlotPoolBridge.failAllocation(
                availableSlot.getTaskManagerLocation().getResourceID(),
                availableSlot.getAllocationId(),
                new RuntimeException("Test exception"));

        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
                .isZero();
    }
}
