package com.facebook.presto.ttl.nodettlfetchermanagers;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.AllNodes;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.spi.ttl.NodeInfo;
import com.facebook.presto.spi.ttl.NodeTtl;
import com.facebook.presto.spi.ttl.NodeTtlFetcher;
import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory;
import com.facebook.presto.util.PeriodicTaskExecutor;
import com.facebook.presto.util.PropertiesUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.io.File;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

/* loaded from: input_file:com/facebook/presto/ttl/nodettlfetchermanagers/ConfidenceBasedNodeTtlFetcherManager.class */
/**
 * {@link NodeTtlFetcherManager} that keeps an in-memory map from active worker nodes to the
 * {@link NodeTtl} reported by the configured {@link NodeTtlFetcher}.
 *
 * <p>The fetcher is chosen by name from the registered {@link NodeTtlFetcherFactory} instances.
 * The map is refreshed either periodically (when the fetcher asks for it) or once at load time,
 * and additionally every time the node manager reports a change in cluster membership.
 *
 * <p>Readers ({@link #getTtlInfo}, {@link #getAllTtls}, the JMX getters) access the concurrent map
 * without locking, while refreshes are serialized through {@code synchronized} methods.
 */
public class ConfidenceBasedNodeTtlFetcherManager implements NodeTtlFetcherManager {
    private static final Logger log = Logger.get(ConfidenceBasedNodeTtlFetcherManager.class);
    private static final File TTL_FETCHER_CONFIG = new File("etc/node-ttl-fetcher.properties");
    private static final String TTL_FETCHER_PROPERTY_NAME = "node-ttl-fetcher.factory";
    /** Sentinel stored in {@link #lastRefreshEpochMillis} until the first successful refresh. */
    private static final long NEVER_REFRESHED = Long.MAX_VALUE;

    private final InternalNodeManager nodeManager;
    private final boolean isWorkScheduledOnCoordinator;
    private final NodeTtlFetcherManagerConfig nodeTtlFetcherManagerConfig;
    private PeriodicTaskExecutor periodicTtlRefresher;
    private final AtomicReference<NodeTtlFetcher> ttlFetcher = new AtomicReference<>();
    private final ConcurrentHashMap<InternalNode, NodeTtl> nodeTtlMap = new ConcurrentHashMap<>();
    private final Map<String, NodeTtlFetcherFactory> ttlFetcherFactories = new ConcurrentHashMap<>();
    private final AtomicLong lastRefreshEpochMillis = new AtomicLong(NEVER_REFRESHED);
    private final CounterStat refreshFailures = new CounterStat();
    private final Consumer<AllNodes> nodeChangeListener = this::refreshTtlInfo;

    /**
     * @param internalNodeManager source of the current cluster membership
     * @param nodeSchedulerConfig consulted for whether coordinators also run work
     * @param nodeTtlFetcherManagerConfig refresh delay and staleness settings
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public ConfidenceBasedNodeTtlFetcherManager(InternalNodeManager internalNodeManager, NodeSchedulerConfig nodeSchedulerConfig, NodeTtlFetcherManagerConfig nodeTtlFetcherManagerConfig) {
        this.nodeManager = Objects.requireNonNull(internalNodeManager, "nodeManager is null");
        this.isWorkScheduledOnCoordinator = Objects.requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null").isIncludeCoordinator();
        this.nodeTtlFetcherManagerConfig = Objects.requireNonNull(nodeTtlFetcherManagerConfig, "nodeTtlFetcherManagerConfig is null");
    }

    /**
     * Returns {@code j} perturbed by a uniformly random amount within +/-10% of {@code j},
     * rounded to the nearest long.
     */
    private static long jitterForPeriodicRefresh(long j) {
        // (2 * r - 1) maps r in [0, 1) onto [-1, 1), which is then scaled to 10% of j.
        double randomOffset = (2.0d * ThreadLocalRandom.current().nextDouble()) - 1.0d;
        return Math.round(j + (j * 0.1d * randomOffset));
    }

