package software.amazon.kinesis.coordinator.assignment;

import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.coordinator.assignment.LeaseAssignmentManager;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.worker.metricstats.WorkerMetricStats;

@KinesisClientInternalApi
/* loaded from: input_file:software/amazon/kinesis/coordinator/assignment/VarianceBasedLeaseAssignmentDecider.class */
public final class VarianceBasedLeaseAssignmentDecider implements LeaseAssignmentDecider {
    private static final Logger log = LoggerFactory.getLogger(VarianceBasedLeaseAssignmentDecider.class);
    private final LeaseAssignmentManager.InMemoryStorageView inMemoryStorageView;
    private final int dampeningPercentageValue;
    private final int reBalanceThreshold;
    private final boolean allowThroughputOvershoot;
    private final Map<String, Double> workerMetricsToFleetLevelAverageMap = new HashMap();
    private final PriorityQueue<WorkerMetricStats> assignableWorkerSortedByAvailableCapacity;
    private int targetLeasePerWorker;

    public VarianceBasedLeaseAssignmentDecider(LeaseAssignmentManager.InMemoryStorageView inMemoryStorageView, int i, int i2, boolean z) {
        this.inMemoryStorageView = inMemoryStorageView;
        this.dampeningPercentageValue = i;
        this.reBalanceThreshold = i2;
        this.allowThroughputOvershoot = z;
        initialize();
        this.assignableWorkerSortedByAvailableCapacity = new PriorityQueue<>(Comparator.comparingDouble(workerMetricStats -> {
            return workerMetricStats.computePercentageToReachAverage(this.workerMetricsToFleetLevelAverageMap);
        }).reversed());
        this.assignableWorkerSortedByAvailableCapacity.addAll(getAvailableWorkersForAssignment(inMemoryStorageView.getActiveWorkerMetrics()));
    }

    private void initialize() {
        this.workerMetricsToFleetLevelAverageMap.putAll((Map) this.inMemoryStorageView.getActiveWorkerMetrics().stream().flatMap(workerMetricStats -> {
            return workerMetricStats.getMetricStats().keySet().stream().map(str -> {
                return new AbstractMap.SimpleEntry(str, Double.valueOf(workerMetricStats.getMetricStat(str)));
            });
        }).collect(Collectors.groupingBy((v0) -> {
            return v0.getKey();
        }, HashMap::new, Collectors.averagingDouble((v0) -> {
            return v0.getValue();
        }))));
        this.targetLeasePerWorker = Math.max(this.inMemoryStorageView.getLeaseList().size() / Math.max(this.inMemoryStorageView.getActiveWorkerMetrics().size(), 1), 1);
    }

    private List<WorkerMetricStats> getAvailableWorkersForAssignment(List<WorkerMetricStats> list) {
        return (List) list.stream().filter(workerMetricStats -> {
            return this.inMemoryStorageView.isWorkerTotalThroughputLessThanMaxThroughput(workerMetricStats.getWorkerId()) && this.inMemoryStorageView.isWorkerAssignedLeasesLessThanMaxLeases(workerMetricStats.getWorkerId());
        }).collect(Collectors.toList());
    }

