package org.apache.pinot.minion.event;

import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.pinot.minion.MinionConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/minion/event/MinionEventObservers.class */
public class MinionEventObservers {
    private static final Logger LOGGER = LoggerFactory.getLogger(MinionEventObservers.class);
    private static final MinionEventObservers DEFAULT_INSTANCE = new MinionEventObservers();
    private static volatile MinionEventObservers _customInstance = null;
    private final Map<String, MinionEventObserver> _taskEventObservers;
    private final Queue<EndedTask> _endedTaskQueue;
    private final ReentrantLock _queueLock;
    private final Condition _availableToClean;
    private final long _eventObserverCleanupDelayMs;
    private final ExecutorService _cleanupExecutor;

    /* loaded from: input_file:org/apache/pinot/minion/event/MinionEventObservers$EndedTask.class */
    private static class EndedTask {
        private final String _taskId;
        private final long _endTs;

        public EndedTask(String str, long j) {
            this._taskId = str;
            this._endTs = j;
        }
    }

    private MinionEventObservers() {
        this(0L, null);
    }

    private MinionEventObservers(long j, ExecutorService executorService) {
        this._taskEventObservers = new ConcurrentHashMap();
        this._endedTaskQueue = new LinkedList();
        this._queueLock = new ReentrantLock();
        this._availableToClean = this._queueLock.newCondition();
        this._eventObserverCleanupDelayMs = j * 1000;
        this._cleanupExecutor = executorService;
        startCleanup();
    }

    private void startCleanup() {
        if (this._cleanupExecutor == null || this._eventObserverCleanupDelayMs == 0) {
            LOGGER.info("Configured to clean up task event observers immediately");
        } else {
            LOGGER.info("Configured to clean up task event observers with cleanupDelayMs: {}", Long.valueOf(this._eventObserverCleanupDelayMs));
            this._cleanupExecutor.submit(() -> {
                LOGGER.info("Start to cleanup task event observers with cleanupDelayMs: {}", Long.valueOf(this._eventObserverCleanupDelayMs));
                while (!Thread.interrupted()) {
                    this._queueLock.lock();
                    try {
                        EndedTask peek = this._endedTaskQueue.peek();
                        if (peek == null) {
                            this._availableToClean.await();
                        } else {
                            long currentTimeMillis = System.currentTimeMillis();
                            if (peek._endTs + this._eventObserverCleanupDelayMs <= currentTimeMillis) {
                                LOGGER.info("Cleaning up event observer for task: {} that ended at: {}, now at: {} after delay: {}", new Object[]{peek._taskId, Long.valueOf(peek._endTs), Long.valueOf(currentTimeMillis), Long.valueOf(this._eventObserverCleanupDelayMs)});
                                this._taskEventObservers.remove(peek._taskId);
                                this._endedTaskQueue.poll();
                            } else {
                                this._availableToClean.await((peek._endTs + this._eventObserverCleanupDelayMs) - currentTimeMillis, TimeUnit.MILLISECONDS);
                            }
                        }
                    } finally {
                        this._queueLock.unlock();
                    }
                }
                LOGGER.info("Stop to cleanup task event observers");
                return null;
            });
        }
    }

    public static void init(MinionConf minionConf, ExecutorService executorService) {
        _customInstance = new MinionEventObservers(minionConf.getProperty("pinot.minion.event.observer.cleanupDelayInSec", 0), executorService);
    }

    public static MinionEventObservers getInstance() {
        if (_customInstance != null) {
            return _customInstance;
        }
        LOGGER.warn("Using default MinionEventObservers instance");
        return DEFAULT_INSTANCE;
    }

    public MinionEventObserver getMinionEventObserver(String str) {
        return this._taskEventObservers.get(str);
    }

    public Map<String, MinionEventObserver> getMinionEventObservers() {
        return this._taskEventObservers;
    }

    public Map<String, MinionEventObserver> getMinionEventObserverWithGivenState(MinionTaskState minionTaskState) {
        return (Map) this._taskEventObservers.entrySet().stream().filter(entry -> {
            return ((MinionEventObserver) entry.getValue()).getTaskState() == minionTaskState;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    public void addMinionEventObserver(String str, MinionEventObserver minionEventObserver) {
        LOGGER.debug("Keep track of event observer for task: {}", str);
        this._taskEventObservers.put(str, minionEventObserver);
    }

    public void removeMinionEventObserver(String str) {
        if (this._eventObserverCleanupDelayMs <= 0) {
            LOGGER.debug("Clean up event observer for task: {} immediately", str);
            this._taskEventObservers.remove(str);
            return;
        }
        LOGGER.debug("Clean up event observer for task: {} after delay: {}", str, Long.valueOf(this._eventObserverCleanupDelayMs));
        this._queueLock.lock();
        try {
            this._endedTaskQueue.offer(new EndedTask(str, System.currentTimeMillis()));
            this._availableToClean.signalAll();
        } finally {
            this._queueLock.unlock();
        }
    }
}
