/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotMatchingResolver;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingResolver;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

public class DefaultSlotAssigner
implements SlotAssigner {
    @VisibleForTesting
    static final String APPLICATION_MODE_EXECUTION_TARGET = "embedded";
    @Nullable
    private final String executionTarget;
    private final boolean minimalTaskManagerPreferred;
    private final SlotSharingResolver slotSharingResolver;
    private final SlotMatchingResolver slotMatchingResolver;

    DefaultSlotAssigner(@Nullable String executionTarget, boolean minimalTaskManagerPreferred, SlotSharingResolver slotSharingResolver, SlotMatchingResolver slotMatchingResolver) {
        this.executionTarget = executionTarget;
        this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
        this.slotSharingResolver = slotSharingResolver;
        this.slotMatchingResolver = slotMatchingResolver;
    }

    @Override
    public Collection<JobSchedulingPlan.SlotAssignment> assignSlots(JobInformation jobInformation, Collection<PhysicalSlot> freeSlots, VertexParallelism vertexParallelism, JobAllocationsInformation previousAllocations) {
        AllocatorUtil.checkMinimumRequiredSlots(jobInformation, freeSlots);
        Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> allGroups = this.slotSharingResolver.getExecutionSlotSharingGroups(jobInformation, vertexParallelism);
        Collection<PhysicalSlot> pickedSlots = this.pickSlotsIfNeeded(allGroups.size(), freeSlots);
        return this.slotMatchingResolver.matchSlotSharingGroupWithSlots(allGroups, pickedSlots);
    }

    @VisibleForTesting
    Collection<PhysicalSlot> pickSlotsIfNeeded(int requestExecutionSlotSharingGroups, Collection<PhysicalSlot> freeSlots) {
        Collection<PhysicalSlot> pickedSlots = freeSlots;
        if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(this.executionTarget) && this.minimalTaskManagerPreferred && freeSlots.size() > requestExecutionSlotSharingGroups) {
            Map<TaskManagerLocation, Set<PhysicalSlot>> slotsPerTaskExecutor = this.getSlotsPerTaskExecutor(freeSlots);
            pickedSlots = this.pickSlotsInMinimalTaskExecutors(slotsPerTaskExecutor, requestExecutionSlotSharingGroups);
        }
        return pickedSlots;
    }

    private Iterator<TaskManagerLocation> getSortedTaskExecutors(Map<TaskManagerLocation, Set<PhysicalSlot>> slotsPerTaskExecutor) {
        Comparator taskExecutorComparator = (leftTml, rightTml) -> Integer.compare(((Set)slotsPerTaskExecutor.get(rightTml)).size(), ((Set)slotsPerTaskExecutor.get(leftTml)).size());
        return slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
    }

    private Collection<PhysicalSlot> pickSlotsInMinimalTaskExecutors(Map<TaskManagerLocation, Set<PhysicalSlot>> slotsByTaskExecutor, int requestedGroups) {
        ArrayList<PhysicalSlot> pickedSlots = new ArrayList<PhysicalSlot>();
        Iterator<TaskManagerLocation> sortedTaskExecutors = this.getSortedTaskExecutors(slotsByTaskExecutor);
        while (pickedSlots.size() < requestedGroups) {
            Set<PhysicalSlot> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
            pickedSlots.addAll(slotInfos);
        }
        return pickedSlots;
    }

    private Map<TaskManagerLocation, Set<PhysicalSlot>> getSlotsPerTaskExecutor(Collection<PhysicalSlot> slots) {
        return slots.stream().collect(Collectors.groupingBy(SlotInfo::getTaskManagerLocation, Collectors.mapping(Function.identity(), Collectors.toSet())));
    }
}

