/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.DummyPayload;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.SharedSlot;
import org.apache.flink.runtime.scheduler.SharedSlotTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class SharedSlotTest {
    private static final ExecutionVertexID EV1 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionVertexID EV2 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionSlotSharingGroup SG = SharedSlotTestingUtils.createExecutionSlotSharingGroup(EV1, EV2);
    private static final SlotRequestId PHYSICAL_SLOT_REQUEST_ID = new SlotRequestId();
    private static final ResourceProfile RP = ResourceProfile.newBuilder().setCpuCores(2.0).build();

    SharedSlotTest() {
    }

    @Test
    void testCreation() {
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().slotWillBeOccupiedIndefinitely().build();
        Assertions.assertThat((Comparable)sharedSlot.getPhysicalSlotRequestId()).isEqualTo((Object)PHYSICAL_SLOT_REQUEST_ID);
        Assertions.assertThat((Object)sharedSlot.getPhysicalSlotResourceProfile()).isEqualTo((Object)RP);
        Assertions.assertThat((boolean)sharedSlot.willOccupySlotIndefinitely()).isTrue();
        Assertions.assertThat((boolean)sharedSlot.isEmpty()).isTrue();
    }

    @Test
    void testAssignAsPayloadToPhysicalSlot() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<PhysicalSlot>();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).build();
        TestingPhysicalSlot physicalSlot = new TestingPhysicalSlot(RP, new AllocationID());
        slotContextFuture.complete(physicalSlot);
        Assertions.assertThat((Object)physicalSlot.getPayload()).isEqualTo((Object)sharedSlot);
    }

    @Test
    void testLogicalSlotAllocation() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<PhysicalSlot>();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).slotWillBeOccupiedIndefinitely().withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        FlinkAssertions.assertThatFuture((CompletableFuture)logicalSlotFuture).isNotDone();
        AllocationID allocationId = new AllocationID();
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        slotContextFuture.complete(new TestingPhysicalSlot(allocationId, taskManagerLocation, 3, taskManagerGateway, RP));
        Assertions.assertThat((boolean)sharedSlot.isEmpty()).isFalse();
        FlinkAssertions.assertThatFuture(released).isNotDone();
        Assertions.assertThat((CompletableFuture)logicalSlotFuture).isDone();
        LogicalSlot logicalSlot = (LogicalSlot)logicalSlotFuture.join();
        Assertions.assertThat((Comparable)logicalSlot.getAllocationId()).isEqualTo((Object)allocationId);
        Assertions.assertThat((Comparable)logicalSlot.getTaskManagerLocation()).isEqualTo((Object)taskManagerLocation);
        Assertions.assertThat((Object)logicalSlot.getTaskManagerGateway()).isEqualTo((Object)taskManagerGateway);
        Assertions.assertThat((Comparable)logicalSlot.getLocality()).isEqualTo((Object)Locality.UNKNOWN);
    }

    @Test
    void testLogicalSlotFailureDueToPhysicalSlotFailure() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<PhysicalSlot>();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        Throwable cause = new Throwable();
        slotContextFuture.completeExceptionally(cause);
        Assertions.assertThat((boolean)logicalSlotFuture.isCompletedExceptionally()).isTrue();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(logicalSlotFuture::get).isInstanceOf(ExecutionException.class)).hasCause(cause);
        Assertions.assertThat((boolean)sharedSlot.isEmpty()).isTrue();
        Assertions.assertThat(released).isDone();
    }

    @Test
    void testCancelCompletedLogicalSlotRequest() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<PhysicalSlot>();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        slotContextFuture.complete(new TestingPhysicalSlot(RP, new AllocationID()));
        sharedSlot.cancelLogicalSlotRequest(EV1, new Throwable());
        FlinkAssertions.assertThatFuture((CompletableFuture)logicalSlotFuture).isNotCompletedExceptionally();
        Assertions.assertThat((boolean)sharedSlot.isEmpty()).isFalse();
        Assertions.assertThat(released).isNotDone();
    }

    @Test
    void testCancelPendingLogicalSlotRequest() {
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        Throwable cause = new Throwable();
        sharedSlot.cancelLogicalSlotRequest(EV1, cause);
        Assertions.assertThat((boolean)logicalSlotFuture.isCompletedExceptionally()).isTrue();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(logicalSlotFuture::get).isInstanceOf(ExecutionException.class)).hasCause(cause);
        Assertions.assertThat((boolean)sharedSlot.isEmpty()).isTrue();
        FlinkAssertions.assertThatFuture(released).isDone();
    }

    @Test
    void testReturnAllocatedLogicalSlot() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<PhysicalSlot>();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        slotContextFuture.complete(new TestingPhysicalSlot(RP, new AllocationID()));
        sharedSlot.returnLogicalSlot((LogicalSlot)logicalSlotFuture.join());
        Assertions.assertThat((boolean)sharedSlot.isEmpty()).isTrue();
        FlinkAssertions.assertThatFuture(released).isDone();
    }

    @Test
    void testReleaseIfPhysicalSlotRequestIsIncomplete() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<PhysicalSlot>();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        sharedSlot.allocateLogicalSlot(EV1);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> sharedSlot.release(new Throwable())).withFailMessage("IllegalStateException is expected trying to release a shared slot with incomplete physical slot request", new Object[0])).isInstanceOf(IllegalStateException.class);
        Assertions.assertThat((boolean)sharedSlot.isEmpty()).isFalse();
        FlinkAssertions.assertThatFuture(released).isNotDone();
    }

    @Test
    void testReleaseIfPhysicalSlotIsAllocated() {
        CompletableFuture<PhysicalSlot> slotContextFuture = CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        LogicalSlot logicalSlot = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV1).join();
        CompletableFuture terminalFuture = new CompletableFuture();
        logicalSlot.tryAssignPayload((LogicalSlot.Payload)new DummyPayload(terminalFuture));
        FlinkAssertions.assertThatFuture(terminalFuture).isNotDone();
        sharedSlot.release(new Throwable());
        FlinkAssertions.assertThatFuture(terminalFuture).isDone();
        Assertions.assertThat((boolean)sharedSlot.isEmpty()).isTrue();
        FlinkAssertions.assertThatFuture(released).isDone();
    }

    @Test
    void tesDuplicatedReturnLogicalSlotFails() {
        CompletableFuture<PhysicalSlot> slotContextFuture = CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        AtomicInteger released = new AtomicInteger(0);
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(g -> released.incrementAndGet()).build();
        LogicalSlot logicalSlot = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV1).join();
        sharedSlot.returnLogicalSlot(logicalSlot);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> sharedSlot.returnLogicalSlot(logicalSlot)).withFailMessage("Duplicated 'returnLogicalSlot' call should fail with IllegalStateException", new Object[0])).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testReleaseEmptyDoesNotCallAllocatorReleaseBack() {
        CompletableFuture<PhysicalSlot> slotContextFuture = CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        CompletableFuture<SharedSlot> sharedSlotReleaseFuture = new CompletableFuture<SharedSlot>();
        AtomicInteger released = new AtomicInteger(0);
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(g -> {
            ((SharedSlot)sharedSlotReleaseFuture.join()).release(new Throwable());
            released.incrementAndGet();
        }).build();
        sharedSlotReleaseFuture.complete(sharedSlot);
        LogicalSlot logicalSlot = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV1).join();
        Assertions.assertThat((AtomicInteger)released).hasValue(0);
        sharedSlot.returnLogicalSlot(logicalSlot);
        Assertions.assertThat((AtomicInteger)released).hasValue(1);
        sharedSlot.release(new Throwable());
        Assertions.assertThat((AtomicInteger)released).hasValue(1);
    }

    @Test
    void testReturnLogicalSlotWhileReleasingDoesNotCauseConcurrentModificationException() {
        CompletableFuture<PhysicalSlot> slotContextFuture = CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        final SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).build();
        LogicalSlot logicalSlot1 = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV1).join();
        final LogicalSlot logicalSlot2 = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV2).join();
        logicalSlot1.tryAssignPayload(new LogicalSlot.Payload(){

            public void fail(Throwable cause) {
                sharedSlot.returnLogicalSlot(logicalSlot2);
            }

            public CompletableFuture<?> getTerminalStateFuture() {
                return CompletableFuture.completedFuture(null);
            }
        });
        sharedSlot.release(new Throwable());
    }

    private static class SharedSlotBuilder {
        private CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture();
        private boolean slotWillBeOccupiedIndefinitely = false;
        private Consumer<ExecutionSlotSharingGroup> externalReleaseCallback = group -> {};

        private SharedSlotBuilder() {
        }

        private SharedSlotBuilder withSlotContextFuture(CompletableFuture<PhysicalSlot> slotContextFuture) {
            this.slotContextFuture = slotContextFuture;
            return this;
        }

        private SharedSlotBuilder slotWillBeOccupiedIndefinitely() {
            this.slotWillBeOccupiedIndefinitely = true;
            return this;
        }

        private SharedSlotBuilder withExternalReleaseCallback(Consumer<ExecutionSlotSharingGroup> releaseCallback) {
            this.externalReleaseCallback = releaseCallback;
            return this;
        }

        private SharedSlot build() {
            return new SharedSlot(PHYSICAL_SLOT_REQUEST_ID, RP, SG, this.slotContextFuture, this.slotWillBeOccupiedIndefinitely, this.externalReleaseCallback);
        }

        private static SharedSlotBuilder newBuilder() {
            return new SharedSlotBuilder();
        }
    }
}

