/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.cnc;

import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.EventSubscription;
import com.couchbase.client.core.deps.org.jctools.queues.MpscArrayQueue;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.CbThrowables;
import com.couchbase.client.core.util.NanoTimestamp;
import java.io.PrintStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
 * {@link EventBus} implementation that buffers published events in a bounded queue and
 * delivers them to all registered subscribers from one dedicated daemon thread.
 *
 * <p>Publishing never blocks: if the queue does not accept an event, the event is dropped,
 * {@link EventBus.PublishResult#OVERLOADED} is returned and (when error logging is enabled)
 * the drop is recorded so that a summary can be printed periodically by the dispatch thread.
 */
public class DefaultEventBus implements EventBus {
    private static final int DEFAULT_QUEUE_CAPACITY = 16384;
    private static final Duration DEFAULT_IDLE_SLEEP_DURATION = Duration.ofMillis(100L);
    private static final Duration DEFAULT_OVERFLOW_LOG_INTERVAL = Duration.ofSeconds(30L);
    /** While draining a busy queue, overflow reporting is attempted every this many events. */
    private static final long OVERFLOW_CHECK_EVERY = 10000L;

    private final CopyOnWriteArraySet<Consumer<Event>> subscribers = new CopyOnWriteArraySet<>();
    private final Queue<Event> eventQueue;
    private final AtomicBoolean running = new AtomicBoolean(false);
    /** Destination for diagnostics; {@code null} disables all error output. */
    private final PrintStream errorLogging;
    private final String threadName;
    private final Duration idleSleepDuration;
    private final Duration overflowLogInterval;
    private final Scheduler scheduler;
    /** The dispatch thread; {@code null} until {@link #start()} has been subscribed to. */
    private volatile Thread runningThread;
    private volatile NanoTimestamp overflowLogTimestamp = NanoTimestamp.never();
    /** Per event type: the most recently dropped event and how many were dropped. */
    private final Map<Class<? extends Event>, SampleEventAndCount> overflowInfo = new ConcurrentHashMap<>();

    /**
     * Creates a builder to customize the event bus.
     *
     * @param scheduler the scheduler used for the polling and timeout in {@link #stop(Duration)}.
     * @return a new builder pre-populated with the defaults.
     */
    public static Builder builder(Scheduler scheduler) {
        return new Builder(scheduler);
    }

    /**
     * Creates an event bus with default settings.
     *
     * @param scheduler the scheduler used for the polling and timeout in {@link #stop(Duration)}.
     * @return the (not yet started) event bus.
     */
    public static DefaultEventBus create(Scheduler scheduler) {
        return DefaultEventBus.builder(scheduler).build();
    }

    private DefaultEventBus(Builder builder) {
        this.eventQueue = new MpscArrayQueue<Event>(builder.queueCapacity);
        this.scheduler = builder.scheduler;
        this.errorLogging = builder.errorLogging.orElse(null);
        this.threadName = builder.threadName;
        this.idleSleepDuration = builder.idleSleepDuration;
        this.overflowLogInterval = builder.overflowLogInterval;
    }

    /**
     * Registers a consumer that receives every event dispatched from now on.
     *
     * @param consumer the consumer to register.
     * @return a subscription handle wrapping this bus and the consumer.
     */
    @Override
    public EventSubscription subscribe(Consumer<Event> consumer) {
        this.subscribers.add(consumer);
        return new EventSubscription(this, consumer);
    }

    /**
     * Removes the consumer associated with the given subscription.
     *
     * @param subscription the subscription whose consumer should be removed.
     */
    @Override
    public void unsubscribe(EventSubscription subscription) {
        this.subscribers.remove(subscription.consumer());
    }

    /**
     * Enqueues an event for asynchronous delivery without blocking.
     *
     * @param event the event to publish.
     * @return {@code SHUTDOWN} if the bus is not running, {@code SUCCESS} if the event was
     *         enqueued, or {@code OVERLOADED} if the queue did not accept it.
     */
    @Override
    public EventBus.PublishResult publish(Event event) {
        if (!this.isRunning()) {
            return EventBus.PublishResult.SHUTDOWN;
        }
        if (this.eventQueue.offer(event)) {
            return EventBus.PublishResult.SUCCESS;
        }
        if (this.errorLogging != null) {
            try {
                this.overflowInfo.compute(
                    event.getClass(),
                    (k, v) -> v == null ? new SampleEventAndCount(event) : v.updateAndIncrement(event)
                );
            } catch (Exception ignored) {
                // Overflow bookkeeping is best-effort diagnostics only; it must never
                // turn a dropped event into a failure for the publisher.
            }
        }
        return EventBus.PublishResult.OVERLOADED;
    }

    /**
     * Starts the dispatch thread if the bus is not already running.
     *
     * @return a deferred {@link Mono} that completes empty; the thread is created on subscription.
     */
    @Override
    public Mono<Void> start() {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(false, true)) {
                Thread worker = new Thread(this::dispatchLoop);
                this.runningThread = worker;
                worker.setDaemon(true);
                worker.setName(this.threadName);
                worker.start();
            }
            return Mono.empty();
        });
    }

    /**
     * Body of the dispatch thread: drains the queue, delivers each event to all subscribers and
     * sleeps while idle. Keeps going after the bus was stopped until the queue is empty.
     */
    private void dispatchLoop() {
        long idleSleepTime = this.idleSleepDuration.toMillis();
        long dispatchedSinceCheck = 0L;
        while (this.isRunning() || !this.eventQueue.isEmpty()) {
            Event event = this.eventQueue.poll();
            while (event != null) {
                this.dispatch(event);
                event = this.eventQueue.poll();
                // Under sustained load the inner loop may not exit for a long time, so
                // overflow reporting is also attempted periodically from inside it.
                if (++dispatchedSinceCheck == OVERFLOW_CHECK_EVERY) {
                    this.maybePrintOverflow();
                    dispatchedSinceCheck = 0L;
                }
            }
            this.maybePrintOverflow();
            if (this.isRunning()) {
                try {
                    Thread.sleep(idleSleepTime);
                } catch (InterruptedException ignored) {
                    // stop() interrupts this thread to cut the idle sleep short; the loop
                    // condition is re-evaluated right away, so nothing else needs to happen.
                }
            }
        }
    }

    /**
     * Hands one event to every subscriber; a failing subscriber is reported (if error logging is
     * enabled) and does not prevent delivery to the remaining subscribers.
     */
    private void dispatch(Event event) {
        for (Consumer<Event> subscriber : this.subscribers) {
            try {
                subscriber.accept(event);
            } catch (Throwable t) {
                if (this.errorLogging != null) {
                    this.errorLogging.println("Exception caught in EventBus Consumer: " + t);
                    // Send the stack trace to the configured stream as well, instead of
                    // unconditionally writing it to System.err.
                    t.printStackTrace(this.errorLogging);
                }
            }
        }
    }

    /**
     * Prints a summary of dropped events when error logging is enabled, drops have been recorded
     * and {@code overflowLogTimestamp.hasElapsed(overflowLogInterval)} holds. Reported entries
     * are removed; entries recorded concurrently are kept for the next report.
     */
    private void maybePrintOverflow() {
        if (this.errorLogging == null) {
            return;
        }
        try {
            if (this.overflowInfo.isEmpty() || !this.overflowLogTimestamp.hasElapsed(this.overflowLogInterval)) {
                return;
            }
            Map<String, Map<String, Object>> encodedEvents = new HashMap<>();
            Iterator<Map.Entry<Class<? extends Event>, SampleEventAndCount>> entries =
                this.overflowInfo.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<Class<? extends Event>, SampleEventAndCount> entry = entries.next();
                SampleEventAndCount info = entry.getValue();
                Map<String, Object> details =
                    CbCollections.mapOf("sampleEvent", info.event.toString(), "totalDropCount", info.count);
                encodedEvents.put(entry.getKey().getSimpleName(), details);
                // Remove only what has been reported; a blanket clear() afterwards would
                // silently discard drops recorded by publishers in the meantime.
                entries.remove();
            }
            this.errorLogging.println("Some events could not be published because the queue was (likely temporarily) over capacity: " + Mapper.encodeAsString(encodedEvents));
            this.overflowLogTimestamp = NanoTimestamp.now();
        } catch (Exception ex) {
            this.errorLogging.println("Encountered an error while processing the overflow queue - this is a bug: " + CbThrowables.getStackTraceAsString(ex));
            this.overflowInfo.clear();
        }
    }

    /**
     * Stops the bus and waits for the dispatch thread to terminate.
     *
     * <p>On subscription the running flag is cleared, the dispatch thread (if any) is interrupted
     * and recorded overflow information is discarded. The returned {@link Mono} then polls every
     * 10 milliseconds until the thread is no longer alive. If the bus was never started there is
     * no thread to wait for and the {@link Mono} completes after the first poll.
     *
     * @param timeout the maximum time to wait before the returned {@link Mono} fails with a timeout.
     * @return a {@link Mono} completing once the dispatch thread has terminated.
     */
    @Override
    public Mono<Void> stop(Duration timeout) {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(true, false)) {
                Thread worker = this.runningThread;
                if (worker != null) {
                    worker.interrupt();
                }
            }
            this.overflowInfo.clear();
            return Mono.empty();
        }).then(
            Flux.interval(Duration.ofMillis(10L), this.scheduler)
                .takeUntil(i -> {
                    Thread worker = this.runningThread;
                    return worker == null || !worker.isAlive();
                })
                .then()
        ).timeout(timeout, this.scheduler);
    }

    /** Returns {@code true} between a successful start and the following stop. */
    boolean isRunning() {
        return this.running.get();
    }

    /** Returns {@code true} if at least one consumer is currently subscribed. */
    boolean hasSubscribers() {
        return !this.subscribers.isEmpty();
    }

    /**
     * Builder for {@link DefaultEventBus} instances.
     */
    public static class Builder {
        private final Scheduler scheduler;
        private int queueCapacity;
        private Optional<PrintStream> errorLogging;
        private String threadName;
        private Duration idleSleepDuration;
        private Duration overflowLogInterval;

        Builder(Scheduler scheduler) {
            this.scheduler = scheduler;
            this.queueCapacity = DEFAULT_QUEUE_CAPACITY;
            this.errorLogging = Optional.of(System.err);
            this.threadName = "cb-events";
            this.idleSleepDuration = DEFAULT_IDLE_SLEEP_DURATION;
            this.overflowLogInterval = DEFAULT_OVERFLOW_LOG_INTERVAL;
        }

        /**
         * Sets the capacity passed to the event queue (default 16384).
         *
         * @param queueCapacity the queue capacity.
         * @return this builder for chaining.
         */
        public Builder queueCapacity(int queueCapacity) {
            this.queueCapacity = queueCapacity;
            return this;
        }

        /**
         * Sets the stream that receives diagnostics (default {@code System.err}).
         *
         * @param errorLogging the target stream, or an empty optional to disable error output.
         * @return this builder for chaining.
         */
        public Builder errorLogging(Optional<PrintStream> errorLogging) {
            this.errorLogging = errorLogging;
            return this;
        }

        /**
         * Sets the name of the dispatch thread (default {@code "cb-events"}).
         *
         * @param threadName the thread name.
         * @return this builder for chaining.
         */
        public Builder threadName(String threadName) {
            this.threadName = threadName;
            return this;
        }

        /**
         * Sets how long the dispatch thread sleeps when the queue is empty (default 100 ms).
         *
         * @param idleSleepDuration the idle sleep duration.
         * @return this builder for chaining.
         */
        public Builder idleSleepDuration(Duration idleSleepDuration) {
            this.idleSleepDuration = idleSleepDuration;
            return this;
        }

        /**
         * Sets the interval checked before printing another overflow summary (default 30 s).
         *
         * @param overflowLogInterval the interval between overflow summaries.
         * @return this builder for chaining.
         */
        public Builder overflowLogInterval(Duration overflowLogInterval) {
            this.overflowLogInterval = overflowLogInterval;
            return this;
        }

        /**
         * Builds the event bus from the current settings.
         *
         * @return a new, not yet started {@link DefaultEventBus}.
         */
        public DefaultEventBus build() {
            return new DefaultEventBus(this);
        }
    }

    /**
     * Holds the most recently dropped event of one type together with the number of drops
     * recorded for that type.
     */
    static class SampleEventAndCount {
        private volatile Event event;
        private final AtomicLong count = new AtomicLong(1L);

        private SampleEventAndCount(Event event) {
            this.event = event;
        }

        /**
         * Replaces the sample with the given event and increments the drop count.
         *
         * @param event the event that was just dropped.
         * @return this instance, so it can be used directly as the result of {@code Map.compute}.
         */
        public SampleEventAndCount updateAndIncrement(Event event) {
            this.event = event;
            this.count.incrementAndGet();
            return this;
        }

        @Override
        public String toString() {
            return "{sampleEvent=" + this.event + ", totalDropCount=" + this.count + '}';
        }
    }
}

