/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingPayload;
import org.apache.flink.runtime.jobmaster.slotpool.DummyPayload;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.scheduler.ExecutionSlotAssignment;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever;
import org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.BiConsumerWithException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class SlotSharingExecutionSlotAllocatorTest {
    private static final Time ALLOCATION_TIMEOUT = Time.milliseconds((long)100L);
    private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.fromResources((double)3.0, (int)5);
    private static final ExecutionVertexID EV1 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionVertexID EV2 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionVertexID EV3 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionVertexID EV4 = ExecutionGraphTestUtils.createRandomExecutionVertexId();

    SlotSharingExecutionSlotAllocatorTest() {
    }

    @Test
    void testSlotProfileRequestAskedBulkAndGroup() {
        AllocationContext context = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2}).build();
        ExecutionSlotSharingGroup executionSlotSharingGroup = context.getSlotSharingStrategy().getExecutionSlotSharingGroup(EV1);
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2});
        List askedBulks = context.getSlotProfileRetrieverFactory().getAskedBulks();
        Assertions.assertThat((List)askedBulks).hasSize(1);
        Assertions.assertThat((Collection)((Collection)askedBulks.get(0))).containsExactlyInAnyOrder((Object[])new ExecutionVertexID[]{EV1, EV2});
        Assertions.assertThat((List)context.getSlotProfileRetrieverFactory().getAskedGroups()).containsExactly((Object[])new ExecutionSlotSharingGroup[]{executionSlotSharingGroup});
    }

    @Test
    void testSlotRequestProfile() {
        AllocationContext context = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2, SlotSharingExecutionSlotAllocatorTest.EV3}).build();
        ResourceProfile physicalsSlotResourceProfile = RESOURCE_PROFILE.multiply(3);
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2});
        Optional<PhysicalSlotRequest> slotRequest = context.getSlotProvider().getRequests().values().stream().findFirst();
        Assertions.assertThat(slotRequest).isPresent();
        Assertions.assertThat((Object)slotRequest.get().getSlotProfile().getPhysicalSlotResourceProfile()).isEqualTo((Object)physicalsSlotResourceProfile);
    }

    @Test
    void testAllocatePhysicalSlotForNewSharedSlot() {
        AllocationContext context = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2}).addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV3, SlotSharingExecutionSlotAllocatorTest.EV4}).build();
        List executionSlotAssignments = context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2, SlotSharingExecutionSlotAllocatorTest.EV3, SlotSharingExecutionSlotAllocatorTest.EV4});
        List<ExecutionVertexID> assignIds = SlotSharingExecutionSlotAllocatorTest.getAssignIds(executionSlotAssignments);
        Assertions.assertThat(assignIds).containsExactlyInAnyOrder((Object[])new ExecutionVertexID[]{EV1, EV2, EV3, EV4});
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(2);
    }

    @Test
    void testAllocateLogicalSlotFromAvailableSharedSlot() {
        AllocationContext context = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2}).build();
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1});
        List executionSlotAssignments = context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV2});
        List<ExecutionVertexID> assignIds = SlotSharingExecutionSlotAllocatorTest.getAssignIds(executionSlotAssignments);
        Assertions.assertThat(assignIds).containsExactly((Object[])new ExecutionVertexID[]{EV2});
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(1);
    }

    @Test
    void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException {
        AllocationContext context = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1}).build();
        ExecutionSlotAssignment assignment1 = (ExecutionSlotAssignment)context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1}).get(0);
        ExecutionSlotAssignment assignment2 = (ExecutionSlotAssignment)context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1}).get(0);
        Assertions.assertThat(assignment1.getLogicalSlotFuture().get()).isSameAs(assignment2.getLogicalSlotFuture().get());
    }

    @Test
    void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() {
        AllocationContext context = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1}).withPhysicalSlotProvider(TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation()).build();
        CompletableFuture logicalSlotFuture = ((ExecutionSlotAssignment)context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1}).get(0)).getLogicalSlotFuture();
        SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        Assertions.assertThat((CompletableFuture)logicalSlotFuture).isNotDone();
        context.getSlotProvider().getResponses().get(slotRequestId).completeExceptionally(new Throwable());
        Assertions.assertThat((CompletableFuture)logicalSlotFuture).isCompletedExceptionally();
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1});
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(2);
    }

    @Test
    void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException {
        SlotSharingExecutionSlotAllocatorTest.testSlotWillBeOccupiedIndefinitely(false);
    }

    @Test
    void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException {
        SlotSharingExecutionSlotAllocatorTest.testSlotWillBeOccupiedIndefinitely(true);
    }

    private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException {
        AllocationContext context = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1}).setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely).build();
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1});
        PhysicalSlotRequest slotRequest = context.getSlotProvider().getFirstRequestOrFail();
        Assertions.assertThat((boolean)slotRequest.willSlotBeOccupiedIndefinitely()).isEqualTo(slotWillBeOccupiedIndefinitely);
        TestingPhysicalSlot physicalSlot = context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get();
        Assertions.assertThat((Object)physicalSlot.getPayload()).isNotNull();
        Assertions.assertThat((boolean)physicalSlot.getPayload().willOccupySlotIndefinitely()).isEqualTo(slotWillBeOccupiedIndefinitely);
    }

    @Test
    void testReturningLogicalSlotsRemovesSharedSlot() throws Exception {
        SlotSharingExecutionSlotAllocatorTest.testLogicalSlotRequestCancellationOrRelease(false, true, (BiConsumerWithException<AllocationContext, ExecutionSlotAssignment, Exception>)((BiConsumerWithException)(context, assignment) -> ((LogicalSlot)assignment.getLogicalSlotFuture().get()).releaseSlot(null)));
    }

    @Test
    void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws Exception {
        SlotSharingExecutionSlotAllocatorTest.testLogicalSlotRequestCancellationOrRelease(true, true, (BiConsumerWithException<AllocationContext, ExecutionSlotAssignment, Exception>)((BiConsumerWithException)(context, assignment) -> {
            ((AllocationContext)context).getAllocator().cancel(assignment.getExecutionAttemptId());
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
                ((AllocationContext)context).getAllocator().cancel(assignment.getExecutionAttemptId());
                assignment.getLogicalSlotFuture().get();
            }).as("The logical future must finish with the cancellation exception.", new Object[0])).hasCauseInstanceOf(CancellationException.class);
        }));
    }

    @Test
    void testCompletedLogicalSlotCancelationDoesNotCancelPhysicalSlotRequestAndDoesNotRemoveSharedSlot() throws Exception {
        SlotSharingExecutionSlotAllocatorTest.testLogicalSlotRequestCancellationOrRelease(false, false, (BiConsumerWithException<AllocationContext, ExecutionSlotAssignment, Exception>)((BiConsumerWithException)(context, assignment) -> {
            ((AllocationContext)context).getAllocator().cancel(assignment.getExecutionAttemptId());
            assignment.getLogicalSlotFuture().get();
        }));
    }

    private static void testLogicalSlotRequestCancellationOrRelease(boolean completePhysicalSlotFutureManually, boolean cancelsPhysicalSlotRequestAndRemovesSharedSlot, BiConsumerWithException<AllocationContext, ExecutionSlotAssignment, Exception> cancelOrReleaseAction) throws Exception {
        AllocationContext.Builder allocationContextBuilder = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2, SlotSharingExecutionSlotAllocatorTest.EV3});
        if (completePhysicalSlotFutureManually) {
            allocationContextBuilder.withPhysicalSlotProvider(TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation());
        }
        AllocationContext context = allocationContextBuilder.build();
        List assignments = context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2});
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(1);
        cancelOrReleaseAction.accept((Object)context, assignments.get(0));
        List assignmentsAfterOneCancellation = context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2});
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(1);
        for (ExecutionSlotAssignment assignment : assignmentsAfterOneCancellation) {
            cancelOrReleaseAction.accept((Object)context, (Object)assignment);
        }
        SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        Assertions.assertThat((boolean)context.getSlotProvider().getCancellations().containsKey(slotRequestId)).isEqualTo(cancelsPhysicalSlotRequestAndRemovesSharedSlot);
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV3});
        int expectedNumberOfRequests = cancelsPhysicalSlotRequestAndRemovesSharedSlot ? 2 : 1;
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(expectedNumberOfRequests);
    }

    @Test
    void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException {
        AllocationContext context = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2}).build();
        List assignments = context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2});
        List payloads = assignments.stream().map(assignment -> {
            TestingPayload payload = new TestingPayload();
            assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload((LogicalSlot.Payload)payload));
            return payload;
        }).collect(Collectors.toList());
        SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        TestingPhysicalSlot physicalSlot = context.getSlotProvider().getFirstResponseOrFail().get();
        Assertions.assertThat((boolean)payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone())).isFalse();
        Assertions.assertThat((Object)physicalSlot.getPayload()).isNotNull();
        physicalSlot.getPayload().release(new Throwable());
        Assertions.assertThat((boolean)payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone())).isTrue();
        Assertions.assertThat(context.getSlotProvider().getCancellations()).containsKey((Object)slotRequestId);
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2});
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(2);
    }

    @Test
    void testSchedulePendingRequestBulkTimeoutCheck() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = SlotSharingExecutionSlotAllocatorTest.createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV3});
        PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
        Assertions.assertThat((Collection)bulk.getPendingRequests()).hasSize(2);
        Assertions.assertThat((Collection)bulk.getPendingRequests()).containsExactlyInAnyOrder((Object[])new ResourceProfile[]{RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE});
        Assertions.assertThat((Collection)bulk.getAllocationIdsOfFulfilledRequests()).isEmpty();
        Assertions.assertThat((Object)bulkChecker.getTimeout()).isEqualTo((Object)ALLOCATION_TIMEOUT);
    }

    @Test
    void testRequestFulfilledInBulk() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = SlotSharingExecutionSlotAllocatorTest.createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV3});
        AllocationID allocationId = new AllocationID();
        ResourceProfile pendingSlotResourceProfile = SlotSharingExecutionSlotAllocatorTest.fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, allocationId);
        PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
        Assertions.assertThat((Collection)bulk.getPendingRequests()).hasSize(1);
        Assertions.assertThat((Collection)bulk.getPendingRequests()).containsExactly((Object[])new ResourceProfile[]{pendingSlotResourceProfile});
        Assertions.assertThat((Collection)bulk.getAllocationIdsOfFulfilledRequests()).hasSize(1);
        Assertions.assertThat((Collection)bulk.getAllocationIdsOfFulfilledRequests()).containsExactly((Object[])new AllocationID[]{allocationId});
    }

    @Test
    void testRequestBulkCancel() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = SlotSharingExecutionSlotAllocatorTest.createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
        List assignments1 = context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV3});
        SlotSharingExecutionSlotAllocatorTest.fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, new AllocationID());
        PhysicalSlotRequestBulk bulk1 = bulkChecker.getBulk();
        List assignments2 = context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV2});
        bulk1.cancel(new Throwable());
        CompletableFuture ev1slot = ((ExecutionSlotAssignment)assignments1.get(0)).getLogicalSlotFuture();
        boolean ev1failed = ev1slot.isCompletedExceptionally();
        CompletableFuture ev3slot = ((ExecutionSlotAssignment)assignments1.get(1)).getLogicalSlotFuture();
        boolean ev3failed = ev3slot.isCompletedExceptionally();
        LogicalSlot slot = ev1failed ? (LogicalSlot)ev3slot.join() : (LogicalSlot)ev1slot.join();
        SlotSharingExecutionSlotAllocatorTest.releaseLogicalSlot(slot);
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV3});
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(3);
        Assertions.assertThat((boolean)ev1failed).isNotEqualTo(ev3failed);
        Assertions.assertThat((CompletableFuture)((ExecutionSlotAssignment)assignments2.get(0)).getLogicalSlotFuture()).isNotCompletedExceptionally();
    }

    private static void releaseLogicalSlot(LogicalSlot slot) {
        slot.tryAssignPayload((LogicalSlot.Payload)new DummyPayload(CompletableFuture.completedFuture(null)));
        slot.releaseSlot(new Throwable());
    }

    @Test
    void testBulkClearIfPhysicalSlotRequestFails() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = SlotSharingExecutionSlotAllocatorTest.createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV3});
        SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        context.getSlotProvider().getResultForRequestId(slotRequestId).completeExceptionally(new Throwable());
        PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
        Assertions.assertThat((Collection)bulk.getPendingRequests()).isEmpty();
    }

    @Test
    void failLogicalSlotsIfPhysicalSlotIsFailed() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2}).withBulkChecker(bulkChecker).withPhysicalSlotProvider(TestingPhysicalSlotProvider.createWithFailingPhysicalSlotCreation(new FlinkException("test failure"))).build();
        List allocatedSlots = context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2});
        for (ExecutionSlotAssignment allocatedSlot : allocatedSlots) {
            Assertions.assertThat((CompletableFuture)allocatedSlot.getLogicalSlotFuture()).isCompletedExceptionally();
        }
        Assertions.assertThat((Collection)bulkChecker.getBulk().getPendingRequests()).isEmpty();
        Set<SlotRequestId> requests = context.getSlotProvider().getRequests().keySet();
        Assertions.assertThat(context.getSlotProvider().getCancellations().keySet()).isEqualTo(requests);
    }

    @Test
    void testSlotRequestProfileFromExecutionSlotSharingGroup() {
        ResourceProfile resourceProfile1 = ResourceProfile.fromResources((double)1.0, (int)10);
        ResourceProfile resourceProfile2 = ResourceProfile.fromResources((double)2.0, (int)20);
        AllocationContext context = AllocationContext.newBuilder().addGroupAndResource(resourceProfile1, new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV3}).addGroupAndResource(resourceProfile2, new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV2, SlotSharingExecutionSlotAllocatorTest.EV4}).build();
        context.allocateSlotsFor(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2});
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(2);
        Assertions.assertThat(context.getSlotProvider().getRequests().values().stream().map(PhysicalSlotRequest::getSlotProfile).map(SlotProfile::getPhysicalSlotResourceProfile).collect(Collectors.toList())).containsExactlyInAnyOrder((Object[])new ResourceProfile[]{resourceProfile1, resourceProfile2});
    }

    @Test
    void testSlotProviderBatchSlotRequestTimeoutCheckIsDisabled() {
        AllocationContext context = AllocationContext.newBuilder().build();
        Assertions.assertThat((boolean)context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isFalse();
    }

    private static List<ExecutionVertexID> getAssignIds(Collection<ExecutionSlotAssignment> assignments) {
        return assignments.stream().map(ExecutionSlotAssignment::getExecutionAttemptId).map(ExecutionAttemptID::getExecutionVertexId).collect(Collectors.toList());
    }

    private static AllocationContext createBulkCheckerContextWithEv12GroupAndEv3Group(PhysicalSlotRequestBulkChecker bulkChecker) {
        return AllocationContext.newBuilder().addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV1, SlotSharingExecutionSlotAllocatorTest.EV2}).addGroup(new ExecutionVertexID[]{SlotSharingExecutionSlotAllocatorTest.EV3}).withBulkChecker(bulkChecker).withPhysicalSlotProvider(TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation()).build();
    }

    private static ResourceProfile fulfilOneOfTwoSlotRequestsAndGetPendingProfile(AllocationContext context, AllocationID allocationId) {
        Map<SlotRequestId, PhysicalSlotRequest> requests = context.getSlotProvider().getRequests();
        ArrayList<SlotRequestId> slotRequestIds = new ArrayList<SlotRequestId>(requests.keySet());
        Assertions.assertThat(slotRequestIds).hasSize(2);
        SlotRequestId slotRequestId1 = (SlotRequestId)slotRequestIds.get(0);
        SlotRequestId slotRequestId2 = (SlotRequestId)slotRequestIds.get(1);
        context.getSlotProvider().getResultForRequestId(slotRequestId1).complete(TestingPhysicalSlot.builder().withAllocationID(allocationId).build());
        return requests.get(slotRequestId2).getSlotProfile().getPhysicalSlotResourceProfile();
    }

    private static class TestingSharedSlotProfileRetrieverFactory
    implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
        private final List<Set<ExecutionVertexID>> askedBulks = new ArrayList<Set<ExecutionVertexID>>();
        private final List<ExecutionSlotSharingGroup> askedGroups = new ArrayList<ExecutionSlotSharingGroup>();

        private TestingSharedSlotProfileRetrieverFactory() {
        }

        public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) {
            this.askedBulks.add(bulk);
            return (group, resourceProfile) -> {
                this.askedGroups.add(group);
                return SlotProfileTestingUtils.noLocality(resourceProfile);
            };
        }

        private List<Set<ExecutionVertexID>> getAskedBulks() {
            return Collections.unmodifiableList(this.askedBulks);
        }

        private List<ExecutionSlotSharingGroup> getAskedGroups() {
            return Collections.unmodifiableList(this.askedGroups);
        }
    }

    private static class TestingSlotSharingStrategy
    implements SlotSharingStrategy {
        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups;

        private TestingSlotSharingStrategy(Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups) {
            this.executionSlotSharingGroups = executionSlotSharingGroups;
        }

        public ExecutionSlotSharingGroup getExecutionSlotSharingGroup(ExecutionVertexID executionVertexId) {
            return this.executionSlotSharingGroups.get(executionVertexId);
        }

        public Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups() {
            return new HashSet<ExecutionSlotSharingGroup>(this.executionSlotSharingGroups.values());
        }

        private static TestingSlotSharingStrategy createWithGroupsAndResources(Map<ExecutionVertexID[], ResourceProfile> groupAndResources) {
            HashMap<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups = new HashMap<ExecutionVertexID, ExecutionSlotSharingGroup>();
            for (Map.Entry<ExecutionVertexID[], ResourceProfile> groupAndResource : groupAndResources.entrySet()) {
                ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup();
                executionSlotSharingGroup.setResourceProfile(groupAndResource.getValue());
                for (ExecutionVertexID executionVertexId : groupAndResource.getKey()) {
                    executionSlotSharingGroup.addVertex(executionVertexId);
                    executionSlotSharingGroups.put(executionVertexId, executionSlotSharingGroup);
                }
            }
            return new TestingSlotSharingStrategy(executionSlotSharingGroups);
        }
    }

    private static class AllocationContext {
        private final TestingPhysicalSlotProvider slotProvider;
        private final TestingSlotSharingStrategy slotSharingStrategy;
        private final SlotSharingExecutionSlotAllocator allocator;
        private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory;

        private AllocationContext(TestingPhysicalSlotProvider slotProvider, TestingSlotSharingStrategy slotSharingStrategy, SlotSharingExecutionSlotAllocator allocator, TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) {
            this.slotProvider = slotProvider;
            this.slotSharingStrategy = slotSharingStrategy;
            this.allocator = allocator;
            this.slotProfileRetrieverFactory = slotProfileRetrieverFactory;
        }

        private SlotSharingExecutionSlotAllocator getAllocator() {
            return this.allocator;
        }

        private List<ExecutionSlotAssignment> allocateSlotsFor(ExecutionVertexID ... ids) {
            return this.allocator.allocateSlotsFor(Arrays.stream(ids).map(executionVertexId -> ExecutionGraphTestUtils.createExecutionAttemptId(executionVertexId, 0)).collect(Collectors.toList()));
        }

        private TestingSlotSharingStrategy getSlotSharingStrategy() {
            return this.slotSharingStrategy;
        }

        private TestingPhysicalSlotProvider getSlotProvider() {
            return this.slotProvider;
        }

        private TestingSharedSlotProfileRetrieverFactory getSlotProfileRetrieverFactory() {
            return this.slotProfileRetrieverFactory;
        }

        private static Builder newBuilder() {
            return new Builder();
        }

        private static class Builder {
            private final Map<ExecutionVertexID[], ResourceProfile> groups = new HashMap<ExecutionVertexID[], ResourceProfile>();
            private boolean slotWillBeOccupiedIndefinitely = false;
            private PhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
            private TestingPhysicalSlotProvider physicalSlotProvider = TestingPhysicalSlotProvider.createWithInfiniteSlotCreation();

            private Builder() {
            }

            private Builder addGroup(ExecutionVertexID ... group) {
                this.groups.put(group, ResourceProfile.UNKNOWN);
                return this;
            }

            private Builder addGroupAndResource(ResourceProfile resourceProfile, ExecutionVertexID ... group) {
                this.groups.put(group, resourceProfile);
                return this;
            }

            private Builder setSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) {
                this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
                return this;
            }

            private Builder withBulkChecker(PhysicalSlotRequestBulkChecker bulkChecker) {
                this.bulkChecker = bulkChecker;
                return this;
            }

            private Builder withPhysicalSlotProvider(TestingPhysicalSlotProvider physicalSlotProvider) {
                this.physicalSlotProvider = physicalSlotProvider;
                return this;
            }

            private AllocationContext build() {
                TestingSharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = new TestingSharedSlotProfileRetrieverFactory();
                TestingSlotSharingStrategy slotSharingStrategy = TestingSlotSharingStrategy.createWithGroupsAndResources(this.groups);
                SlotSharingExecutionSlotAllocator allocator = new SlotSharingExecutionSlotAllocator((PhysicalSlotProvider)this.physicalSlotProvider, this.slotWillBeOccupiedIndefinitely, (SlotSharingStrategy)slotSharingStrategy, (SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory)sharedSlotProfileRetrieverFactory, this.bulkChecker, ALLOCATION_TIMEOUT, executionVertexID -> RESOURCE_PROFILE);
                return new AllocationContext(this.physicalSlotProvider, slotSharingStrategy, allocator, sharedSlotProfileRetrieverFactory);
            }
        }
    }
}

