/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.scheduler.DefaultSyncPreferredLocationsRetriever;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocationContext;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionSlotAssignment;
import org.apache.flink.runtime.scheduler.SyncPreferredLocationsRetriever;
import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/**
 * {@link ExecutionSlotAllocator} that requests one physical slot per execution attempt from a
 * {@link PhysicalSlotProvider} and wraps each granted slot in its own {@link SingleLogicalSlot}.
 *
 * <p>Every outstanding request is tracked in {@code requestedPhysicalSlots}, indexed by both the
 * execution attempt id and the slot request id. Asking again for an attempt that is already
 * tracked returns the existing future instead of issuing a second request.
 */
public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
    private final PhysicalSlotProvider slotProvider;
    private final boolean slotWillBeOccupiedIndefinitely;
    private final Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever;
    private final SyncPreferredLocationsRetriever preferredLocationsRetriever;
    /** Tracked slot futures, addressable by execution attempt (key A) or slot request (key B). */
    private final DualKeyLinkedMap<ExecutionAttemptID, SlotRequestId, CompletableFuture<LogicalSlot>> requestedPhysicalSlots;

    /**
     * Creates the allocator.
     *
     * @param slotProvider provider the physical slot requests are sent to; must not be null
     * @param resourceProfileRetriever yields the resource profile for an execution attempt; must
     *     not be null
     * @param preferredLocationsRetriever yields preferred task manager locations for an
     *     execution vertex; must not be null
     * @param slotWillBeOccupiedIndefinitely flag forwarded to every physical slot request and
     *     every logical slot created by this allocator
     */
    SimpleExecutionSlotAllocator(PhysicalSlotProvider slotProvider, Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever, SyncPreferredLocationsRetriever preferredLocationsRetriever, boolean slotWillBeOccupiedIndefinitely) {
        this.slotProvider = Preconditions.checkNotNull(slotProvider);
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.resourceProfileRetriever = Preconditions.checkNotNull(resourceProfileRetriever);
        this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever);
        this.requestedPhysicalSlots = new DualKeyLinkedMap<>();
    }

    /**
     * Returns a slot assignment for each given execution attempt.
     *
     * <p>Attempts that already have a tracked future get an assignment backed by that future.
     * For all others a new {@link PhysicalSlotRequest} is built and the whole batch is handed to
     * the slot provider in a single call.
     *
     * @param executionAttemptIds attempts to allocate slots for
     * @return assignments keyed by execution attempt id
     */
    @Override
    public Map<ExecutionAttemptID, ExecutionSlotAssignment> allocateSlotsFor(List<ExecutionAttemptID> executionAttemptIds) {
        final Map<ExecutionAttemptID, ExecutionSlotAssignment> result = new HashMap<>();
        final Map<SlotRequestId, ExecutionAttemptID> remainingExecutionsToSlotRequest = new HashMap<>(executionAttemptIds.size());
        final List<PhysicalSlotRequest> physicalSlotRequests = new ArrayList<>(executionAttemptIds.size());
        for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
            if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
                // Already requested earlier: hand out the tracked future again.
                result.put(executionAttemptId, new ExecutionSlotAssignment(executionAttemptId, requestedPhysicalSlots.getValueByKeyA(executionAttemptId)));
                continue;
            }
            final SlotRequestId slotRequestId = new SlotRequestId();
            final ResourceProfile resourceProfile = resourceProfileRetriever.apply(executionAttemptId);
            final Collection<TaskManagerLocation> preferredLocations = preferredLocationsRetriever.getPreferredLocations(executionAttemptId.getExecutionVertexId(), Collections.emptySet());
            // The same profile is passed for both resource-profile arguments.
            final SlotProfile slotProfile = SlotProfile.priorAllocation(resourceProfile, resourceProfile, preferredLocations, Collections.emptyList(), Collections.emptySet());
            physicalSlotRequests.add(new PhysicalSlotRequest(slotRequestId, slotProfile, DefaultLoadingWeight.EMPTY, slotWillBeOccupiedIndefinitely));
            remainingExecutionsToSlotRequest.put(slotRequestId, executionAttemptId);
        }
        result.putAll(allocatePhysicalSlotsFor(remainingExecutionsToSlotRequest, physicalSlotRequests));
        return result;
    }

    /**
     * Sends the given requests to the slot provider and tracks a logical-slot future for each.
     *
     * @param executionAttemptIds maps each slot request id to the attempt it was created for
     * @param slotRequests the physical slot requests to issue
     * @return assignments for the newly requested attempts
     */
    private Map<ExecutionAttemptID, ExecutionSlotAssignment> allocatePhysicalSlotsFor(Map<SlotRequestId, ExecutionAttemptID> executionAttemptIds, List<PhysicalSlotRequest> slotRequests) {
        final Map<ExecutionAttemptID, ExecutionSlotAssignment> allocatedSlots = new HashMap<>();
        final Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> slotFutures = slotProvider.allocatePhysicalSlots(slotRequests);
        slotFutures.forEach((slotRequestId, slotRequestResultFuture) -> {
            final ExecutionAttemptID executionAttemptId = executionAttemptIds.get(slotRequestId);
            final CompletableFuture<LogicalSlot> slotFuture = slotRequestResultFuture.thenApply(
                    requestResult -> allocateLogicalSlotFromPhysicalSlot(slotRequestId, requestResult.getPhysicalSlot(), slotWillBeOccupiedIndefinitely));
            // Track the future BEFORE attaching the failure handler. If the future has already
            // failed, the handler runs synchronously on registration; tracking afterwards would
            // re-insert an entry that nothing removes again.
            requestedPhysicalSlots.put(executionAttemptId, slotRequestId, slotFuture);
            slotFuture.exceptionally(throwable -> {
                // Failure or cancellation: drop the bookkeeping and withdraw the request.
                requestedPhysicalSlots.removeKeyA(executionAttemptId);
                slotProvider.cancelSlotRequest(slotRequestId, throwable);
                return null;
            });
            allocatedSlots.put(executionAttemptId, new ExecutionSlotAssignment(executionAttemptId, slotFuture));
        });
        return allocatedSlots;
    }

    /**
     * Cancels the tracked slot future of the given attempt, if there is one.
     *
     * <p>Cancelling a still-pending future completes it exceptionally, which triggers the
     * failure handler attached at allocation time. A future that has already completed is not
     * affected by this call.
     *
     * @param executionAttemptId attempt whose slot request should be cancelled
     */
    @Override
    public void cancel(ExecutionAttemptID executionAttemptId) {
        final CompletableFuture<LogicalSlot> slotFuture = requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
        if (slotFuture != null) {
            slotFuture.cancel(false);
        }
    }

    /** Slot-owner callback given to each {@link SingleLogicalSlot}; releases the returned slot. */
    private void returnLogicalSlot(LogicalSlot slot) {
        releaseSlot(slot, new FlinkException("Slot is being returned from SimpleExecutionSlotAllocator."));
    }

    /**
     * Stops tracking the slot's request and cancels that request with the slot provider.
     *
     * @param slot logical slot whose request id identifies the entry to drop
     * @param cause reason forwarded to the slot provider
     */
    private void releaseSlot(LogicalSlot slot, Throwable cause) {
        requestedPhysicalSlots.removeKeyB(slot.getSlotRequestId());
        slotProvider.cancelSlotRequest(slot.getSlotRequestId(), cause);
    }

    /**
     * Wraps a physical slot in a new {@link SingleLogicalSlot} and installs it as the physical
     * slot's payload.
     *
     * @param slotRequestId request id the logical slot is created with
     * @param physicalSlot the physical slot to wrap
     * @param slotWillBeOccupiedIndefinitely flag passed on to the logical slot
     * @return the newly created logical slot
     * @throws IllegalStateException if the physical slot rejects the payload
     */
    private LogicalSlot allocateLogicalSlotFromPhysicalSlot(SlotRequestId slotRequestId, PhysicalSlot physicalSlot, boolean slotWillBeOccupiedIndefinitely) {
        final SingleLogicalSlot singleLogicalSlot = new SingleLogicalSlot(slotRequestId, physicalSlot, Locality.UNKNOWN, this::returnLogicalSlot, slotWillBeOccupiedIndefinitely);
        final LogicalSlotHolder logicalSlotHolder = new LogicalSlotHolder(singleLogicalSlot);
        if (!physicalSlot.tryAssignPayload(logicalSlotHolder)) {
            throw new IllegalStateException("BUG: Unexpected physical slot payload assignment failure!");
        }
        return singleLogicalSlot;
    }

    /** Factory that creates {@link SimpleExecutionSlotAllocator} instances. */
    public static class Factory implements ExecutionSlotAllocatorFactory {
        private final PhysicalSlotProvider slotProvider;
        private final boolean slotWillBeOccupiedIndefinitely;

        /**
         * @param slotProvider provider handed to every allocator this factory creates
         * @param slotWillBeOccupiedIndefinitely flag handed to every allocator this factory
         *     creates
         */
        public Factory(PhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
            this.slotProvider = slotProvider;
            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        }

        /**
         * Creates an allocator that reads resource profiles from the given context.
         *
         * @param context supplies resource profiles per execution vertex and is also passed to
         *     the preferred-locations retriever
         * @return a new {@link SimpleExecutionSlotAllocator}
         */
        @Override
        public ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context) {
            // The per-vertex lookup passed as the first argument always answers "empty".
            final SyncPreferredLocationsRetriever preferredLocationsRetriever = new DefaultSyncPreferredLocationsRetriever(executionVertexId -> Optional.empty(), context);
            return new SimpleExecutionSlotAllocator(slotProvider, id -> context.getResourceProfile(id.getExecutionVertexId()), preferredLocationsRetriever, slotWillBeOccupiedIndefinitely);
        }
    }

    /**
     * Payload installed on a physical slot; forwards release calls to the wrapped logical slot
     * and to the enclosing allocator's bookkeeping.
     */
    private class LogicalSlotHolder implements PhysicalSlot.Payload {
        private final SingleLogicalSlot logicalSlot;

        private LogicalSlotHolder(SingleLogicalSlot logicalSlot) {
            this.logicalSlot = Preconditions.checkNotNull(logicalSlot);
        }

        /**
         * Releases the wrapped logical slot, then drops the allocator's tracking entry and
         * cancels the slot request.
         *
         * @param cause reason the physical slot is releasing its payload
         */
        @Override
        public void release(Throwable cause) {
            logicalSlot.release(cause);
            // Chain the original cause so the slot provider sees why the payload was released.
            releaseSlot(logicalSlot, new FlinkException("Physical slot releases its payload.", cause));
        }

        @Override
        public boolean willOccupySlotIndefinitely() {
            return logicalSlot.willOccupySlotIndefinitely();
        }
    }
}

