package com.yahoo.search.cluster;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.yolean.UncheckedInterruptedException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/yahoo/search/cluster/ClusterMonitor.class */
public class ClusterMonitor<T> {
    private static final Logger log = Logger.getLogger(ClusterMonitor.class.getName());
    private final NodeManager<T> nodeManager;
    private final ClusterMonitor<T>.MonitorThread monitorThread;
    private final MonitorConfiguration configuration = new MonitorConfiguration();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new LinkedHashMap());
    private final Set<T> nodesToRemove = new LinkedHashSet();
    private final Set<T> nodesToUpdate = new LinkedHashSet();
    private boolean skipNextWait = false;

    /* loaded from: input_file:com/yahoo/search/cluster/ClusterMonitor$MonitorThread.class */
    private class MonitorThread extends Thread {
        MonitorThread(String str) {
            super(str);
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            ClusterMonitor.log.info("Starting cluster monitor thread " + getName());
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
            while (true) {
                if (ClusterMonitor.this.closed.get()) {
                    break;
                }
                try {
                    ClusterMonitor.log.finest("Activating ping");
                    ClusterMonitor.this.ping(newCachedThreadPool);
                    synchronized (ClusterMonitor.this.nodeManager) {
                        if (!ClusterMonitor.this.skipNextWait) {
                            ClusterMonitor.this.nodeManager.wait(ClusterMonitor.this.configuration.getCheckInterval());
                        }
                        ClusterMonitor.this.skipNextWait = false;
                    }
                } catch (Throwable th) {
                    if (ClusterMonitor.this.closed.get() && (th instanceof InterruptedException)) {
                        break;
                    }
                    if (!(th instanceof Exception)) {
                        ClusterMonitor.log.log(Level.WARNING, "Error in monitor thread, will quit", th);
                        break;
                    }
                    ClusterMonitor.log.log(Level.WARNING, "Exception in monitor thread", th);
                    newCachedThreadPool.shutdown();
                    try {
                        if (!newCachedThreadPool.awaitTermination(10L, TimeUnit.SECONDS)) {
                            ClusterMonitor.log.warning("Timeout waiting for ping executor to terminate");
                        }
                    } catch (InterruptedException e) {
                    }
                    ClusterMonitor.log.info("Stopped cluster monitor thread " + getName());
                    return;
                }
            }
        }
    }

    public ClusterMonitor(NodeManager<T> nodeManager, boolean z) {
        this.nodeManager = nodeManager;
        this.monitorThread = new MonitorThread("search.clustermonitor." + nodeManager.name());
        if (z) {
            this.monitorThread.start();
        }
    }

    public synchronized void reconfigure(Collection<T> collection) {
        if (!this.monitorThread.isAlive()) {
            throw new IllegalStateException("monitor thread must be alive for reconfiguration");
        }
        this.nodesToUpdate.addAll(collection);
        this.nodesToRemove.addAll(this.nodeMonitors.keySet());
        this.nodesToRemove.removeAll(collection);
        for (T t : collection) {
            if (!this.nodeMonitors.containsKey(t)) {
                add(t, true);
            }
        }
        synchronized (this.nodeManager) {
            this.skipNextWait = true;
            this.nodeManager.notifyAll();
        }
        while (true) {
            try {
                if (this.nodesToRemove.isEmpty() && this.nodesToUpdate.isEmpty()) {
                    this.nodeManager.pingIterationCompleted();
                    return;
                }
                wait(1L);
            } catch (InterruptedException e) {
                throw new UncheckedInterruptedException(e, true);
            }
        }
    }

    public void start() {
        if (this.monitorThread.isAlive()) {
            return;
        }
        this.monitorThread.start();
    }

    public MonitorConfiguration getConfiguration() {
        return this.configuration;
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    public void add(T t, boolean z) {
        this.nodeMonitors.put(t, new TrafficNodeMonitor<>(t, this.configuration, z));
    }

    public synchronized void failed(T t, ErrorMessage errorMessage) {
        Consumer<TrafficNodeMonitor<T>> consumer = trafficNodeMonitor -> {
            trafficNodeMonitor.failed(errorMessage);
        };
        NodeManager<T> nodeManager = this.nodeManager;
        Objects.requireNonNull(nodeManager);
        updateMonitoredNode(t, consumer, nodeManager::failed);
    }

    public synchronized void responded(T t) {
        Consumer<TrafficNodeMonitor<T>> consumer = (v0) -> {
            v0.responded();
        };
        NodeManager<T> nodeManager = this.nodeManager;
        Objects.requireNonNull(nodeManager);
        updateMonitoredNode(t, consumer, nodeManager::working);
    }

    private void updateMonitoredNode(T t, Consumer<TrafficNodeMonitor<T>> consumer, Consumer<T> consumer2) {
        TrafficNodeMonitor<T> trafficNodeMonitor = this.nodeMonitors.get(t);
        if (this.closed.get()) {
            trafficNodeMonitor = null;
        }
        if (this.nodesToRemove.remove(t)) {
            this.nodeMonitors.remove(t);
            trafficNodeMonitor = null;
        }
        if (trafficNodeMonitor != null) {
            Boolean isKnownWorking = trafficNodeMonitor.isKnownWorking();
            consumer.accept(trafficNodeMonitor);
            if (isKnownWorking != trafficNodeMonitor.isKnownWorking()) {
                consumer2.accept(t);
            }
        }
        this.nodesToUpdate.remove(t);
    }

    public synchronized void ping(Executor executor) {
        for (BaseNodeMonitor<T> baseNodeMonitor : nodeMonitors()) {
            if (this.closed.get()) {
                return;
            }
            if (this.nodesToRemove.remove(baseNodeMonitor.getNode())) {
                this.nodeMonitors.remove(baseNodeMonitor.getNode());
            } else {
                this.nodeManager.ping(this, baseNodeMonitor.getNode(), executor);
            }
        }
        this.nodeManager.pingIterationCompleted();
    }

    public Iterator<BaseNodeMonitor<T>> nodeMonitorIterator() {
        return nodeMonitors().iterator();
    }

    public List<BaseNodeMonitor<T>> nodeMonitors() {
        return List.copyOf(this.nodeMonitors.values());
    }

    public void shutdown() {
        this.closed.set(true);
        synchronized (this) {
            this.nodeMonitors.clear();
        }
        synchronized (this.nodeManager) {
            this.skipNextWait = true;
            this.nodeManager.notifyAll();
        }
        try {
            if (this.monitorThread.isAlive()) {
                this.monitorThread.join();
            }
        } catch (InterruptedException e) {
        }
    }
}
