/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.AbstractDeclarativeSlotPoolBridgeTest;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolFactory;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.util.function.TriFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests how {@link DeclarativeSlotPoolBridge} changes the resource requirements it declares to
 * the underlying declarative slot pool when slots are requested, reserved, released or failed.
 *
 * <p>The declarative slot pool is replaced by a {@link TestingDeclarativeSlotPool} whose
 * increase/decrease callbacks are routed into a {@link RequirementListener}, so every test can
 * assert on the requirements that the listener has accumulated.
 */
@ExtendWith(ParameterizedTestExtension.class)
class DeclarativeSlotPoolBridgeResourceDeclarationTest
        extends AbstractDeclarativeSlotPoolBridgeTest {

    /** Records the requirement changes issued by the bridge under test. */
    private RequirementListener requirementListener;

    /** Bridge under test; rebuilt by {@link #setup()} before every test. */
    private DeclarativeSlotPoolBridge declarativeSlotPoolBridge;

    DeclarativeSlotPoolBridgeResourceDeclarationTest() {
    }

    @BeforeEach
    void setup() {
        requirementListener =
                new RequirementListener(componentMainThreadExecutor, slotRequestMaxInterval);
        constructDeclarativeSlotPoolBridge(componentMainThreadExecutor);
    }

    /**
     * Builds the bridge under test on top of a testing slot pool wired to the current
     * {@link #requirementListener}.
     *
     * @param mainThreadExecutor executor handed to the bridge as its main thread executor
     */
    private void constructDeclarativeSlotPoolBridge(ComponentMainThreadExecutor mainThreadExecutor) {
        // The requirement callbacks are lambdas (not bound method references) on purpose: the
        // listener field is dereferenced when the callback fires, not when the pool is built.
        final TestingDeclarativeSlotPoolBuilder slotPoolBuilder =
                TestingDeclarativeSlotPool.builder()
                        .setIncreaseResourceRequirementsByConsumer(
                                increment -> requirementListener.increaseRequirements(increment))
                        .setDecreaseResourceRequirementsByConsumer(
                                decrement -> requirementListener.decreaseRequirements(decrement))
                        .setReserveFreeSlotFunction(
                                (allocationId, resourceProfile) ->
                                        createAllocatedSlot(allocationId))
                        .setFreeReservedSlotFunction(
                                (allocationId, cause, timestamp) ->
                                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1))
                        .setReleaseSlotFunction(
                                (allocationId, cause) ->
                                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1));

        final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(slotPoolBuilder);
        declarativeSlotPoolBridge =
                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory, mainThreadExecutor);
    }

    @AfterEach
    void teardown() {
        if (declarativeSlotPoolBridge != null) {
            declarativeSlotPoolBridge.close();
        }
    }

    /** Requesting a new slot must raise the recorded requirement for the profile to one. */
    @TestTemplate
    void testRequirementsIncreasedOnNewAllocation() throws Exception {
        declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

        declarativeSlotPoolBridge.requestNewAllocatedSlot(
                new SlotRequestId(), ResourceProfile.UNKNOWN, Duration.ofMinutes(5L));
        requirementListener.tryWaitSlotRequestIsDone();

        Assertions.assertThat(declaredUnknownCount()).isOne();
    }

    /**
     * A slot request that fails (here expected to happen through its timeout of twice the slot
     * request max interval) must leave the recorded requirement at zero.
     */
    @TestTemplate
    void testRequirementsDecreasedOnAllocationTimeout() throws Exception {
        final ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor();
        try {
            final ComponentMainThreadExecutor mainThreadExecutor =
                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                            scheduledExecutorService);

            // NOTE(review): this replaces the listener and bridge created in setup(); the
            // earlier bridge instance is not closed here.
            requirementListener = new RequirementListener(mainThreadExecutor, slotRequestMaxInterval);
            constructDeclarativeSlotPoolBridge(mainThreadExecutor);
            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");

            // The request is issued from the dedicated main thread executor.
            final Duration requestTimeout =
                    Duration.ofMillis(slotRequestMaxInterval.toMillis() * 2L);
            final CompletableFuture<PhysicalSlot> allocationFuture =
                    CompletableFuture.supplyAsync(
                                    () ->
                                            declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                    new SlotRequestId(),
                                                    ResourceProfile.UNKNOWN,
                                                    requestTimeout),
                                    mainThreadExecutor)
                            .get();

            requirementListener.tryWaitSlotRequestIsDone();
            FlinkAssertions.assertThatFuture(allocationFuture).failsWithin(Duration.ofMinutes(1L));
            requirementListener.tryWaitSlotRequestIsDone();

            // Read the requirements on the same executor the bridge runs on.
            CompletableFuture.runAsync(
                            () -> Assertions.assertThat(declaredUnknownCount()).isZero(),
                            mainThreadExecutor)
                    .join();
        } finally {
            scheduledExecutorService.shutdown();
        }
    }

    /** Merely offering a new slot must not change the recorded requirements. */
    @TestTemplate
    void testRequirementsUnchangedOnNewSlotsNotification() throws Exception {
        startBridgeAndOfferNewSlot();

        Assertions.assertThat(declaredUnknownCount()).isZero();
    }

    /** Reserving an available slot must raise the recorded requirement to one. */
    @TestTemplate
    void testRequirementsIncreasedOnSlotReservation() throws Exception {
        final PhysicalSlot newSlot = startBridgeAndOfferNewSlot();

        declarativeSlotPoolBridge.allocateAvailableSlot(
                new SlotRequestId(), newSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        requirementListener.tryWaitSlotRequestIsDone();

        Assertions.assertThat(declaredUnknownCount()).isOne();
    }

    /** Releasing a previously reserved slot must bring the recorded requirement back to zero. */
    @TestTemplate
    void testRequirementsDecreasedOnSlotFreeing() throws Exception {
        final PhysicalSlot newSlot = startBridgeAndOfferNewSlot();

        final SlotRequestId slotRequestId = new SlotRequestId();
        declarativeSlotPoolBridge.allocateAvailableSlot(
                slotRequestId, newSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        requirementListener.tryWaitSlotRequestIsDone();

        declarativeSlotPoolBridge.releaseSlot(slotRequestId, new RuntimeException("Test exception"));

        Assertions.assertThat(declaredUnknownCount()).isZero();
    }

    /** Failing the allocation of a reserved slot must bring the requirement back to zero. */
    @TestTemplate
    void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
        final PhysicalSlot newSlot = startBridgeAndOfferNewSlot();

        declarativeSlotPoolBridge.allocateAvailableSlot(
                new SlotRequestId(), newSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        requirementListener.tryWaitSlotRequestIsDone();

        declarativeSlotPoolBridge.failAllocation(
                newSlot.getTaskManagerLocation().getResourceID(),
                newSlot.getAllocationId(),
                new RuntimeException("Test exception"));

        Assertions.assertThat(declaredUnknownCount()).isZero();
    }

    /**
     * Starts the bridge and notifies it about one freshly created slot.
     *
     * @return the slot that was offered to the bridge
     */
    private PhysicalSlot startBridgeAndOfferNewSlot() throws Exception {
        declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
        final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID());
        declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot));
        return newSlot;
    }

    /** Returns the currently recorded requirement count for {@link ResourceProfile#UNKNOWN}. */
    private int declaredUnknownCount() {
        return requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN);
    }

    /**
     * Accumulates the requirement increases and decreases reported by the testing slot pool.
     *
     * <p>With a positive slot request max interval an increase is not applied immediately but
     * scheduled on the given executor after that interval; decreases are always applied
     * immediately.
     */
    private static final class RequirementListener {
        private final ComponentMainThreadExecutor componentMainThreadExecutor;
        private final Duration slotRequestMaxInterval;

        /** Handle of the most recently scheduled delayed increase; null until one is scheduled. */
        private ScheduledFuture<?> slotRequestFuture;

        private ResourceCounter requirements = ResourceCounter.empty();

        RequirementListener(
                ComponentMainThreadExecutor componentMainThreadExecutor,
                @Nonnull Duration slotRequestMaxInterval) {
            this.componentMainThreadExecutor = componentMainThreadExecutor;
            this.slotRequestMaxInterval = slotRequestMaxInterval;
        }

        /**
         * Adds {@code requirements} to the recorded total, either right away (non-positive
         * interval) or after the slot request max interval has elapsed.
         */
        private void increaseRequirements(ResourceCounter requirements) {
            final long delayMillis = slotRequestMaxInterval.toMillis();
            if (delayMillis <= 0L) {
                this.requirements = this.requirements.add(requirements);
                return;
            }

            // NOTE(review): if this cancellation succeeds, the increment captured by the
            // cancelled task is never applied; only the latest increment survives.
            if (hasPendingSlotRequest()) {
                slotRequestFuture.cancel(true);
            }
            slotRequestFuture =
                    componentMainThreadExecutor.schedule(
                            () -> applyDeferredIncrease(requirements),
                            delayMillis,
                            TimeUnit.MILLISECONDS);
        }

        /** Subtracts {@code requirements} from the recorded total immediately. */
        private void decreaseRequirements(ResourceCounter requirements) {
            this.requirements = this.requirements.subtract(requirements);
        }

        public ResourceCounter getRequirements() {
            return requirements;
        }

        /**
         * Blocks until the most recently scheduled delayed increase has finished; returns
         * immediately when none was ever scheduled.
         *
         * @throws RuntimeException wrapping the failure if waiting is interrupted or the
         *     scheduled task failed
         */
        public void tryWaitSlotRequestIsDone() {
            if (slotRequestFuture == null) {
                return;
            }
            try {
                slotRequestFuture.get();
            } catch (InterruptedException e) {
                // Restore the interrupt status so that callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        /** Returns true while a scheduled increase exists that is neither done nor cancelled. */
        private boolean hasPendingSlotRequest() {
            return slotRequestFuture != null
                    && !slotRequestFuture.isDone()
                    && !slotRequestFuture.isCancelled();
        }

        /** Body of the delayed task: applies the increment captured at scheduling time. */
        private void applyDeferredIncrease(ResourceCounter increment) {
            requirements = requirements.add(increment);
        }
    }
}

