package org.apache.flink.runtime.scheduler;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.DummyPayload;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/scheduler/SharedSlotTest.class */
public class SharedSlotTest {
    private static final ExecutionVertexID EV1 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionVertexID EV2 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionSlotSharingGroup SG = SharedSlotTestingUtils.createExecutionSlotSharingGroup(EV1, EV2);
    private static final SlotRequestId PHYSICAL_SLOT_REQUEST_ID = new SlotRequestId();
    private static final ResourceProfile RP = ResourceProfile.newBuilder().setCpuCores(2.0d).build();

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/SharedSlotTest$SharedSlotBuilder.class */
    /**
     * Fluent builder producing {@link SharedSlot} instances for the tests in this class.
     *
     * <p>Every slot is constructed with the class-level {@code PHYSICAL_SLOT_REQUEST_ID},
     * {@code RP} and {@code SG} fixtures. Unless overridden, the physical slot future is a fresh
     * incomplete future, the occupied-indefinitely flag is {@code false}, and the external release
     * callback does nothing.
     */
    private static class SharedSlotBuilder {
        private CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<>();
        private boolean slotWillBeOccupiedIndefinitely;
        private Consumer<ExecutionSlotSharingGroup> externalReleaseCallback = ignored -> {};

        private SharedSlotBuilder() {}

        /** Replaces the future that will supply the physical slot. */
        private SharedSlotBuilder withSlotContextFuture(CompletableFuture<PhysicalSlot> future) {
            slotContextFuture = future;
            return this;
        }

        /** Sets the occupied-indefinitely flag passed to the {@link SharedSlot} constructor. */
        private SharedSlotBuilder slotWillBeOccupiedIndefinitely() {
            slotWillBeOccupiedIndefinitely = true;
            return this;
        }

        /** Replaces the callback handed to the {@link SharedSlot} as its external release hook. */
        private SharedSlotBuilder withExternalReleaseCallback(
                Consumer<ExecutionSlotSharingGroup> callback) {
            externalReleaseCallback = callback;
            return this;
        }

        /** Creates the {@link SharedSlot} from the current builder settings. */
        private SharedSlot build() {
            return new SharedSlot(
                    PHYSICAL_SLOT_REQUEST_ID,
                    RP,
                    SG,
                    slotContextFuture,
                    slotWillBeOccupiedIndefinitely,
                    externalReleaseCallback);
        }

        /** Returns a builder initialised with the default settings. */
        private static SharedSlotBuilder newBuilder() {
            return new SharedSlotBuilder();
        }
    }

    /** Package-private no-argument constructor; performs no setup. */
    SharedSlotTest() {}

    /**
     * Checks that a freshly built {@link SharedSlot} reports the request id, resource profile and
     * occupied-indefinitely flag it was constructed with, and that it starts out empty.
     */
    @Test
    void testCreation() {
        SharedSlotBuilder builder = SharedSlotBuilder.newBuilder();
        builder.slotWillBeOccupiedIndefinitely();
        SharedSlot sharedSlot = builder.build();

        Assertions.assertThat(sharedSlot.getPhysicalSlotRequestId())
                .isEqualTo(PHYSICAL_SLOT_REQUEST_ID);
        Assertions.assertThat(sharedSlot.getPhysicalSlotResourceProfile()).isEqualTo(RP);
        Assertions.assertThat(sharedSlot.willOccupySlotIndefinitely()).isTrue();
        Assertions.assertThat(sharedSlot.isEmpty()).isTrue();
    }