    /**
     * Creates and starts the periodic refresher, using the loaded fetcher's refresh interval and
     * the configured initial delay. The fetcher must already have been loaded.
     */
    public void scheduleRefresh() {
        this.periodicTtlRefresher = new PeriodicTaskExecutor(
                this.ttlFetcher.get().getRefreshInterval().toMillis(),
                this.nodeTtlFetcherManagerConfig.getInitialDelayBeforeRefresh().toMillis(),
                Executors.newSingleThreadScheduledExecutor(Threads.threadsNamed("refresh-node-ttl-executor-%s")),
                this::refreshTtlInfo,
                ConfidenceBasedNodeTtlFetcherManager::jitterForPeriodicRefresh);
        this.periodicTtlRefresher.start();
    }

    /** Unregisters the node change listener and stops the periodic refresher if one was started. */
    @PreDestroy
    public void stop() {
        this.nodeManager.removeNodeChangeListener(this.nodeChangeListener);
        if (this.periodicTtlRefresher != null) {
            this.periodicTtlRefresher.stop();
        }
    }

    /** Refreshes the TTL map against the node manager's current view of the cluster. */
    @VisibleForTesting
    public synchronized void refreshTtlInfo() {
        refreshTtlInfo(this.nodeManager.getAllNodes());
    }

    /**
     * Fetches TTLs for the currently active workers, merges them into the map and drops entries
     * for nodes that are no longer active workers.
     *
     * <p>Any failure is logged and counted in {@link #getRefreshFailures()} rather than
     * propagated, so the caller (periodic task or node change callback) is never interrupted.
     */
    private synchronized void refreshTtlInfo(AllNodes allNodes) {
        try {
            // Resource managers never count as workers; coordinators only when they run work.
            Set<InternalNode> activeWorkers = Sets.difference(allNodes.getActiveNodes(), allNodes.getActiveResourceManagers());
            if (!this.isWorkScheduledOnCoordinator) {
                activeWorkers = Sets.difference(activeWorkers, allNodes.getActiveCoordinators());
            }

            // The fetcher speaks in NodeInfo, so keep a reverse lookup back to InternalNode.
            Map<NodeInfo, InternalNode> nodesByInfo = activeWorkers.stream()
                    .collect(ImmutableMap.toImmutableMap(
                            node -> new NodeInfo(node.getNodeIdentifier(), node.getHost()),
                            Function.identity()));

            Map<NodeInfo, NodeTtl> fetchedTtls = this.ttlFetcher.get().getTtlInfo(ImmutableSet.copyOf(nodesByInfo.keySet()));
            this.nodeTtlMap.putAll(fetchedTtls.entrySet().stream()
                    .collect(ImmutableMap.toImmutableMap(
                            entry -> nodesByInfo.get(entry.getKey()),
                            Map.Entry::getValue)));

            // Sets.difference is a live view; copy it before mutating the underlying key set.
            Set<InternalNode> departedNodes = Sets.difference(this.nodeTtlMap.keySet(), activeWorkers).immutableCopy();
            this.nodeTtlMap.keySet().removeAll(departedNodes);

            log.info("Node ttls refreshed, nodeTtlMap: %s", this.nodeTtlMap);
            this.lastRefreshEpochMillis.set(System.currentTimeMillis());
        } catch (Throwable th) {
            this.refreshFailures.update(1L);
            log.error(th, "Error loading node ttls");
        }
    }

    /**
     * Returns the last known TTL for {@code internalNode}, or empty if none is recorded.
     */
    @Override
    public Optional<NodeTtl> getTtlInfo(InternalNode internalNode) {
        // Single lookup: a containsKey/get pair could observe a concurrent removal by the
        // refresher between the two calls and hand null to Optional.of.
        return Optional.ofNullable(this.nodeTtlMap.get(internalNode));
    }

    /** Returns an immutable snapshot of all currently recorded node TTLs. */
    @Override
    public Map<InternalNode, NodeTtl> getAllTtls() {
        return ImmutableMap.copyOf(this.nodeTtlMap);
    }

