/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolResource;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests the slot allocation behaviour of a slot pool that is set up with the
 * evenly-spread-out variant of {@link LocationPreferenceSlotSelectionStrategy}.
 *
 * <p>Each test registers a number of task executors with the pool, requests slots through the
 * pool's {@link SlotProvider} and then inspects the {@link TaskManagerLocation} of the slots
 * that were handed out.
 */
public class SlotPoolSlotSpreadOutTest
extends TestLogger {
    /** Timeout passed to every slot allocation request issued by these tests. */
    public static final Time TIMEOUT = Time.seconds(10L);

    /** Provides the slot pool and slot provider under test, configured to spread slots out evenly. */
    @Rule
    public final SlotPoolResource slotPoolResource = new SlotPoolResource(LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut());

    /**
     * Requests two independent slots without any requirements while two task executors with
     * four slots each are registered, and expects the two slots to be on different task managers.
     */
    @Test
    public void allocateSingleSlot_withNoRequirements_selectsSlotSoThatWorkloadIsSpreadOut() {
        registerTaskExecutors(2, 4);

        final ScheduledUnit firstSlotRequest = createSimpleSlotRequest();
        final ScheduledUnit secondSlotRequest = createSimpleSlotRequest();

        final CompletableFuture<LogicalSlot> firstSlotFuture = allocateSlot(firstSlotRequest);
        final CompletableFuture<LogicalSlot> secondSlotFuture = allocateSlot(secondSlotRequest);

        final TaskManagerLocation firstTaskManagerLocation = getTaskManagerLocation(firstSlotFuture);
        final TaskManagerLocation secondTaskManagerLocation = getTaskManagerLocation(secondSlotFuture);

        MatcherAssert.assertThat(firstTaskManagerLocation, Matchers.is(Matchers.not(Matchers.equalTo(secondTaskManagerLocation))));
    }

    /**
     * Allocates a "source" slot first and then a "sink" slot whose preferred location is the
     * source's task manager, and expects the sink to end up on that same task manager.
     */
    @Test
    public void allocateSingleSlot_withInputPreference_inputPreferenceHasPrecedenceOverSpreadOut() {
        registerTaskExecutors(2, 2);

        final ScheduledUnit sourceSlotRequest = createSimpleSlotRequest();
        final ScheduledUnit sinkSlotRequest = createSimpleSlotRequest();

        final CompletableFuture<LogicalSlot> sourceSlotFuture = allocateSlot(sourceSlotRequest);
        final TaskManagerLocation sourceTaskManagerLocation = getTaskManagerLocation(sourceSlotFuture);

        // The sink states the source's location as its only preferred location.
        final Set<TaskManagerLocation> preferredLocations = Collections.singleton(sourceTaskManagerLocation);
        final CompletableFuture<LogicalSlot> sinkSlotFuture = allocateSlotWithInputPreference(sinkSlotRequest, preferredLocations);
        final TaskManagerLocation sinkTaskManagerLocation = getTaskManagerLocation(sinkSlotFuture);

        MatcherAssert.assertThat(sinkTaskManagerLocation, Matchers.is(Matchers.equalTo(sourceTaskManagerLocation)));
    }

    /**
     * Allocates one source request per available slot and one sink request per task executor,
     * all within the same slot sharing group, and expects the sinks to be located on as many
     * distinct task managers as there are task executors.
     */
    @Test
    public void allocateSharedSlot_withNoRequirements_selectsSlotsSoThatWorkloadIsSpreadOut() {
        final int numberSlotsPerTaskExecutor = 2;
        final int numberTaskExecutors = 2;
        final int numberSlots = numberTaskExecutors * numberSlotsPerTaskExecutor;

        registerTaskExecutors(numberTaskExecutors, numberSlotsPerTaskExecutor);

        final JobVertexID sourceJobVertexId = new JobVertexID();
        final JobVertexID sinkJobVertexId = new JobVertexID();
        final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();

        // One source request for every registered slot.
        final List<ScheduledUnit> sourceScheduledUnits = IntStream.range(0, numberSlots)
            .mapToObj(ignored -> createSharedSlotRequest(sourceJobVertexId, slotSharingGroupId))
            .collect(Collectors.toList());

        // One sink request for every registered task executor.
        final List<ScheduledUnit> sinkScheduledUnits = IntStream.range(0, numberTaskExecutors)
            .mapToObj(ignored -> createSharedSlotRequest(sinkJobVertexId, slotSharingGroupId))
            .collect(Collectors.toList());

        // Sources are requested before the sinks; their futures are not inspected.
        sourceScheduledUnits.forEach(this::allocateSlot);

        final Set<TaskManagerLocation> sinkLocations = sinkScheduledUnits.stream()
            .map(this::allocateSlot)
            .map(this::getTaskManagerLocation)
            .collect(Collectors.toSet());

        // Every sink must sit on a different task manager.
        MatcherAssert.assertThat(sinkLocations, Matchers.hasSize(numberTaskExecutors));
    }

    /**
     * Creates a slot request for the given job vertex that belongs to the given slot sharing group.
     *
     * @param jobVertexId job vertex the request is made for
     * @param slotSharingGroupId slot sharing group of the request
     * @return the new request; its third constructor argument is {@code null}
     */
    private ScheduledUnit createSharedSlotRequest(JobVertexID jobVertexId, SlotSharingGroupId slotSharingGroupId) {
        return new ScheduledUnit(jobVertexId, slotSharingGroupId, null);
    }

    /**
     * Creates a slot request for a fresh job vertex without a slot sharing group.
     *
     * @return the new request
     */
    private ScheduledUnit createSimpleSlotRequest() {
        return new ScheduledUnit(new JobVertexID(), null, null);
    }

    /**
     * Requests a slot for the given unit using {@link SlotProfile#noRequirements()}.
     *
     * @param scheduledUnit unit to allocate a slot for
     * @return future of the allocated slot
     */
    private CompletableFuture<LogicalSlot> allocateSlot(ScheduledUnit scheduledUnit) {
        return internalAllocateSlot(scheduledUnit, SlotProfile.noRequirements());
    }

    /**
     * Requests a slot from the rule's slot provider under a new {@link SlotRequestId},
     * using {@link #TIMEOUT} as the allocation timeout.
     *
     * @param scheduledUnit unit to allocate a slot for
     * @param slotProfile profile describing the requested slot
     * @return future of the allocated slot
     */
    private CompletableFuture<LogicalSlot> internalAllocateSlot(ScheduledUnit scheduledUnit, SlotProfile slotProfile) {
        final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
        return slotProvider.allocateSlot(new SlotRequestId(), scheduledUnit, slotProfile, TIMEOUT);
    }

    /**
     * Requests a slot whose profile names the given locations as preferred, with
     * {@link ResourceProfile#UNKNOWN} as the resource profile.
     *
     * @param scheduledUnit unit to allocate a slot for
     * @param preferredLocations locations the slot should preferably be taken from
     * @return future of the allocated slot
     */
    private CompletableFuture<LogicalSlot> allocateSlotWithInputPreference(ScheduledUnit scheduledUnit, Collection<TaskManagerLocation> preferredLocations) {
        return internalAllocateSlot(scheduledUnit, SlotProfile.preferredLocality(ResourceProfile.UNKNOWN, preferredLocations));
    }

    /**
     * Waits for the given slot future via {@link CompletableFuture#join()} and returns the
     * location of the resulting slot.
     *
     * @param slotFuture future of an allocated slot
     * @return task manager location of the slot
     */
    private TaskManagerLocation getTaskManagerLocation(CompletableFuture<? extends LogicalSlot> slotFuture) {
        return slotFuture.join().getTaskManagerLocation();
    }

    /**
     * Registers the given number of task executors, each offering the same number of slots.
     *
     * @param numberTaskExecutors how many task executors to register
     * @param numberSlotsPerTaskExecutor how many slots each of them offers
     */
    private void registerTaskExecutors(int numberTaskExecutors, int numberSlotsPerTaskExecutor) {
        for (int i = 0; i < numberTaskExecutors; ++i) {
            registerTaskExecutor(numberSlotsPerTaskExecutor);
        }
    }

    /**
     * Registers a single task executor at a new {@link LocalTaskManagerLocation} with the slot
     * pool and offers the given number of slots from it.
     *
     * @param numberSlotsPerTaskExecutor number of slots to offer
     */
    private void registerTaskExecutor(int numberSlotsPerTaskExecutor) {
        final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
        final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();

        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        // Each offer gets its own allocation id and uses its index as the slot index.
        final Collection<SlotOffer> slotOffers = IntStream.range(0, numberSlotsPerTaskExecutor)
            .mapToObj(index -> new SlotOffer(new AllocationID(), index, ResourceProfile.ANY))
            .collect(Collectors.toList());

        slotPool.offerSlots(taskManagerLocation, new SimpleAckingTaskManagerGateway(), slotOffers);
    }
}

