package org.apache.storm.scheduler.resource;

import com.codahale.metrics.Meter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.storm.DaemonConfig;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SingleTopologyCluster;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
import org.apache.storm.scheduler.utils.IConfigLoader;
import org.apache.storm.scheduler.utils.SchedulerConfigCache;
import org.apache.storm.shade.com.google.common.collect.ImmutableList;
import org.apache.storm.utils.DisallowedStrategyException;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/scheduler/resource/ResourceAwareScheduler.class */
public class ResourceAwareScheduler implements IScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareScheduler.class);
    // Configuration map handed to prepare(); also passed to every scheduling strategy.
    private Map<String, Object> conf;
    // Orders the topologies for schedule(); instantiated reflectively in prepare().
    private ISchedulingPriorityStrategy schedulingPriorityStrategy;
    // Source of the user resource pools; loadConfig() tolerates this being null.
    private IConfigLoader configLoader;
    // Upper bound on schedule/evict rounds per topology (prepare() defaults it to 5).
    private int maxSchedulingAttempts;
    // How long a single strategy run may take before it is cancelled (prepare() defaults it to 60).
    private int schedulingTimeoutSeconds;
    // Fixed pool of one thread on which strategies run so they can be timed out.
    private ExecutorService backgroundScheduling;
    // Replaced at the end of every schedule() call: id of the topology being scheduled -> ids evicted for it.
    private Map<String, Set<String>> evictedTopologiesMap;
    // Marked whenever a strategy run exceeds schedulingTimeoutSeconds.
    private Meter schedulingTimeoutMeter;
    // Marked whenever an unexpected exception escapes a scheduling attempt.
    private Meter internalErrorMeter;
    // Cache of the user resource pools produced by loadConfig(); refreshed at the start of schedule().
    private SchedulerConfigCache<Map<String, Map<String, Double>>> schedulerConfigCache;

    /**
     * Bookkeeping used by {@code scheduleTopology} to decide, without running a strategy,
     * whether a topology could possibly fit in the cluster, and how much more would have to
     * be freed by evicting other topologies.
     *
     * <p>Two views are tracked: what the cluster has available versus what the topology
     * requires in total, and (once {@link #setRemainingRequiredResources} has been called)
     * what the topology still lacks after a failed scheduling attempt.
     */
    public class TopologySchedulingResources {
        /** True only between setRemainingRequiredResources() and the next resetRemaining(). */
        boolean remainingResourcesAreSet = false;
        NormalizedResourceOffer clusterAvailableResources;
        NormalizedResourceRequest topologyRequiredResources;
        NormalizedResourceRequest topologyScheduledResources;
        double clusterAvailableMemory;
        double topologyRequiredNonSharedMemory;
        double topologySharedMemoryLowerBound;
        /** Null until setRemainingRequiredResources() has been called at least once. */
        NormalizedResourceOffer remainingRequiredTopologyResources;
        double remainingRequiredTopologyMemory;
        double topologyScheduledMemory;

        /**
         * Captures the cluster's available resources and the topology's requested and
         * already scheduled resources.
         *
         * @param cluster the cluster state to read availability and assignments from
         * @param topologyDetails the topology about to be scheduled
         */
        TopologySchedulingResources(Cluster cluster, TopologyDetails topologyDetails) {
            this.clusterAvailableResources = cluster.getNonBlacklistedClusterAvailableResources(Collections.emptyList());
            this.clusterAvailableMemory = this.clusterAvailableResources.getTotalMemoryMb();
            this.topologyRequiredResources = topologyDetails.getApproximateTotalResources();
            this.topologyRequiredNonSharedMemory =
                topologyDetails.getRequestedNonSharedOffHeap() + topologyDetails.getRequestedNonSharedOnHeap();
            this.topologySharedMemoryLowerBound =
                topologyDetails.getRequestedSharedOffHeap() + topologyDetails.getRequestedSharedOnHeap();
            setScheduledTopologyResources(cluster, topologyDetails);
        }

        /**
         * Records what the topology already has scheduled in {@code cluster}; both values are
         * zero/empty when the topology has no assignment.
         */
        void setScheduledTopologyResources(Cluster cluster, TopologyDetails topologyDetails) {
            SchedulerAssignment assignment = cluster.getAssignmentById(topologyDetails.getId());
            if (assignment == null) {
                this.topologyScheduledResources = new NormalizedResourceRequest();
                this.topologyScheduledMemory = 0.0d;
                return;
            }
            this.topologyScheduledResources = topologyDetails.getApproximateResources(assignment.getExecutors());
            this.topologyScheduledMemory = computeScheduledTopologyMemory(cluster, topologyDetails);
        }

        /** Returns true when both the availability check and the remaining-required check pass. */
        boolean canSchedule() {
            return canScheduleAvailable() && canScheduleRemainingRequired();
        }

        /**
         * Checks the total requirement against what is available plus what the topology
         * already holds. A {@code true} result from {@code remove} is treated as
         * "not enough resources".
         */
        boolean canScheduleAvailable() {
            NormalizedResourceOffer usable = new NormalizedResourceOffer(this.clusterAvailableResources);
            usable.add(this.topologyScheduledResources);
            boolean insufficient = usable.remove(this.topologyRequiredResources);
            if (insufficient) {
                return false;
            }
            double usableMemory = this.clusterAvailableMemory + this.topologyScheduledMemory;
            double requiredMemory = this.topologyRequiredNonSharedMemory + this.topologySharedMemoryLowerBound;
            return usableMemory >= requiredMemory;
        }

        /**
         * Returns true when no remaining requirement is being tracked, or when the tracked
         * remainder has dropped to zero or below for every resource and for memory.
         */
        boolean canScheduleRemainingRequired() {
            if (!this.remainingResourcesAreSet) {
                return true;
            }
            return !this.remainingRequiredTopologyResources.areAnyOverZero()
                && this.remainingRequiredTopologyMemory <= 0.0d;
        }

        /**
         * Starts tracking the shortfall: required resources minus whatever is scheduled
         * for the topology in {@code cluster}.
         */
        void setRemainingRequiredResources(Cluster cluster, TopologyDetails topologyDetails) {
            this.remainingResourcesAreSet = true;
            setScheduledTopologyResources(cluster, topologyDetails);
            this.remainingRequiredTopologyResources = new NormalizedResourceOffer();
            this.remainingRequiredTopologyResources.add(this.topologyRequiredResources);
            this.remainingRequiredTopologyResources.remove(this.topologyScheduledResources);
            this.remainingRequiredTopologyMemory =
                (this.topologyRequiredNonSharedMemory + this.topologySharedMemoryLowerBound) - this.topologyScheduledMemory;
        }

        /**
         * Credits the resources held by an evicted topology to the cluster's available pool
         * and subtracts them from the tracked shortfall. Does nothing when the evicted
         * topology has no assignment.
         *
         * <p>Must only be called after {@link #setRemainingRequiredResources}, because it
         * updates {@code remainingRequiredTopologyResources}.
         */
        void adjustResourcesForEvictedTopology(Cluster cluster, TopologyDetails topologyDetails) {
            SchedulerAssignment assignment = cluster.getAssignmentById(topologyDetails.getId());
            if (assignment == null) {
                return;
            }
            NormalizedResourceRequest freedResources = topologyDetails.getApproximateResources(assignment.getExecutors());
            double freedMemory = computeScheduledTopologyMemory(cluster, topologyDetails);
            this.clusterAvailableResources.add(freedResources);
            this.clusterAvailableMemory += freedMemory;
            this.remainingRequiredTopologyResources.remove(freedResources);
            this.remainingRequiredTopologyMemory -= freedMemory;
        }

        /** Stops tracking the shortfall; the availability check alone then decides canSchedule(). */
        void resetRemaining() {
            this.remainingResourcesAreSet = false;
            this.remainingRequiredTopologyMemory = 0.0d;
        }

        /** Sums on-heap plus off-heap memory over every worker in the assignment. */
        private double getMemoryUsed(SchedulerAssignment schedulerAssignment) {
            return schedulerAssignment.getScheduledResources().values().stream()
                .mapToDouble(worker -> worker.get_mem_on_heap() + worker.get_mem_off_heap())
                .sum();
        }

        /**
         * Total memory held by the topology in {@code cluster}: per-node shared off-heap
         * memory plus per-worker memory. Returns 0 when the topology has no assignment.
         */
        private double computeScheduledTopologyMemory(Cluster cluster, TopologyDetails topologyDetails) {
            SchedulerAssignment assignment = cluster.getAssignmentById(topologyDetails.getId());
            double total = 0.0d;
            if (assignment != null) {
                for (Double sharedOffHeap : assignment.getNodeIdToTotalSharedOffHeapNodeMemory().values()) {
                    total += sharedOffHeap.doubleValue();
                }
                total += getMemoryUsed(assignment);
            }
            return total;
        }

        /**
         * Builds a human-readable description of the resources still missing.
         *
         * @return the message, or an empty string when no shortfall was ever computed
         */
        String getRemainingRequiredResourcesMessage() {
            // The shortfall is only populated by setRemainingRequiredResources(). When no
            // scheduling attempt ran (e.g. zero attempts configured) there is nothing to report,
            // and dereferencing the null offer would surface as a misleading error upstream.
            if (this.remainingRequiredTopologyResources == null) {
                return "";
            }
            StringBuilder message = new StringBuilder();
            NormalizedResourceOffer clusterRemaining = new NormalizedResourceOffer();
            clusterRemaining.add(this.clusterAvailableResources);
            clusterRemaining.remove(this.topologyScheduledResources);
            double memoryNeeded = this.remainingRequiredTopologyMemory;
            double cpuNeeded = this.remainingRequiredTopologyResources.getTotalCpu();
            if (memoryNeeded > 0.0d) {
                message.append("Additional Memory Required: ").append(memoryNeeded).append(" MB ");
                message.append("(Available: ").append(clusterRemaining.getTotalMemoryMb()).append(" MB). ");
            }
            if (cpuNeeded > 0.0d) {
                message.append("Additional CPU Required: ").append(cpuNeeded).append("% CPU ");
                message.append("(Available: ").append(clusterRemaining.getTotalCpu()).append(" % CPU).");
            }
            if (this.remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
                message.append(" Additional Topology Required Resources: ");
                message.append(this.remainingRequiredTopologyResources.getNormalizedResources().toString());
                message.append(" Cluster Available Resources: ");
                message.append(clusterRemaining.getNormalizedResources().toString());
                message.append(".  ");
            }
            return message.toString();
        }
    }

    /**
     * Marks a topology as failed with the given status message and no associated exception.
     */
    private static void markFailedTopology(User user, Cluster cluster, TopologyDetails topologyDetails, String str) {
        markFailedTopology(user, cluster, topologyDetails, str, null);
    }

    /**
     * Marks a topology as failed: sets its status on the cluster, logs the failure at
     * error level (with the stack trace when {@code th} is given) and records the
     * topology as unsuccessful for the user.
     *
     * @param user the submitting user whose bookkeeping is updated
     * @param cluster the cluster on which the status is set
     * @param topologyDetails the topology that failed to schedule
     * @param str the status message
     * @param th the cause to log, or null
     */
    private static void markFailedTopology(User user, Cluster cluster, TopologyDetails topologyDetails, String str, Throwable th) {
        cluster.setStatus(topologyDetails, str);
        final String logLine = topologyDetails.getId() + " " + str;
        if (th == null) {
            LOG.error(logLine);
        } else {
            LOG.error(logLine, th);
        }
        user.markTopoUnsuccess(topologyDetails);
    }

    /**
     * Initializes the scheduler from the daemon configuration: registers the meters,
     * instantiates the priority strategy and config loader, reads the attempt and timeout
     * limits, creates the background scheduling thread and primes the config cache.
     *
     * @param map the daemon configuration
     * @param stormMetricsRegistry registry on which the scheduler's meters are registered
     */
    @Override
    public void prepare(Map<String, Object> map, StormMetricsRegistry stormMetricsRegistry) {
        this.conf = map;

        this.schedulingTimeoutMeter = stormMetricsRegistry.registerMeter("nimbus:num-scheduling-timeouts");
        this.internalErrorMeter = stormMetricsRegistry.registerMeter("nimbus:scheduler-internal-errors");

        final String priorityStrategyClass = (String) map.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY);
        this.schedulingPriorityStrategy = (ISchedulingPriorityStrategy) ReflectionUtils.newInstance(priorityStrategyClass);
        this.configLoader = ConfigLoaderFactoryService.createConfigLoader(map);

        final Object attemptsSetting = map.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS);
        this.maxSchedulingAttempts = ObjectReader.getInt(attemptsSetting, 5).intValue();
        final Object timeoutSetting = map.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY);
        this.schedulingTimeoutSeconds = ObjectReader.getInt(timeoutSetting, 60).intValue();

        this.backgroundScheduling = Executors.newFixedThreadPool(1);
        this.evictedTopologiesMap = new HashMap<>();

        this.schedulerConfigCache = new SchedulerConfigCache<>(map, this::loadConfig);
        this.schedulerConfigCache.prepare();
    }

    /**
     * Logs the cleanup and shuts down the background scheduling executor.
     */
    @Override
    public void cleanup() {
        LOG.info("Cleanup ResourceAwareScheduler scheduler");
        final ExecutorService pool = this.backgroundScheduling;
        pool.shutdown();
    }

    /**
     * Returns a read-only view of the currently cached user resource pools.
     *
     * @return unmodifiable map of user name to that user's resource settings
     */
    @Override
    public Map<String, Map<String, Double>> config() {
        final Map<String, Map<String, Double>> cachedPools = this.schedulerConfigCache.get();
        return Collections.unmodifiableMap(cachedPools);
    }

    /**
     * Runs one scheduling round: refreshes the cached user pools, orders the topologies
     * with the priority strategy, and schedules every topology that still needs it.
     * Topologies that need no scheduling get the status "Fully Scheduled" if they have
     * none. The evictions recorded during this round replace the previous eviction map.
     *
     * @param topologies not read by this method; the topologies come from {@code cluster}
     * @param cluster the cluster state to schedule on
     */
    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        this.schedulerConfigCache.refresh();
        Map<String, User> users = getUsers(cluster);
        List<TopologyDetails> orderedTopologies =
            new ArrayList<>(this.schedulingPriorityStrategy.getOrderedTopologies(cluster, users));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ordered list of topologies is: {}",
                orderedTopologies.stream().map(TopologyDetails::getId).collect(Collectors.toList()));
        }
        // Collected separately and published at the end, so readers of
        // getEvictedTopologiesMap() never see a half-built round.
        Map<String, Set<String>> evictedThisRound = new HashMap<>();
        for (TopologyDetails topology : orderedTopologies) {
            if (cluster.needsSchedulingRas(topology)) {
                User submitter = users.get(topology.getTopologySubmitter());
                scheduleTopology(topology, cluster, submitter, orderedTopologies, evictedThisRound);
            } else {
                cluster.setStatusIfAbsent(topology.getId(), "Fully Scheduled");
            }
        }
        this.evictedTopologiesMap = evictedThisRound;
    }

    /**
     * Tries to schedule one topology, evicting lower-priority topologies if necessary.
     *
     * <p>Each attempt runs the topology's strategy on the background thread with a timeout.
     * On success the caller's cluster is updated from the scheduled state. On a
     * not-enough-resources failure, topologies that come after this one in {@code list}
     * are evicted (last first) until the bookkeeping says the topology could fit, and
     * another attempt is made, up to {@code maxSchedulingAttempts} times. Any other
     * outcome marks the topology as failed.
     *
     * @param topologyDetails the topology to schedule
     * @param cluster the caller's cluster; receives status updates and the final assignment
     * @param user the submitting user, updated when scheduling fails
     * @param list all topologies in priority order
     * @param map collects, per scheduled topology id, the ids of topologies evicted for it
     */
    private void scheduleTopology(TopologyDetails topologyDetails, Cluster cluster, User user, List<TopologyDetails> list, Map<String, Set<String>> map) {
        // Scheduling and eviction run against a copy; the caller's cluster only receives
        // statuses and, on success, the result via updateFrom().
        Cluster workingState = new Cluster(cluster);
        RasNodes nodes = new RasNodes(workingState);
        final String strategyConf = (String) topologyDetails.getConf().get(DaemonConfig.VALIDATE_TOPOLOGY_SCHEDULER_STRATEGY);
        try {
            String strategyClass = strategyConf;
            if (strategyClass.startsWith("backtype.storm")) {
                // Accept strategy class names from the legacy package.
                strategyClass = strategyClass.replace("backtype.storm", "org.apache.storm");
                LOG.debug("Replaced backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
            }
            IStrategy iStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance(strategyClass, this.conf);
            iStrategy.prepare(this.conf);
            boolean oneExecutorPerWorker = ((Boolean) topologyDetails.getConf().get("topology.ras.one.executor.per.worker")).booleanValue();
            boolean oneComponentPerWorker = ((Boolean) topologyDetails.getConf().get("topology.ras.one.component.per.worker")).booleanValue();
            if (oneExecutorPerWorker && oneComponentPerWorker) {
                LOG.warn("Conflicting options: {} and {} are both set! Ignoring {} option.",
                    "topology.ras.one.executor.per.worker", "topology.ras.one.component.per.worker", "topology.ras.one.component.per.worker");
            }
            TopologySchedulingResources resources = new TopologySchedulingResources(workingState, topologyDetails);
            for (int attempt = 0; attempt < this.maxSchedulingAttempts; attempt++) {
                SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, topologyDetails.getId());
                try {
                    resources.resetRemaining();
                    SchedulingResult result;
                    if (resources.canSchedule()) {
                        Future<SchedulingResult> pending = this.backgroundScheduling.submit(() -> {
                            return iStrategy.schedule(toSchedule, topologyDetails);
                        });
                        try {
                            result = pending.get(this.schedulingTimeoutSeconds, TimeUnit.SECONDS);
                        } catch (TimeoutException e) {
                            markFailedTopology(user, cluster, topologyDetails, "Scheduling took too long for " + topologyDetails.getId() + " using strategy " + iStrategy.getClass().getName() + " timeout after " + this.schedulingTimeoutSeconds + " seconds using config " + DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY + ".");
                            this.schedulingTimeoutMeter.mark();
                            pending.cancel(true);
                            return;
                        }
                    } else {
                        // Skip the strategy entirely when the bookkeeping already rules it out.
                        result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "");
                    }
                    LOG.debug("scheduling result: {}", result);
                    if (result == null) {
                        markFailedTopology(user, cluster, topologyDetails, "Internal scheduler error");
                        return;
                    }
                    if (result.isSuccess()) {
                        cluster.updateFrom(toSchedule);
                        cluster.setStatus(topologyDetails.getId(), "Running - " + result.getMessage());
                        return;
                    }
                    if (result.getStatus() != SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                        user.markTopoUnsuccess(topologyDetails, cluster, result.toString());
                        return;
                    }
                    LOG.debug("Not enough resources to schedule {}", topologyDetails.getName());
                    // Walk the ordering backwards so the last-ordered topologies are evicted first,
                    // stopping at the topology being scheduled.
                    ImmutableList<TopologyDetails> reversed = ImmutableList.copyOf(list).reverse();
                    LOG.debug("Attempting to make space for topo {} from user {}", topologyDetails.getName(), topologyDetails.getTopologySubmitter());
                    int ownPosition = reversed.indexOf(topologyDetails);
                    resources.setRemainingRequiredResources(toSchedule, topologyDetails);
                    Set<String> evicted = new HashSet<>();
                    for (int idx = 0; idx < ownPosition; idx++) {
                        TopologyDetails candidate = reversed.get(idx);
                        SchedulerAssignment candidateAssignment = workingState.getAssignmentById(candidate.getId());
                        if (candidateAssignment != null && !candidateAssignment.getSlots().isEmpty()) {
                            resources.adjustResourcesForEvictedTopology(toSchedule, candidate);
                            evicted.add(candidate.getId());
                            nodes.freeSlots(workingState.getUsedSlotsByTopologyId(candidate.getId()));
                            if (resources.canSchedule()) {
                                break;
                            }
                        }
                    }
                    if (evicted.isEmpty()) {
                        markFailedTopology(user, cluster, topologyDetails, "Not enough resources to schedule after evicting lower priority topologies. " + resources.getRemainingRequiredResourcesMessage() + result.getErrorMessage());
                        return;
                    }
                    LOG.warn("Evicted Topologies {} when scheduling topology: {}", evicted, topologyDetails.getId());
                    map.computeIfAbsent(topologyDetails.getId(), id -> new HashSet<>()).addAll(evicted);
                } catch (Exception e2) {
                    if (e2 instanceof InterruptedException) {
                        // Future.get() was interrupted: keep the interrupt visible to the
                        // caller instead of silently clearing it.
                        Thread.currentThread().interrupt();
                    }
                    this.internalErrorMeter.mark();
                    markFailedTopology(user, cluster, topologyDetails, "Internal Error - Exception thrown when scheduling. Please check logs for details", e2);
                    return;
                }
            }
            markFailedTopology(user, cluster, topologyDetails, "Failed to make enough resources for " + topologyDetails.getId() + " by evicting lower priority topologies within " + this.maxSchedulingAttempts + " attempts. " + resources.getRemainingRequiredResourcesMessage());
        } catch (DisallowedStrategyException e3) {
            markFailedTopology(user, cluster, topologyDetails, "Unsuccessful in scheduling - " + e3.getAttemptedClass() + " is not an allowed strategy. Please make sure your " + DaemonConfig.VALIDATE_TOPOLOGY_SCHEDULER_STRATEGY + " config is one of the allowed strategies: " + e3.getAllowedStrategies(), e3);
        } catch (RuntimeException e4) {
            markFailedTopology(user, cluster, topologyDetails, "Unsuccessful in scheduling - failed to create instance of topology strategy " + strategyConf + ". Please check logs for details", e4);
        }
    }

    /**
     * Returns a read-only view of the evictions recorded by the most recent
     * {@code schedule} call.
     *
     * @return unmodifiable map of scheduled topology id to the ids evicted for it
     */
    public Map<String, Set<String>> getEvictedTopologiesMap() {
        final Map<String, Set<String>> latestEvictions = this.evictedTopologiesMap;
        return Collections.unmodifiableMap(latestEvictions);
    }

    /**
     * Builds one {@code User} per distinct topology submitter in the cluster, each
     * initialized with that user's entry from the resource pool configuration.
     * Topologies whose submitter is null or empty are logged and contribute no user.
     *
     * @param cluster the cluster whose topologies are inspected
     * @return map of submitter name to user
     */
    private Map<String, User> getUsers(Cluster cluster) {
        final Map<String, User> usersByName = new HashMap<>();
        final Map<String, Map<String, Double>> resourcePools = config();
        LOG.debug("userResourcePools: {}", resourcePools);
        for (TopologyDetails topology : cluster.getTopologies()) {
            final String submitter = topology.getTopologySubmitter();
            final boolean hasSubmitter = submitter != null && !submitter.equals("");
            if (hasSubmitter) {
                // Only the first topology seen for a submitter creates the User.
                usersByName.computeIfAbsent(submitter, name -> new User(name, resourcePools.get(name)));
                continue;
            }
            LOG.error("Cannot determine user for topology {}.  Will skip scheduling this topology", topology.getName());
        }
        return usersByName;
    }

    /**
     * Converts the raw user resource pools, whose values may be any {@link Number},
     * into pools of {@link Double} values.
     *
     * @param map raw pools keyed by user name; may be null
     * @return a new mutable map with one entry per user; empty when {@code map} is null.
     *         A user whose raw pool is null gets an empty pool.
     */
    private Map<String, Map<String, Double>> convertToDouble(Map<String, Map<String, Number>> map) {
        Map<String, Map<String, Double>> converted = new HashMap<>();
        if (map == null) {
            return converted;
        }
        for (Map.Entry<String, Map<String, Number>> poolEntry : map.entrySet()) {
            Map<String, Double> pool = new HashMap<>();
            Map<String, Number> rawPool = poolEntry.getValue();
            // A user key with no body (e.g. an empty YAML mapping) parses to null.
            if (rawPool != null) {
                for (Map.Entry<String, Number> resource : rawPool.entrySet()) {
                    pool.put(resource.getKey(), resource.getValue().doubleValue());
                }
            }
            converted.put(poolEntry.getKey(), pool);
        }
        return converted;
    }

    /**
     * Loads the user resource pools, trying three sources in order: the config loader
     * (when one is configured), the {@code user-resource-pools.yaml} file, and finally
     * the scheduler's own configuration map.
     *
     * @return the pools converted to {@code Double} values; empty when no source has them
     */
    // The sources hand back untyped maps; the expected shape is user -> (resource -> number).
    @SuppressWarnings("unchecked")
    private Map<String, Map<String, Double>> loadConfig() {
        Map<String, Map<String, Number>> raw;
        if (this.configLoader != null) {
            raw = (Map<String, Map<String, Number>>) this.configLoader.load(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
            if (raw != null) {
                return convertToDouble(raw);
            }
            LOG.warn("Config loader returned null. Will try to read from user-resource-pools.yaml");
        }
        raw = (Map<String, Map<String, Number>>) Utils.findAndReadConfigFile("user-resource-pools.yaml", false)
            .get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
        if (raw != null) {
            return convertToDouble(raw);
        }
        LOG.warn("Reading from user-resource-pools.yaml returned null. This could because the file is not available. Will load configs from storm configuration");
        raw = (Map<String, Map<String, Number>>) this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
        return convertToDouble(raw);
    }
}