    /**
     * Registers a fetcher factory under its name.
     *
     * @throws NullPointerException if {@code nodeTtlFetcherFactory} is null
     * @throws IllegalArgumentException if a factory with the same name is already registered
     */
    @Override
    public void addNodeTtlFetcherFactory(NodeTtlFetcherFactory nodeTtlFetcherFactory) {
        Objects.requireNonNull(nodeTtlFetcherFactory, "nodeTtlFetcherFactory is null");
        if (this.ttlFetcherFactories.putIfAbsent(nodeTtlFetcherFactory.getName(), nodeTtlFetcherFactory) != null) {
            throw new IllegalArgumentException(String.format("Node ttl fetcher factory '%s' is already registered", nodeTtlFetcherFactory.getName()));
        }
    }

    /**
     * Loads the fetcher named in {@code etc/node-ttl-fetcher.properties}, falling back to the
     * factory named {@code "infinite"} with no properties when that file does not exist. It then
     * either schedules periodic refreshes or performs a single refresh, depending on the fetcher,
     * and finally subscribes to node change notifications.
     *
     * @throws IllegalArgumentException if the config file exists but does not name a factory
     * @throws IllegalStateException if the named factory is not registered or a fetcher is already set
     */
    @Override
    public void loadNodeTtlFetcher() throws Exception {
        String fetcherName = "infinite";
        Map<String, String> properties = ImmutableMap.of();
        if (TTL_FETCHER_CONFIG.exists()) {
            // Copy into a mutable map so the factory-name key can be stripped before the
            // remaining properties are handed to the factory.
            properties = new HashMap<>(PropertiesUtil.loadProperties(TTL_FETCHER_CONFIG));
            fetcherName = properties.remove(TTL_FETCHER_PROPERTY_NAME);
            Preconditions.checkArgument(!Strings.isNullOrEmpty(fetcherName), "Node ttl fetcher configuration %s does not contain %s", TTL_FETCHER_CONFIG.getAbsoluteFile(), TTL_FETCHER_PROPERTY_NAME);
        }
        load(fetcherName, properties);
        if (this.ttlFetcher.get().needsPeriodicRefresh()) {
            scheduleRefresh();
        } else {
            refreshTtlInfo();
        }
        this.nodeManager.addNodeChangeListener(this.nodeChangeListener);
    }

    /**
     * Creates the fetcher from the factory registered under {@code str} and installs it.
     *
     * @param str name of a registered fetcher factory
     * @param map properties passed to the factory
     * @throws IllegalStateException if no such factory is registered or a fetcher was already set
     */
    @VisibleForTesting
    public void load(String str, Map<String, String> map) {
        log.info("-- Loading node ttl fetcher factory --");
        NodeTtlFetcherFactory nodeTtlFetcherFactory = this.ttlFetcherFactories.get(str);
        Preconditions.checkState(nodeTtlFetcherFactory != null, "Node ttl fetcher factory %s is not registered", str);
        NodeTtlFetcher createdFetcher = nodeTtlFetcherFactory.create(map);
        Preconditions.checkState(this.ttlFetcher.compareAndSet(null, createdFetcher), "Node ttl fetcher has already been set!");
        log.info("-- Loaded node ttl fetcher %s --", str);
    }

    /**
     * Returns the milliseconds elapsed since the last successful refresh, or {@code -1} if no
     * refresh has succeeded yet.
     */
    @Managed
    public long getTimeInMillisSinceLastTtlRefresh() {
        // Read once so the sentinel check and the subtraction see the same value.
        long lastRefresh = this.lastRefreshEpochMillis.get();
        if (lastRefresh == NEVER_REFRESHED) {
            return -1L;
        }
        return System.currentTimeMillis() - lastRefresh;
    }

    /**
     * Returns how many recorded TTLs have a prediction instant older than the configured
     * stale-TTL threshold.
     */
    @Managed
    public long getStaleTtlWorkerCount() {
        Instant staleBefore = Instant.now().minus(this.nodeTtlFetcherManagerConfig.getStaleTtlThreshold().toMillis(), ChronoUnit.MILLIS);
        return this.nodeTtlMap.values().stream()
                .filter(nodeTtl -> nodeTtl.getTtlPredictionInstant().isBefore(staleBefore))
                .count();
    }

    /** Returns the counter of refresh attempts that failed with an exception. */
    @Managed
    @Nested
    public CounterStat getRefreshFailures() {
        return this.refreshFailures;
    }
}