    /**
     * Checks that once the physical slot future completes, the {@link SharedSlot} is the payload
     * reported by the physical slot.
     */
    @Test
    void testAssignAsPayloadToPhysicalSlot() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<>();
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).build();

        TestingPhysicalSlot physicalSlot = new TestingPhysicalSlot(RP, new AllocationID());
        slotContextFuture.complete(physicalSlot);

        Assertions.assertThat(physicalSlot.getPayload()).isEqualTo(sharedSlot);
    }

    /**
     * Checks the normal allocation path: a logical slot request stays pending until the physical
     * slot future completes, after which the logical slot exposes the physical slot's allocation
     * id, task manager location and gateway with {@link Locality#UNKNOWN}. The shared slot is then
     * non-empty and the external release callback has not been invoked.
     */
    @Test
    void testLogicalSlotAllocation() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<>();
        // Completed by the external release callback, so "done" means the callback has fired.
        CompletableFuture<ExecutionSlotSharingGroup> released = new CompletableFuture<>();
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withSlotContextFuture(slotContextFuture)
                        .slotWillBeOccupiedIndefinitely()
                        .withExternalReleaseCallback(released::complete)
                        .build();
        CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);

        FlinkAssertions.assertThatFuture(logicalSlotFuture).isNotDone();

        AllocationID allocationId = new AllocationID();
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        slotContextFuture.complete(
                new TestingPhysicalSlot(
                        allocationId, taskManagerLocation, 3, taskManagerGateway, RP));

        Assertions.assertThat(sharedSlot.isEmpty()).isFalse();
        FlinkAssertions.assertThatFuture(released).isNotDone();
        Assertions.assertThat(logicalSlotFuture).isDone();

        LogicalSlot logicalSlot = logicalSlotFuture.join();
        Assertions.assertThat(logicalSlot.getAllocationId()).isEqualTo(allocationId);
        Assertions.assertThat(logicalSlot.getTaskManagerLocation()).isEqualTo(taskManagerLocation);
        Assertions.assertThat(logicalSlot.getTaskManagerGateway()).isEqualTo(taskManagerGateway);
        Assertions.assertThat(logicalSlot.getLocality()).isEqualTo(Locality.UNKNOWN);
    }

    /**
     * Checks that a failure of the physical slot future fails the pending logical slot request
     * with the same cause, leaves the shared slot empty and invokes the external release callback.
     */
    @Test
    void testLogicalSlotFailureDueToPhysicalSlotFailure() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<>();
        // Completed by the external release callback, so "done" means the callback has fired.
        CompletableFuture<ExecutionSlotSharingGroup> released = new CompletableFuture<>();
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withSlotContextFuture(slotContextFuture)
                        .withExternalReleaseCallback(released::complete)
                        .build();
        CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);

        Throwable cause = new Throwable();
        slotContextFuture.completeExceptionally(cause);

        Assertions.assertThat(logicalSlotFuture.isCompletedExceptionally()).isTrue();
        Assertions.assertThatThrownBy(logicalSlotFuture::get)
                .isInstanceOf(ExecutionException.class)
                .hasCause(cause);
        Assertions.assertThat(sharedSlot.isEmpty()).isTrue();
        Assertions.assertThat(released).isDone();
    }

    /**
     * Checks that cancelling a logical slot request after the physical slot future has completed
     * has no visible effect: the logical slot future is not completed exceptionally, the shared
     * slot stays non-empty and the external release callback is not invoked.
     */
    @Test
    void testCancelCompletedLogicalSlotRequest() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<>();
        // Completed by the external release callback, so "done" means the callback has fired.
        CompletableFuture<ExecutionSlotSharingGroup> released = new CompletableFuture<>();
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withSlotContextFuture(slotContextFuture)
                        .withExternalReleaseCallback(released::complete)
                        .build();
        CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);

        slotContextFuture.complete(new TestingPhysicalSlot(RP, new AllocationID()));
        sharedSlot.cancelLogicalSlotRequest(EV1, new Throwable());

        FlinkAssertions.assertThatFuture(logicalSlotFuture).isNotCompletedExceptionally();
        Assertions.assertThat(sharedSlot.isEmpty()).isFalse();
        Assertions.assertThat(released).isNotDone();
    }

    /**
     * Checks that cancelling a logical slot request while the physical slot future is still
     * incomplete fails the logical slot future with the given cause, leaves the shared slot empty
     * and invokes the external release callback.
     */
    @Test
    void testCancelPendingLogicalSlotRequest() {
        // Completed by the external release callback, so "done" means the callback has fired.
        CompletableFuture<ExecutionSlotSharingGroup> released = new CompletableFuture<>();
        // The builder's default slot context future is never completed in this test.
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withExternalReleaseCallback(released::complete)
                        .build();
        CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);

        Throwable cause = new Throwable();
        sharedSlot.cancelLogicalSlotRequest(EV1, cause);

        Assertions.assertThat(logicalSlotFuture.isCompletedExceptionally()).isTrue();
        Assertions.assertThatThrownBy(logicalSlotFuture::get)
                .isInstanceOf(ExecutionException.class)
                .hasCause(cause);
        Assertions.assertThat(sharedSlot.isEmpty()).isTrue();
        FlinkAssertions.assertThatFuture(released).isDone();
    }

    /**
     * Checks that returning the only allocated logical slot leaves the shared slot empty and
     * invokes the external release callback.
     */
    @Test
    void testReturnAllocatedLogicalSlot() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<>();
        // Completed by the external release callback, so "done" means the callback has fired.
        CompletableFuture<ExecutionSlotSharingGroup> released = new CompletableFuture<>();
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withSlotContextFuture(slotContextFuture)
                        .withExternalReleaseCallback(released::complete)
                        .build();
        CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);

        slotContextFuture.complete(new TestingPhysicalSlot(RP, new AllocationID()));
        sharedSlot.returnLogicalSlot(logicalSlotFuture.join());

        Assertions.assertThat(sharedSlot.isEmpty()).isTrue();
        FlinkAssertions.assertThatFuture(released).isDone();
    }

    /**
     * Checks that releasing a shared slot whose physical slot future is still incomplete throws
     * {@link IllegalStateException}, and that afterwards the shared slot is still non-empty and
     * the external release callback has not been invoked.
     */
    @Test
    void testReleaseIfPhysicalSlotRequestIsIncomplete() {
        CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<>();
        // Completed by the external release callback, so "done" means the callback has fired.
        CompletableFuture<ExecutionSlotSharingGroup> released = new CompletableFuture<>();
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withSlotContextFuture(slotContextFuture)
                        .withExternalReleaseCallback(released::complete)
                        .build();
        sharedSlot.allocateLogicalSlot(EV1);

        // withFailMessage must precede the assertion it describes for AssertJ to apply it.
        Assertions.assertThatThrownBy(() -> sharedSlot.release(new Throwable()))
                .withFailMessage("IllegalStateException is expected trying to release a shared slot with incomplete physical slot request")
                .isInstanceOf(IllegalStateException.class);
        Assertions.assertThat(sharedSlot.isEmpty()).isFalse();
        FlinkAssertions.assertThatFuture(released).isNotDone();
    }

    /**
     * Checks that releasing a shared slot backed by an already completed physical slot future
     * completes the terminal future handed to the logical slot's {@link DummyPayload}, leaves the
     * shared slot empty and invokes the external release callback.
     */
    @Test
    void testReleaseIfPhysicalSlotIsAllocated() {
        CompletableFuture<PhysicalSlot> slotContextFuture =
                CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        // Completed by the external release callback, so "done" means the callback has fired.
        CompletableFuture<ExecutionSlotSharingGroup> released = new CompletableFuture<>();
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withSlotContextFuture(slotContextFuture)
                        .withExternalReleaseCallback(released::complete)
                        .build();
        LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot(EV1).join();

        CompletableFuture<?> terminalFuture = new CompletableFuture<>();
        logicalSlot.tryAssignPayload(new DummyPayload(terminalFuture));
        FlinkAssertions.assertThatFuture(terminalFuture).isNotDone();

        sharedSlot.release(new Throwable());

        FlinkAssertions.assertThatFuture(terminalFuture).isDone();
        Assertions.assertThat(sharedSlot.isEmpty()).isTrue();
        FlinkAssertions.assertThatFuture(released).isDone();
    }

    /**
     * Checks that returning the same logical slot a second time throws
     * {@link IllegalStateException}.
     */
    @Test
    void tesDuplicatedReturnLogicalSlotFails() {
        // Counts release-callback invocations; the count itself is not asserted in this test.
        AtomicInteger releaseCount = new AtomicInteger(0);
        CompletableFuture<PhysicalSlot> slotContextFuture =
                CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withSlotContextFuture(slotContextFuture)
                        .withExternalReleaseCallback(group -> releaseCount.incrementAndGet())
                        .build();

        LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot(EV1).join();
        sharedSlot.returnLogicalSlot(logicalSlot);

        Assertions.assertThatThrownBy(() -> sharedSlot.returnLogicalSlot(logicalSlot))
                .withFailMessage("Duplicated 'returnLogicalSlot' call should fail with IllegalStateException")
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Checks that the external release callback is invoked exactly once: it fires when the only
     * logical slot is returned, and neither the {@code release} call made from inside the callback
     * nor a later explicit {@code release} on the now-empty slot triggers it again.
     */
    @Test
    void testReleaseEmptyDoesNotCallAllocatorReleaseBack() {
        CompletableFuture<PhysicalSlot> slotContextFuture =
                CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        // Hands the shared slot to its own release callback; completed right after build().
        CompletableFuture<SharedSlot> sharedSlotFuture = new CompletableFuture<>();
        AtomicInteger releaseCount = new AtomicInteger(0);
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withSlotContextFuture(slotContextFuture)
                        .withExternalReleaseCallback(
                                group -> {
                                    // Re-entrant release from within the callback: the counter
                                    // assertions below would fail if this re-triggered the
                                    // callback.
                                    sharedSlotFuture.join().release(new Throwable());
                                    releaseCount.incrementAndGet();
                                })
                        .build();
        sharedSlotFuture.complete(sharedSlot);

        LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot(EV1).join();
        Assertions.assertThat(releaseCount).hasValue(0);

        sharedSlot.returnLogicalSlot(logicalSlot);
        Assertions.assertThat(releaseCount).hasValue(1);

        sharedSlot.release(new Throwable());
        Assertions.assertThat(releaseCount).hasValue(1);
    }

    /**
     * Allocates two logical slots from one shared slot and gives the first a payload whose
     * {@code fail} callback returns the second logical slot to the shared slot, then releases the
     * shared slot.
     *
     * <p>There are no explicit assertions; the test passes as long as {@code release} completes
     * without throwing. Presumably {@code release} calls the payload's {@code fail} while it is
     * still iterating the allocated logical slots, which is the scenario the method name refers
     * to; confirm against {@link SharedSlot#release}.
     */
    @Test
    void testReturnLogicalSlotWhileReleasingDoesNotCauseConcurrentModificationException() {
        SharedSlot sharedSlot =
                SharedSlotBuilder.newBuilder()
                        .withSlotContextFuture(
                                CompletableFuture.completedFuture(
                                        new TestingPhysicalSlot(RP, new AllocationID())))
                        .build();
        LogicalSlot firstSlot = sharedSlot.allocateLogicalSlot(EV1).join();
        LogicalSlot secondSlot = sharedSlot.allocateLogicalSlot(EV2).join();

        firstSlot.tryAssignPayload(
                new LogicalSlot.Payload() {
                    @Override
                    public void fail(Throwable cause) {
                        // Mutates the shared slot's bookkeeping from inside the payload callback.
                        sharedSlot.returnLogicalSlot(secondSlot);
                    }

                    @Override
                    public CompletableFuture<?> getTerminalStateFuture() {
                        return CompletableFuture.completedFuture(null);
                    }
                });

        sharedSlot.release(new Throwable());
    }
}