    @Override // software.amazon.kinesis.coordinator.assignment.LeaseAssignmentDecider
    public void assignExpiredOrUnassignedLeases(List<Lease> list) {
        Collections.sort(list, Comparator.comparing((v0) -> {
            return v0.lastCounterIncrementNanos();
        }));
        HashSet hashSet = new HashSet();
        Iterator<Lease> it = list.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Lease next = it.next();
            WorkerMetricStats poll = this.assignableWorkerSortedByAvailableCapacity.poll();
            if (!Objects.nonNull(poll)) {
                log.info("No worker available to assign lease {}", next.leaseKey());
                break;
            } else {
                assignLease(next, poll);
                hashSet.add(next);
            }
        }
        list.removeAll(hashSet);
    }

    private List<WorkerMetricStats> getWorkersToTakeLeasesFromIfRequired(List<WorkerMetricStats> list, String str, double d) {
        ArrayList arrayList = new ArrayList();
        double d2 = d * (1.0d + (this.reBalanceThreshold / 100.0d));
        double d3 = d * (1.0d - (this.reBalanceThreshold / 100.0d));
        WorkerMetricStats workerMetricStats = null;
        log.info("Range for re-balance upper threshold {} and lower threshold {}", Double.valueOf(d2), Double.valueOf(d3));
        boolean z = false;
        for (WorkerMetricStats workerMetricStats2 : list) {
            double metricStat = workerMetricStats2.getMetricStat(str);
            boolean isWorkerMetricAboveOperatingRange = workerMetricStats2.isWorkerMetricAboveOperatingRange(str);
            if (metricStat > d2 || metricStat < d3 || isWorkerMetricAboveOperatingRange) {
                z = true;
            }
            if (metricStat >= d2 || isWorkerMetricAboveOperatingRange) {
                arrayList.add(workerMetricStats2);
            }
            if (workerMetricStats == null || workerMetricStats.getMetricStat(str) < metricStat) {
                workerMetricStats = workerMetricStats2;
            }
        }
        if (arrayList.isEmpty()) {
            arrayList.add(workerMetricStats);
        }
        return z ? arrayList : Collections.emptyList();
    }

    @Override // software.amazon.kinesis.coordinator.assignment.LeaseAssignmentDecider
    public void balanceWorkerVariance() {
        List<WorkerMetricStats> activeWorkerMetrics = this.inMemoryStorageView.getActiveWorkerMetrics();
        log.info("WorkerMetricStats to corresponding fleet level average : {}", this.workerMetricsToFleetLevelAverageMap);
        log.info("Active WorkerMetricStats : {}", activeWorkerMetrics);
        HashMap hashMap = new HashMap();
        String str = "";
        double d = -1.0d;
        for (Map.Entry<String, Double> entry : this.workerMetricsToFleetLevelAverageMap.entrySet()) {
            String key = entry.getKey();
            List<WorkerMetricStats> list = (List) activeWorkerMetrics.stream().filter(workerMetricStats -> {
                return workerMetricStats.containsMetricStat(key);
            }).collect(Collectors.toList());
            double doubleValue = entry.getValue().doubleValue();
            List<WorkerMetricStats> workersToTakeLeasesFromIfRequired = getWorkersToTakeLeasesFromIfRequired(list, key, doubleValue);
            HashMap hashMap2 = new HashMap();
            double d2 = 0.0d;
            for (WorkerMetricStats workerMetricStats2 : workersToTakeLeasesFromIfRequired) {
                double metricStat = workerMetricStats2.getMetricStat(key);
                double doubleValue2 = this.inMemoryStorageView.getTotalAssignedThroughput(workerMetricStats2.getWorkerId()).doubleValue() * ((metricStat - doubleValue) / metricStat) * (this.dampeningPercentageValue / 100.0d);
                log.info("For worker : {} taking throughput : {} after dampening based on WorkerMetricStats : {}", new Object[]{workerMetricStats2.getWorkerId(), Double.valueOf(doubleValue2), key});
                d2 += doubleValue2;
                hashMap2.put(workerMetricStats2.getWorkerId(), Double.valueOf(doubleValue2));
            }
            if (d < d2) {
                str = key;
                hashMap.clear();
                hashMap.putAll(hashMap2);
                d = d2;
            }
        }
        log.info("Largest outlier WorkerMetricStats is : {} and total of {} throughput will be rebalanced", str, Double.valueOf(d));
        log.info("Workers to throughput taken from them is : {}", hashMap);
        ArrayList<Map.Entry> arrayList = new ArrayList(hashMap.entrySet());
        Collections.sort(arrayList, (entry2, entry3) -> {
            return ((Double) entry3.getValue()).compareTo((Double) entry2.getValue());
        });
        for (Map.Entry entry4 : arrayList) {
            String str2 = (String) entry4.getKey();
            Queue<Lease> leasesToTake = getLeasesToTake(str2, ((Double) entry4.getValue()).doubleValue());
            log.info("Leases taken from worker : {} are : {}", str2, leasesToTake.stream().map((v0) -> {
                return v0.leaseKey();
            }).collect(Collectors.toSet()));
            Iterator<Lease> it = leasesToTake.iterator();
            while (true) {
                if (it.hasNext()) {
                    Lease next = it.next();
                    WorkerMetricStats poll = this.assignableWorkerSortedByAvailableCapacity.poll();
                    if (Objects.nonNull(poll) && poll.willAnyMetricStatsGoAboveAverageUtilizationOrOperatingRange(this.workerMetricsToFleetLevelAverageMap, this.inMemoryStorageView.getTargetAverageThroughput(), next.throughputKBps().doubleValue(), this.targetLeasePerWorker)) {
                        log.info("No worker to assign anymore in this iteration due to hitting average values");
                        break;
                    } else if (Objects.nonNull(poll)) {
                        assignLease(next, poll);
                    }
                }
            }
        }
        printWorkerToUtilizationLog(this.inMemoryStorageView.getActiveWorkerMetrics());
    }

    private Queue<Lease> getLeasesToTake(String str, double d) {
        Set<Lease> set = this.inMemoryStorageView.getWorkerToLeasesMap().get(str);
        return (Objects.isNull(set) || set.isEmpty()) ? new ArrayDeque() : this.inMemoryStorageView.getTotalAssignedThroughput(str).doubleValue() == 0.0d ? new ArrayDeque(new ArrayList(set).subList(0, 1)) : getLeasesCombiningToThroughput(str, d);
    }

    private void assignLease(Lease lease, WorkerMetricStats workerMetricStats) {
        if (Objects.nonNull(lease.actualOwner()) && lease.actualOwner().equals(workerMetricStats.getWorkerId())) {
            this.assignableWorkerSortedByAvailableCapacity.add(workerMetricStats);
            return;
        }
        workerMetricStats.extrapolateMetricStatValuesForAddedThroughput(this.workerMetricsToFleetLevelAverageMap, this.inMemoryStorageView.getTargetAverageThroughput(), lease.throughputKBps().doubleValue(), this.targetLeasePerWorker);
        log.info("Assigning lease : {} to worker : {}", lease.leaseKey(), workerMetricStats.getWorkerId());
        this.inMemoryStorageView.performLeaseAssignment(lease, workerMetricStats.getWorkerId());
        if (this.inMemoryStorageView.isWorkerTotalThroughputLessThanMaxThroughput(workerMetricStats.getWorkerId()) && this.inMemoryStorageView.isWorkerAssignedLeasesLessThanMaxLeases(workerMetricStats.getWorkerId())) {
            this.assignableWorkerSortedByAvailableCapacity.add(workerMetricStats);
        }
    }

    private void printWorkerToUtilizationLog(List<WorkerMetricStats> list) {
        list.forEach(workerMetricStats -> {
            log.info("WorkerId : {} and average WorkerMetricStats data : {}", workerMetricStats.getWorkerId(), workerMetricStats.getMetricStatsMap());
        });
    }

    private Queue<Lease> getLeasesCombiningToThroughput(String str, double d) {
        ArrayList<Lease> arrayList = new ArrayList(this.inMemoryStorageView.getWorkerToLeasesMap().get(str));
        if (arrayList.isEmpty()) {
            return new ArrayDeque();
        }
        Collections.shuffle(arrayList);
        ArrayDeque arrayDeque = new ArrayDeque();
        double d2 = d;
        for (Lease lease : arrayList) {
            if (d2 - lease.throughputKBps().doubleValue() > 0.0d) {
                d2 -= lease.throughputKBps().doubleValue();
                arrayDeque.add(lease);
            }
        }
        if (this.allowThroughputOvershoot && arrayDeque.isEmpty()) {
            Optional min = arrayList.stream().min(Comparator.comparingDouble((v0) -> {
                return v0.throughputKBps();
            }));
            arrayDeque.getClass();
            min.ifPresent((v1) -> {
                r1.add(v1);
            });
        }
        return arrayDeque;
    }
}
