package org.apache.flink.runtime.scheduler;

import java.net.InetAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingPayload;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.BiConsumerWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link SimpleExecutionSlotAllocator}.
 *
 * <p>Every test drives the allocator through an {@link AllocationContext}, which wires it to a
 * {@link TestingPhysicalSlotProvider} so that the physical slot requests, responses and
 * cancellations caused by the allocator can be inspected afterwards.
 */
public class SimpleExecutionSlotAllocatorTest {
    /** Resource profile the fixture reports for every execution attempt. */
    private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.fromResources(3.0d, 5);

    /** The single execution attempt all tests allocate a slot for. */
    private static final ExecutionAttemptID EXECUTION_ATTEMPT_ID = ExecutionGraphTestUtils.createExecutionAttemptId();

    /**
     * Test fixture bundling a {@link SimpleExecutionSlotAllocator} with the collaborators it is
     * constructed from.
     */
    public static class AllocationContext {
        private final TestingPhysicalSlotProvider slotProvider;
        private final boolean slotWillBeOccupiedIndefinitely;
        /** Mutable on purpose: tests register preferred locations through {@link #getLocations()}. */
        private final Map<ExecutionVertexID, Collection<TaskManagerLocation>> locations;
        private final SimpleExecutionSlotAllocator allocator;

        /**
         * Creates a context backed by {@link
         * TestingPhysicalSlotProvider#createWithInfiniteSlotCreation()} with the
         * "occupied indefinitely" flag set to {@code false}.
         */
        public AllocationContext() {
            this(TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(), false);
        }

        /**
         * Creates a context around the given slot provider.
         *
         * @param slotProvider provider handed to the allocator and exposed via {@link
         *     #getSlotProvider()}
         * @param slotWillBeOccupiedIndefinitely flag forwarded unchanged to the allocator
         */
        public AllocationContext(TestingPhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
            this.slotProvider = slotProvider;
            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
            this.locations = new HashMap<>();
            this.allocator = new SimpleExecutionSlotAllocator(
                    slotProvider,
                    // Every attempt gets the same fixed profile.
                    executionAttemptId -> RESOURCE_PROFILE,
                    // Look the vertex up in the mutable map; the second argument is not used.
                    (executionVertexId, unusedSet) ->
                            this.locations.getOrDefault(executionVertexId, Collections.emptyList()),
                    slotWillBeOccupiedIndefinitely);
        }

        /**
         * Requests a slot for a single execution attempt.
         *
         * @param executionAttemptID attempt to allocate for
         * @return the logical slot future of the assignment the allocator returned for that attempt
         */
        private CompletableFuture<LogicalSlot> allocateSlotsFor(ExecutionAttemptID executionAttemptID) {
            return this.allocator
                    .allocateSlotsFor(Collections.singletonList(executionAttemptID))
                    .get(executionAttemptID)
                    .getLogicalSlotFuture();
        }

        public TestingPhysicalSlotProvider getSlotProvider() {
            return this.slotProvider;
        }

        public boolean isSlotWillBeOccupiedIndefinitely() {
            return this.slotWillBeOccupiedIndefinitely;
        }

        /** Returns the live, mutable map consulted by the allocator's location lookup. */
        public Map<ExecutionVertexID, Collection<TaskManagerLocation>> getLocations() {
            return this.locations;
        }

        public SimpleExecutionSlotAllocator getAllocator() {
            return this.allocator;
        }
    }

    SimpleExecutionSlotAllocatorTest() {
    }

    /**
     * A single allocation completes its logical slot future and issues exactly one physical slot
     * request carrying {@link #RESOURCE_PROFILE}.
     */
    @Test
    void testSlotAllocation() {
        AllocationContext context = new AllocationContext();
        CompletableFuture<LogicalSlot> slotFuture = context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);

