package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.inject.Inject;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.stats.TDigest;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.execution.TaskId;
import io.trino.execution.scheduler.NodeAllocator;
import io.trino.execution.scheduler.PartitionMemoryEstimator;
import io.trino.memory.ClusterMemoryManager;
import io.trino.memory.MemoryInfo;
import io.trino.memory.MemoryManagerConfig;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.spi.ErrorCode;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.memory.MemoryPoolInfo;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.util.VisibleForTesting;

@ThreadSafe
/* loaded from: input_file:io/trino/execution/scheduler/BinPackingNodeAllocatorService.class */
public class BinPackingNodeAllocatorService implements NodeAllocatorService, NodeAllocator, PartitionMemoryEstimatorFactory {
    private static final Logger log = Logger.get(BinPackingNodeAllocatorService.class);

    @VisibleForTesting
    static final int PROCESS_PENDING_ACQUIRES_DELAY_SECONDS = 5;
    private final InternalNodeManager nodeManager;
    private final Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier;
    private final ScheduledThreadPoolExecutor executor;
    private final AtomicBoolean started;
    private final AtomicBoolean stopped;
    private final Semaphore processSemaphore;
    private final AtomicReference<Map<String, MemoryPoolInfo>> nodePoolMemoryInfos;
    private final AtomicReference<Optional<DataSize>> maxNodePoolSize;
    private final boolean scheduleOnCoordinator;
    private final boolean memoryRequirementIncreaseOnWorkerCrashEnabled;
    private final DataSize taskRuntimeMemoryEstimationOverhead;
    private final Ticker ticker;
    private final Deque<PendingAcquire> pendingAcquires;
    private final Set<BinPackingNodeLease> fulfilledAcquires;
    private final Duration allowedNoMatchingNodePeriod;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/BinPackingNodeAllocatorService$BinPackingNodeLease.class */
    public class BinPackingNodeLease implements NodeAllocator.NodeLease {
        private final long memoryLease;
        private final AtomicBoolean speculative;
        private final SettableFuture<InternalNode> node = SettableFuture.create();
        private final AtomicBoolean released = new AtomicBoolean();
        private final AtomicReference<TaskId> taskId = new AtomicReference<>();

        private BinPackingNodeLease(long j, boolean z) {
            this.memoryLease = j;
            this.speculative = new AtomicBoolean(z);
        }

        @Override // io.trino.execution.scheduler.NodeAllocator.NodeLease
        public ListenableFuture<InternalNode> getNode() {
            return this.node;
        }

