/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.slotpool.PendingRequest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public enum TasksBalancedRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy
{
    INSTANCE;

    public static final Logger LOG;

    @Override
    public Collection<RequestSlotMatchingStrategy.RequestSlotMatch> matchRequestsAndSlots(Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests, Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
        if (pendingRequests.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList<RequestSlotMatchingStrategy.RequestSlotMatch> resultingMatches = new ArrayList<RequestSlotMatchingStrategy.RequestSlotMatch>();
        List<PendingRequest> sortedRequests = WeightLoadable.sortByLoadingDescend(pendingRequests);
        LOG.debug("Available slots: {}, sortedRequests: {}, taskExecutorsLoad: {}", new Object[]{slots, sortedRequests, taskExecutorsLoad});
        Collection slotElements = slots.stream().map(PhysicalSlotElement::new).collect(Collectors.toList());
        Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> profileSlots = this.getSlotCandidatesByProfile(slotElements, taskExecutorsLoad);
        Map<ResourceID, Set<PhysicalSlotElement>> taskExecutorSlots = this.groupSlotsByTaskExecutor(slotElements);
        for (PendingRequest request : sortedRequests) {
            Optional<PhysicalSlotElement> bestSlotEle = this.tryMatchPhysicalSlot(request, profileSlots, taskExecutorsLoad);
            if (!bestSlotEle.isPresent()) continue;
            PhysicalSlotElement slotElement = bestSlotEle.get();
            this.updateReferenceAfterMatching(profileSlots, taskExecutorsLoad, taskExecutorSlots, slotElement, request.getLoading());
            resultingMatches.add(RequestSlotMatchingStrategy.RequestSlotMatch.createFor(request, slotElement.physicalSlot));
        }
        return resultingMatches;
    }

    private Map<ResourceID, Set<PhysicalSlotElement>> groupSlotsByTaskExecutor(Collection<PhysicalSlotElement> slotElements) {
        return slotElements.stream().collect(Collectors.groupingBy(physicalSlot -> physicalSlot.physicalSlot.getTaskManagerLocation().getResourceID(), Collectors.toSet()));
    }

    private Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> getSlotCandidatesByProfile(Collection<PhysicalSlotElement> slotElements, Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
        HashMap<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> result = new HashMap<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>();
        PhysicalSlotElementPriorityComparator physicalSlotElementPriorityComparator = new PhysicalSlotElementPriorityComparator(taskExecutorsLoad);
        for (PhysicalSlotElement slotEle : slotElements) {
            result.compute(slotEle.physicalSlot.getResourceProfile(), (resourceProfile, oldSlots) -> {
                HeapPriorityQueue<PhysicalSlotElement> values = Objects.isNull(oldSlots) ? new HeapPriorityQueue<PhysicalSlotElement>(physicalSlotElementPriorityComparator, 8) : oldSlots;
                values.add(slotEle);
                return values;
            });
        }
        return result;
    }

    private Optional<PhysicalSlotElement> tryMatchPhysicalSlot(PendingRequest request, Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> profileToSlotMap, Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
        ResourceProfile requestProfile = request.getResourceProfile();
        Set candidateProfiles = profileToSlotMap.keySet().stream().filter(slotProfile -> slotProfile.isMatching(requestProfile)).collect(Collectors.toSet());
        return candidateProfiles.stream().map(candidateProfile -> {
            HeapPriorityQueue slots = (HeapPriorityQueue)profileToSlotMap.get(candidateProfile);
            return Objects.isNull(slots) ? null : (PhysicalSlotElement)slots.peek();
        }).filter(Objects::nonNull).min(new PhysicalSlotElementComparator(taskExecutorsLoad));
    }

    private void updateReferenceAfterMatching(Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> profileSlots, Map<ResourceID, LoadingWeight> taskExecutorsLoad, Map<ResourceID, Set<PhysicalSlotElement>> taskExecutorSlots, PhysicalSlotElement targetSlotElement, LoadingWeight loading) {
        ResourceID tmID = targetSlotElement.physicalSlot.getTaskManagerLocation().getResourceID();
        taskExecutorsLoad.compute(tmID, (ignoredId, oldLoading) -> Objects.isNull(oldLoading) ? loading : oldLoading.merge(loading));
        Set<PhysicalSlotElement> slotToReSort = taskExecutorSlots.remove(tmID);
        for (PhysicalSlotElement slotEle : slotToReSort) {
            HeapPriorityQueue<PhysicalSlotElement> slotsOfProfile = profileSlots.get(slotEle.physicalSlot.getResourceProfile());
            slotsOfProfile.remove(slotEle);
            if (slotEle.equals(targetSlotElement)) continue;
            slotsOfProfile.add(slotEle);
        }
        slotToReSort.remove(targetSlotElement);
        taskExecutorSlots.put(tmID, slotToReSort);
    }

    public String toString() {
        return TasksBalancedRequestSlotMatchingStrategy.class.getSimpleName();
    }

    static {
        LOG = LoggerFactory.getLogger(TasksBalancedRequestSlotMatchingStrategy.class);
    }

    static final class PhysicalSlotElementPriorityComparator
    implements PriorityComparator<PhysicalSlotElement> {
        private final PhysicalSlotElementComparator physicalSlotElementComparator;

        PhysicalSlotElementPriorityComparator(Map<ResourceID, LoadingWeight> taskExecutorsLoading) {
            this.physicalSlotElementComparator = new PhysicalSlotElementComparator(taskExecutorsLoading);
        }

        @Override
        public int comparePriority(PhysicalSlotElement left, PhysicalSlotElement right) {
            return this.physicalSlotElementComparator.compare(left, right);
        }
    }

    static final class PhysicalSlotElement
    extends AbstractHeapPriorityQueueElement {
        private final PhysicalSlot physicalSlot;

        public PhysicalSlotElement(PhysicalSlot physicalSlot) {
            this.physicalSlot = physicalSlot;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o instanceof PhysicalSlotElement) {
                return this.physicalSlot.equals(((PhysicalSlotElement)o).physicalSlot);
            }
            return false;
        }

        public int hashCode() {
            return this.physicalSlot.hashCode();
        }
    }

    static final class PhysicalSlotElementComparator
    implements Comparator<PhysicalSlotElement> {
        private final Map<ResourceID, LoadingWeight> taskExecutorsLoading;

        PhysicalSlotElementComparator(Map<ResourceID, LoadingWeight> taskExecutorsLoading) {
            this.taskExecutorsLoading = (Map)Preconditions.checkNotNull(taskExecutorsLoading);
        }

        @Override
        public int compare(PhysicalSlotElement left, PhysicalSlotElement right) {
            LoadingWeight leftLoad = this.taskExecutorsLoading.getOrDefault(left.physicalSlot.getTaskManagerLocation().getResourceID(), DefaultLoadingWeight.EMPTY);
            LoadingWeight rightLoad = this.taskExecutorsLoading.getOrDefault(right.physicalSlot.getTaskManagerLocation().getResourceID(), DefaultLoadingWeight.EMPTY);
            return leftLoad.compareTo(rightLoad);
        }
    }
}