        Assertions.assertThat(slotFuture).isCompleted();
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(1);
        PhysicalSlotRequest request = context.getSlotProvider().getRequests().values().iterator().next();
        Assertions.assertThat(request.getSlotProfile().getPhysicalSlotResourceProfile()).isEqualTo(RESOURCE_PROFILE);
    }

    /** Allocating twice for the same attempt yields the very same logical slot instance. */
    @Test
    void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws Exception {
        AllocationContext context = new AllocationContext();

        LogicalSlot firstSlot = context.allocateSlotsFor(EXECUTION_ATTEMPT_ID).get();
        LogicalSlot secondSlot = context.allocateSlotsFor(EXECUTION_ATTEMPT_ID).get();

        Assertions.assertThat(firstSlot).isSameAs(secondSlot);
    }

    /**
     * Failing the pending physical slot response fails the logical slot future, and a subsequent
     * allocation for the same attempt issues a second physical slot request.
     */
    @Test
    void testFailedPhysicalSlotRequestFailsLogicalSlotFuture() {
        AllocationContext context =
                new AllocationContext(TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation(), false);
        CompletableFuture<LogicalSlot> slotFuture = context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();

        Assertions.assertThat(slotFuture).isNotDone();
        context.getSlotProvider().getResponses().get(slotRequestId).completeExceptionally(new Throwable());
        Assertions.assertThat(slotFuture).isCompletedExceptionally();

        // Allocating again must go back to the slot provider instead of reusing the failed future.
        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(2);
    }

    @Test
    void testSlotWillBeOccupiedIndefinitelyFalse() throws Exception {
        testSlotWillBeOccupiedIndefinitely(false);
    }

    @Test
    void testSlotWillBeOccupiedIndefinitelyTrue() throws Exception {
        testSlotWillBeOccupiedIndefinitely(true);
    }

    /**
     * Checks that the flag given to the allocator shows up both on the physical slot request and on
     * the payload assigned to the resulting physical slot.
     *
     * @param slotWillBeOccupiedIndefinitely flag value to configure and to expect
     */
    private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws Exception {
        AllocationContext context = new AllocationContext(
                TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(), slotWillBeOccupiedIndefinitely);
        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);

        PhysicalSlotRequest slotRequest = context.getSlotProvider().getFirstRequestOrFail();
        Assertions.assertThat(slotRequest.willSlotBeOccupiedIndefinitely()).isEqualTo(slotWillBeOccupiedIndefinitely);

        TestingPhysicalSlot physicalSlot =
                context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get();
        Assertions.assertThat(physicalSlot.getPayload()).isNotNull();
        Assertions.assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely())
                .isEqualTo(slotWillBeOccupiedIndefinitely);
    }

    /** Releasing a completed logical slot is expected to cancel the physical slot request. */
    @Test
    void testLogicalSlotReleasingCancelsPhysicalSlotRequest() throws Exception {
        testLogicalSlotRequestCancellationOrRelease(
                true,
                true,
                (context, slotFuture) -> slotFuture.get().releaseSlot(null));
    }

    /**
     * Cancelling while the physical slot is still pending must fail the logical slot future with a
     * {@link CancellationException} and is expected to cancel the physical slot request.
     */
    @Test
    void testLogicalSlotCancellationCancelsPhysicalSlotRequest() throws Exception {
        testLogicalSlotRequestCancellationOrRelease(
                false,
                true,
                (context, slotFuture) ->
                        Assertions.assertThatThrownBy(() -> {
                                    context.getAllocator().cancel(EXECUTION_ATTEMPT_ID);
                                    slotFuture.get();
                                })
                                .as("The logical future must finish with a cancellation exception.")
                                .isInstanceOf(CancellationException.class));
    }

    /**
     * Cancelling after the logical slot future has completed is expected to leave the physical slot
     * request untouched.
     */
    @Test
    void testCompletedLogicalSlotCancellationDoesNotCancelPhysicalSlotRequest() throws Exception {
        testLogicalSlotRequestCancellationOrRelease(
                true,
                false,
                (context, slotFuture) -> {
                    context.getAllocator().cancel(EXECUTION_ATTEMPT_ID);
                    slotFuture.get();
                });
    }

    /**
     * Shared scenario: allocate a slot, run a cancel/release action on it, then verify whether the
     * first physical slot request was cancelled and whether re-allocating issues a new request.
     *
     * @param autoCompletePhysicalSlotFuture {@code true} to use {@link
     *     TestingPhysicalSlotProvider#createWithInfiniteSlotCreation()}, {@code false} to use {@link
     *     TestingPhysicalSlotProvider#createWithoutImmediatePhysicalSlotCreation()}
     * @param expectPhysicalSlotRequestCanceled whether the first request must appear in the
     *     provider's cancellations; also decides whether 2 or 1 requests are expected in total
     * @param cancelOrReleaseAction action applied to the context and the logical slot future
     */
    private static void testLogicalSlotRequestCancellationOrRelease(
            boolean autoCompletePhysicalSlotFuture,
            boolean expectPhysicalSlotRequestCanceled,
            BiConsumerWithException<AllocationContext, CompletableFuture<LogicalSlot>, Exception> cancelOrReleaseAction)
            throws Exception {
        TestingPhysicalSlotProvider slotProvider = autoCompletePhysicalSlotFuture
                ? TestingPhysicalSlotProvider.createWithInfiniteSlotCreation()
                : TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
        AllocationContext context = new AllocationContext(slotProvider, false);

        CompletableFuture<LogicalSlot> slotFuture = context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        cancelOrReleaseAction.accept(context, slotFuture);

        SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        Assertions.assertThat(context.getSlotProvider().getCancellations().containsKey(slotRequestId))
                .isEqualTo(expectPhysicalSlotRequestCanceled);

        // A cancelled request must be replaced by a fresh one; an intact one must be reused.
        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        Assertions.assertThat(context.getSlotProvider().getRequests())
                .hasSize(expectPhysicalSlotRequestCanceled ? 2 : 1);
    }

    /**
     * Releasing the payload of the physical slot completes the terminal state future of the payload
     * assigned to the logical slot, records a cancellation for the request, and makes the next
     * allocation issue a second request.
     */
    @Test
    void testPhysicalSlotReleasesLogicalSlots() throws Exception {
        AllocationContext context = new AllocationContext();
        CompletableFuture<LogicalSlot> slotFuture = context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        TestingPayload payload = new TestingPayload();
        slotFuture.thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload));

        SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        TestingPhysicalSlot physicalSlot = context.getSlotProvider().getFirstResponseOrFail().get();

        Assertions.assertThat(payload.getTerminalStateFuture()).isNotDone();
        Assertions.assertThat(physicalSlot.getPayload()).isNotNull();

        physicalSlot.getPayload().release(new Throwable());

        Assertions.assertThat(payload.getTerminalStateFuture()).isDone();
        Assertions.assertThat(context.getSlotProvider().getCancellations()).containsKey(slotRequestId);

        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(2);
    }

    /**
     * With a provider whose physical slot creation fails, the logical slot future completes
     * exceptionally and every issued request is also recorded as cancelled.
     */
    @Test
    void testFailLogicalSlotIfPhysicalSlotIsFails() {
        AllocationContext context = new AllocationContext(
                TestingPhysicalSlotProvider.createWithFailingPhysicalSlotCreation(new FlinkException("test failure")),
                false);

        Assertions.assertThat(context.allocateSlotsFor(EXECUTION_ATTEMPT_ID)).isCompletedExceptionally();
        Assertions.assertThat(context.getSlotProvider().getCancellations().keySet())
                .isEqualTo(context.getSlotProvider().getRequests().keySet());
    }

    /** Constructing the allocator leaves the provider's batch slot request timeout check enabled. */
    @Test
    void testSlotProviderBatchSlotRequestTimeoutCheckIsEnabled() {
        AllocationContext context = new AllocationContext();

        Assertions.assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isTrue();
    }

    /**
     * Locations registered for the attempt's execution vertex end up as the preferred locations of
     * the slot profile in the physical slot request.
     */
    @Test
    void testPreferredLocationsOfSlotProfile() {
        AllocationContext context = new AllocationContext();
        List<TaskManagerLocation> taskManagerLocations = Collections.singletonList(
                new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 41));
        context.getLocations().put(EXECUTION_ATTEMPT_ID.getExecutionVertexId(), taskManagerLocations);

        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);

        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(1);
        PhysicalSlotRequest slotRequest = context.getSlotProvider().getRequests().values().iterator().next();
        Assertions.assertThat(slotRequest.getSlotProfile().getPreferredLocations()).hasSize(1);
        Assertions.assertThat(slotRequest.getSlotProfile().getPreferredLocations()).isEqualTo(taskManagerLocations);
    }
}