        InternalNode getAssignedNode() {
            try {
                return (InternalNode) Futures.getDone(this.node);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        SettableFuture<InternalNode> getNodeSettableFuture() {
            return this.node;
        }

        @Override // io.trino.execution.scheduler.NodeAllocator.NodeLease
        public void attachTaskId(TaskId taskId) {
            if (!this.taskId.compareAndSet(null, taskId)) {
                throw new IllegalStateException("cannot attach taskId " + taskId + "; already attached to " + this.taskId.get());
            }
        }

        @Override // io.trino.execution.scheduler.NodeAllocator.NodeLease
        public void setSpeculative(boolean z) {
            Preconditions.checkArgument(!z, "cannot make non-speculative task speculative");
            if (this.speculative.compareAndSet(true, false)) {
                BinPackingNodeAllocatorService.this.wakeupProcessPendingAcquires();
            }
        }

        public boolean isSpeculative() {
            return this.speculative.get();
        }

        public Optional<TaskId> getAttachedTaskId() {
            return Optional.ofNullable(this.taskId.get());
        }

        public long getMemoryLease() {
            return this.memoryLease;
        }

        @Override // io.trino.execution.scheduler.NodeAllocator.NodeLease
        public void release() {
            if (!this.released.compareAndSet(false, true)) {
                throw new IllegalStateException("Node " + this.node + " already released");
            }
            this.node.cancel(true);
            if (!this.node.isDone() || this.node.isCancelled()) {
                return;
            }
            Preconditions.checkState(BinPackingNodeAllocatorService.this.fulfilledAcquires.remove(this), "node lease %s not found in fulfilledAcquires %s", this, BinPackingNodeAllocatorService.this.fulfilledAcquires);
            BinPackingNodeAllocatorService.this.wakeupProcessPendingAcquires();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/BinPackingNodeAllocatorService$BinPackingSimulation.class */
    public static class BinPackingSimulation {
        private final InternalNodeManager.NodesSnapshot nodesSnapshot;
        private final List<InternalNode> allNodesSorted;
        private final boolean ignoreAcquiredSpeculative;
        private final Map<String, Long> nodesRemainingMemory;
        private final Map<String, Long> nodesRemainingMemoryRuntimeAdjusted;
        private final Map<String, Long> speculativeMemoryReserved;
        private final Map<String, MemoryPoolInfo> nodeMemoryPoolInfos;
        private final boolean scheduleOnCoordinator;

        /* loaded from: input_file:io/trino/execution/scheduler/BinPackingNodeAllocatorService$BinPackingSimulation$ReservationStatus.class */
        public enum ReservationStatus {
            NONE_MATCHING,
            NOT_ENOUGH_RESOURCES_NOW,
            RESERVED
        }

        /* loaded from: input_file:io/trino/execution/scheduler/BinPackingNodeAllocatorService$BinPackingSimulation$ReserveResult.class */
        public static class ReserveResult {
            public static final ReserveResult NONE_MATCHING = new ReserveResult(ReservationStatus.NONE_MATCHING, Optional.empty());
            public static final ReserveResult NOT_ENOUGH_RESOURCES_NOW = new ReserveResult(ReservationStatus.NOT_ENOUGH_RESOURCES_NOW, Optional.empty());
            private final ReservationStatus status;
            private final Optional<InternalNode> node;

            public static ReserveResult reserved(InternalNode internalNode) {
                return new ReserveResult(ReservationStatus.RESERVED, Optional.of(internalNode));
            }

            private ReserveResult(ReservationStatus reservationStatus, Optional<InternalNode> optional) {
                this.status = (ReservationStatus) Objects.requireNonNull(reservationStatus, "status is null");
                this.node = (Optional) Objects.requireNonNull(optional, "node is null");
                Preconditions.checkArgument(optional.isPresent() == (reservationStatus == ReservationStatus.RESERVED), "node must be set iff status is RESERVED");
            }

            public ReservationStatus getStatus() {
                return this.status;
            }

            public Optional<InternalNode> getNode() {
                return this.node;
            }
        }

        public BinPackingSimulation(InternalNodeManager.NodesSnapshot nodesSnapshot, Map<String, MemoryPoolInfo> map, Set<BinPackingNodeLease> set, boolean z, DataSize dataSize, boolean z2) {
            this.nodesSnapshot = (InternalNodeManager.NodesSnapshot) Objects.requireNonNull(nodesSnapshot, "nodesSnapshot is null");
            this.allNodesSorted = (List) nodesSnapshot.getAllNodes().stream().sorted(Comparator.comparing((v0) -> {
                return v0.getNodeIdentifier();
            })).collect(ImmutableList.toImmutableList());
            this.ignoreAcquiredSpeculative = z2;
            Objects.requireNonNull(map, "nodeMemoryPoolInfos is null");
            this.nodeMemoryPoolInfos = ImmutableMap.copyOf(map);
            this.scheduleOnCoordinator = z;
            HashMap hashMap = new HashMap();
            for (InternalNode internalNode : nodesSnapshot.getAllNodes()) {
                MemoryPoolInfo memoryPoolInfo = map.get(internalNode.getNodeIdentifier());
                if (memoryPoolInfo == null) {
                    hashMap.put(internalNode.getNodeIdentifier(), ImmutableMap.of());
                } else {
                    hashMap.put(internalNode.getNodeIdentifier(), memoryPoolInfo.getTaskMemoryReservations());
                }
            }
            HashMap hashMap2 = new HashMap();
            HashMultimap create = HashMultimap.create();
            for (BinPackingNodeLease binPackingNodeLease : set) {
                if (!z2 || !binPackingNodeLease.isSpeculative()) {
                    InternalNode assignedNode = binPackingNodeLease.getAssignedNode();
                    create.put(assignedNode.getNodeIdentifier(), binPackingNodeLease);
                    hashMap2.compute(assignedNode.getNodeIdentifier(), (str, l) -> {
                        return Long.valueOf((l == null ? 0L : l.longValue()) + binPackingNodeLease.getMemoryLease());
                    });
                }
            }
            this.speculativeMemoryReserved = new HashMap();
            if (z2) {
                for (BinPackingNodeLease binPackingNodeLease2 : set) {
                    if (binPackingNodeLease2.isSpeculative()) {
                        this.speculativeMemoryReserved.compute(binPackingNodeLease2.getAssignedNode().getNodeIdentifier(), (str2, l2) -> {
                            return Long.valueOf((l2 == null ? 0L : l2.longValue()) + binPackingNodeLease2.getMemoryLease());
                        });
                    }
                }
            }
            this.nodesRemainingMemory = new HashMap();
            for (InternalNode internalNode2 : nodesSnapshot.getAllNodes()) {
                MemoryPoolInfo memoryPoolInfo2 = map.get(internalNode2.getNodeIdentifier());
                if (memoryPoolInfo2 == null) {
                    this.nodesRemainingMemory.put(internalNode2.getNodeIdentifier(), 0L);
                } else {
                    this.nodesRemainingMemory.put(internalNode2.getNodeIdentifier(), Long.valueOf(memoryPoolInfo2.getMaxBytes() - ((Long) hashMap2.getOrDefault(internalNode2.getNodeIdentifier(), 0L)).longValue()));
                }
            }
            this.nodesRemainingMemoryRuntimeAdjusted = new HashMap();
            for (InternalNode internalNode3 : nodesSnapshot.getAllNodes()) {
                MemoryPoolInfo memoryPoolInfo3 = map.get(internalNode3.getNodeIdentifier());
                if (memoryPoolInfo3 == null) {
                    this.nodesRemainingMemoryRuntimeAdjusted.put(internalNode3.getNodeIdentifier(), 0L);
                } else {
                    Map map2 = (Map) hashMap.get(internalNode3.getNodeIdentifier());
                    long j = 0;
                    for (BinPackingNodeLease binPackingNodeLease3 : create.get(internalNode3.getNodeIdentifier())) {
                        long j2 = 0;
                        if (binPackingNodeLease3.getAttachedTaskId().isPresent()) {
                            j2 = ((Long) map2.getOrDefault(binPackingNodeLease3.getAttachedTaskId().get().toString(), 0L)).longValue() + dataSize.toBytes();
                        }
                        j += Math.max(j2, binPackingNodeLease3.getMemoryLease());
                    }
                    this.nodesRemainingMemoryRuntimeAdjusted.put(internalNode3.getNodeIdentifier(), Long.valueOf(memoryPoolInfo3.getMaxBytes() - Math.max(j, memoryPoolInfo3.getReservedBytes())));
                }
            }
        }

        public ReserveResult tryReserve(PendingAcquire pendingAcquire) {
            NodeRequirements nodeRequirements = pendingAcquire.getNodeRequirements();
            Optional<CatalogHandle> catalogHandle = nodeRequirements.getCatalogHandle();
            InternalNodeManager.NodesSnapshot nodesSnapshot = this.nodesSnapshot;
            Objects.requireNonNull(nodesSnapshot);
            Optional<U> map = catalogHandle.map(nodesSnapshot::getConnectorNodes);
            List list = (List) this.allNodesSorted.stream().filter(internalNode -> {
                return map.isEmpty() || ((Set) map.get()).contains(internalNode);
            }).filter(internalNode2 -> {
                if (nodeRequirements.getAddresses().contains(internalNode2.getHostAndPort())) {
                    return true;
                }
                if (nodeRequirements.getAddresses().isEmpty()) {
                    return this.scheduleOnCoordinator || !internalNode2.isCoordinator();
                }
                return false;
            }).collect(ImmutableList.toImmutableList());
            if (list.isEmpty()) {
                return ReserveResult.NONE_MATCHING;
            }
            Comparator<InternalNode> comparing = Comparator.comparing(internalNode3 -> {
                return this.nodesRemainingMemoryRuntimeAdjusted.get(internalNode3.getNodeIdentifier());
            });
            if (this.ignoreAcquiredSpeculative) {
                comparing = resolveTiesWithSpeculativeMemory(comparing);
            }
            InternalNode internalNode4 = (InternalNode) list.stream().max(comparing).orElseThrow();
            if (this.nodesRemainingMemoryRuntimeAdjusted.get(internalNode4.getNodeIdentifier()).longValue() >= pendingAcquire.getMemoryLease() || isNodeEmpty(internalNode4.getNodeIdentifier())) {
                subtractFromRemainingMemory(internalNode4.getNodeIdentifier(), pendingAcquire.getMemoryLease());
                return ReserveResult.reserved(internalNode4);
            }
            Comparator<InternalNode> comparing2 = Comparator.comparing(internalNode5 -> {
                return this.nodesRemainingMemory.get(internalNode5.getNodeIdentifier());
            });
            if (this.ignoreAcquiredSpeculative) {
                comparing2 = resolveTiesWithSpeculativeMemory(comparing2);
            }
            subtractFromRemainingMemory(((InternalNode) list.stream().max(comparing2).orElseThrow()).getNodeIdentifier(), pendingAcquire.getMemoryLease());
            return ReserveResult.NOT_ENOUGH_RESOURCES_NOW;
        }

        private Comparator<InternalNode> resolveTiesWithSpeculativeMemory(Comparator<InternalNode> comparator) {
            return comparator.thenComparing(internalNode -> {
                return Long.valueOf(-this.speculativeMemoryReserved.getOrDefault(internalNode.getNodeIdentifier(), 0L).longValue());
            });
        }

        private void subtractFromRemainingMemory(String str, long j) {
            this.nodesRemainingMemoryRuntimeAdjusted.compute(str, (str2, l) -> {
                return Long.valueOf(l.longValue() - j);
            });
            this.nodesRemainingMemory.compute(str, (str3, l2) -> {
                return Long.valueOf(l2.longValue() - j);
            });
        }

        private boolean isNodeEmpty(String str) {
            return this.nodeMemoryPoolInfos.containsKey(str) && this.nodesRemainingMemory.get(str).equals(Long.valueOf(this.nodeMemoryPoolInfos.get(str).getMaxBytes()));
        }
    }

    /* loaded from: input_file:io/trino/execution/scheduler/BinPackingNodeAllocatorService$ExponentialGrowthPartitionMemoryEstimator.class */
    private class ExponentialGrowthPartitionMemoryEstimator implements PartitionMemoryEstimator {
        private final TDigest memoryUsageDistribution = new TDigest();

        private ExponentialGrowthPartitionMemoryEstimator() {
        }

        @Override // io.trino.execution.scheduler.PartitionMemoryEstimator
        public PartitionMemoryEstimator.MemoryRequirements getInitialMemoryRequirements(Session session, DataSize dataSize) {
            return new PartitionMemoryEstimator.MemoryRequirements(capMemoryToMaxNodeSize((DataSize) Ordering.natural().max(dataSize, getEstimatedMemoryUsage(session))));
        }

        @Override // io.trino.execution.scheduler.PartitionMemoryEstimator
        public PartitionMemoryEstimator.MemoryRequirements getNextRetryMemoryRequirements(Session session, PartitionMemoryEstimator.MemoryRequirements memoryRequirements, DataSize dataSize, ErrorCode errorCode) {
            DataSize dataSize2 = (DataSize) Ordering.natural().max(dataSize, memoryRequirements.getRequiredMemory());
            if (BinPackingNodeAllocatorService.this.shouldIncreaseMemoryRequirement(errorCode)) {
                dataSize2 = DataSize.of((long) (dataSize2.toBytes() * SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor(session)), DataSize.Unit.BYTE);
            }
            return new PartitionMemoryEstimator.MemoryRequirements(capMemoryToMaxNodeSize((DataSize) Ordering.natural().max(dataSize2, getEstimatedMemoryUsage(session))));
        }

        private DataSize capMemoryToMaxNodeSize(DataSize dataSize) {
            Optional<DataSize> optional = BinPackingNodeAllocatorService.this.maxNodePoolSize.get();
            return optional.isEmpty() ? dataSize : (DataSize) Ordering.natural().min(dataSize, optional.get());
        }

        @Override // io.trino.execution.scheduler.PartitionMemoryEstimator
        public synchronized void registerPartitionFinished(Session session, PartitionMemoryEstimator.MemoryRequirements memoryRequirements, DataSize dataSize, boolean z, Optional<ErrorCode> optional) {
            if (z) {
                this.memoryUsageDistribution.add(dataSize.toBytes());
            }
            if (!z && optional.isPresent() && BinPackingNodeAllocatorService.this.shouldIncreaseMemoryRequirement(optional.get())) {
                this.memoryUsageDistribution.add(Math.max(memoryRequirements.getRequiredMemory().toBytes(), dataSize.toBytes()) * SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor(session));
            }
        }

        private synchronized DataSize getEstimatedMemoryUsage(Session session) {
            double valueAt = this.memoryUsageDistribution.valueAt(SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile(session));
            return Double.isNaN(valueAt) ? DataSize.ofBytes(0L) : DataSize.ofBytes((long) valueAt);
        }

        private String memoryUsageDistributionInfo() {
            double[] valuesAt;
            double[] dArr = {0.01d, 0.05d, 0.1d, 0.2d, 0.5d, 0.8d, 0.9d, 0.95d, 0.99d};
            synchronized (this) {
                valuesAt = this.memoryUsageDistribution.valuesAt(dArr);
            }
            return (String) IntStream.range(0, dArr.length).mapToObj(i -> {
                double d = dArr[i];
                double d2 = valuesAt[i];
                return d + "=" + d;
            }).collect(Collectors.joining(", ", "[", "]"));
        }

        public String toString() {
            return "memoryUsageDistribution=" + memoryUsageDistributionInfo();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/BinPackingNodeAllocatorService$PendingAcquire.class */
    public static class PendingAcquire {
        private final NodeRequirements nodeRequirements;
        private final DataSize memoryRequirement;
        private final BinPackingNodeLease lease;
        private final Stopwatch noMatchingNodeStopwatch;

        private PendingAcquire(NodeRequirements nodeRequirements, DataSize dataSize, BinPackingNodeLease binPackingNodeLease, Ticker ticker) {
            this.nodeRequirements = (NodeRequirements) Objects.requireNonNull(nodeRequirements, "nodeRequirements is null");
            this.memoryRequirement = (DataSize) Objects.requireNonNull(dataSize, "memoryRequirement is null");
            this.lease = (BinPackingNodeLease) Objects.requireNonNull(binPackingNodeLease, "lease is null");
            this.noMatchingNodeStopwatch = Stopwatch.createUnstarted(ticker);
        }

        public NodeRequirements getNodeRequirements() {
            return this.nodeRequirements;
        }

        public BinPackingNodeLease getLease() {
            return this.lease;
        }

        public SettableFuture<InternalNode> getFuture() {
            return this.lease.getNodeSettableFuture();
        }

        public long getMemoryLease() {
            return this.memoryRequirement.toBytes();
        }

        public Duration markNoMatchingNodeFound() {
            if (!this.noMatchingNodeStopwatch.isRunning()) {
                this.noMatchingNodeStopwatch.start();
            }
            return this.noMatchingNodeStopwatch.elapsed();
        }

        public void resetNoMatchingNodeFound() {
            this.noMatchingNodeStopwatch.reset();
        }

        public boolean isSpeculative() {
            return this.lease.isSpeculative();
        }
    }

    /* JADX WARN: 'this' call moved to the top of the method (can break code semantics) */
    @Inject
    public BinPackingNodeAllocatorService(InternalNodeManager internalNodeManager, ClusterMemoryManager clusterMemoryManager, NodeSchedulerConfig nodeSchedulerConfig, MemoryManagerConfig memoryManagerConfig) {
        this(internalNodeManager, clusterMemoryManager::getWorkerMemoryInfo, nodeSchedulerConfig.isIncludeCoordinator(), memoryManagerConfig.isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(), Duration.ofMillis(nodeSchedulerConfig.getAllowedNoMatchingNodePeriod().toMillis()), memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(), Ticker.systemTicker());
        Objects.requireNonNull(clusterMemoryManager);
    }

    @VisibleForTesting
    BinPackingNodeAllocatorService(InternalNodeManager internalNodeManager, Supplier<Map<String, Optional<MemoryInfo>>> supplier, boolean z, boolean z2, Duration duration, DataSize dataSize, Ticker ticker) {
        this.executor = new ScheduledThreadPoolExecutor(2, Threads.daemonThreadsNamed("bin-packing-node-allocator"));
        this.started = new AtomicBoolean();
        this.stopped = new AtomicBoolean();
        this.processSemaphore = new Semaphore(0);
        this.nodePoolMemoryInfos = new AtomicReference<>(ImmutableMap.of());
        this.maxNodePoolSize = new AtomicReference<>(Optional.empty());
        this.pendingAcquires = new ConcurrentLinkedDeque();
        this.fulfilledAcquires = Sets.newConcurrentHashSet();
        this.nodeManager = (InternalNodeManager) Objects.requireNonNull(internalNodeManager, "nodeManager is null");
        this.workerMemoryInfoSupplier = (Supplier) Objects.requireNonNull(supplier, "workerMemoryInfoSupplier is null");
        this.scheduleOnCoordinator = z;
        this.memoryRequirementIncreaseOnWorkerCrashEnabled = z2;
        this.allowedNoMatchingNodePeriod = (Duration) Objects.requireNonNull(duration, "allowedNoMatchingNodePeriod is null");
        this.taskRuntimeMemoryEstimationOverhead = (DataSize) Objects.requireNonNull(dataSize, "taskRuntimeMemoryEstimationOverhead is null");
        this.ticker = (Ticker) Objects.requireNonNull(ticker, "ticker is null");
    }

    @PostConstruct
    public void start() {
        if (this.started.compareAndSet(false, true)) {
            this.executor.schedule(() -> {
                while (!this.stopped.get()) {
                    try {
                        this.processSemaphore.tryAcquire(5L, TimeUnit.SECONDS);
                        this.processSemaphore.drainPermits();
                        processPendingAcquires();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception e2) {
                        log.warn(e2, "Error updating nodes");
                    }
                }
            }, 0L, TimeUnit.SECONDS);
        }
        refreshNodePoolMemoryInfos();
        this.executor.scheduleWithFixedDelay(() -> {
            try {
                refreshNodePoolMemoryInfos();
            } catch (Throwable th) {
                log.error(th, "Unexpected error while refreshing node pool memory infos");
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    @PreDestroy
    public void stop() {
        this.stopped.set(true);
        this.executor.shutdownNow();
    }

    @VisibleForTesting
    void refreshNodePoolMemoryInfos() {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        long j = -1;
        for (Map.Entry<String, Optional<MemoryInfo>> entry : this.workerMemoryInfoSupplier.get().entrySet()) {
            if (!entry.getValue().isEmpty()) {
                MemoryPoolInfo pool = entry.getValue().get().getPool();
                builder.put(entry.getKey(), pool);
                j = Math.max(pool.getMaxBytes(), j);
            }
        }
        this.maxNodePoolSize.set(j == -1 ? Optional.empty() : Optional.of(DataSize.ofBytes(j)));
        this.nodePoolMemoryInfos.set(builder.buildOrThrow());
    }

    @VisibleForTesting
    synchronized void processPendingAcquires() {
        processPendingAcquires(false);
        if (this.pendingAcquires.stream().anyMatch(pendingAcquire -> {
            return !pendingAcquire.isSpeculative();
        })) {
            return;
        }
        processPendingAcquires(true);
    }

    private void processPendingAcquires(boolean z) {
        Iterator<PendingAcquire> it = this.pendingAcquires.iterator();
        BinPackingSimulation binPackingSimulation = new BinPackingSimulation(this.nodeManager.getActiveNodesSnapshot(), this.nodePoolMemoryInfos.get(), this.fulfilledAcquires, this.scheduleOnCoordinator, this.taskRuntimeMemoryEstimationOverhead, !z);
        while (it.hasNext()) {
            PendingAcquire next = it.next();
            if (next.getFuture().isCancelled()) {
                it.remove();
            } else if (next.isSpeculative() != z) {
                continue;
            } else {
                BinPackingSimulation.ReserveResult tryReserve = binPackingSimulation.tryReserve(next);
                switch (tryReserve.getStatus()) {
                    case RESERVED:
                        InternalNode orElseThrow = tryReserve.getNode().orElseThrow();
                        this.fulfilledAcquires.add(next.getLease());
                        next.getFuture().set(orElseThrow);
                        if (next.getFuture().isCancelled()) {
                            this.fulfilledAcquires.remove(next.getLease());
                            wakeupProcessPendingAcquires();
                        }
                        it.remove();
                        break;
                    case NONE_MATCHING:
                        if (next.markNoMatchingNodeFound().compareTo(this.allowedNoMatchingNodePeriod) > 0) {
                            next.getFuture().setException(new TrinoException(StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query"));
                            it.remove();
                            break;
                        } else {
                            break;
                        }
                    case NOT_ENOUGH_RESOURCES_NOW:
                        next.resetNoMatchingNodeFound();
                        break;
                    default:
                        throw new IllegalArgumentException("unknown status: " + tryReserve.getStatus());
                }
            }
        }
    }

    private void wakeupProcessPendingAcquires() {
        this.processSemaphore.release();
    }

    @Override // io.trino.execution.scheduler.NodeAllocatorService
    public NodeAllocator getNodeAllocator(Session session) {
        return this;
    }

    @Override // io.trino.execution.scheduler.NodeAllocator
    public NodeAllocator.NodeLease acquire(NodeRequirements nodeRequirements, DataSize dataSize, boolean z) {
        BinPackingNodeLease binPackingNodeLease = new BinPackingNodeLease(dataSize.toBytes(), z);
        this.pendingAcquires.add(new PendingAcquire(nodeRequirements, dataSize, binPackingNodeLease, this.ticker));
        wakeupProcessPendingAcquires();
        return binPackingNodeLease;
    }

    @Override // io.trino.execution.scheduler.NodeAllocator, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }

    @Override // io.trino.execution.scheduler.PartitionMemoryEstimatorFactory
    public PartitionMemoryEstimator createPartitionMemoryEstimator() {
        return new ExponentialGrowthPartitionMemoryEstimator();
    }

    private boolean shouldIncreaseMemoryRequirement(ErrorCode errorCode) {
        return ErrorCodes.isOutOfMemoryError(errorCode) || (this.memoryRequirementIncreaseOnWorkerCrashEnabled && ErrorCodes.isWorkerCrashAssociatedError(errorCode));
    }
}
