/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map;

import java.time.LocalTime;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.map.KeyValueStore;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.map.MapView;
import net.openhft.chronicle.engine.api.pubsub.ISubscriber;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.cfg.SubscriptionStat;
import net.openhft.chronicle.engine.map.EventConsumer;
import net.openhft.chronicle.engine.map.ObjectSubscription;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.tree.QueueView;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.api.session.SessionProvider;
import net.openhft.chronicle.queue.Excerpt;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.WireKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueObjectSubscription<T, M>
implements ObjectSubscription<T, M> {
    private static final Logger LOG = LoggerFactory.getLogger(QueueObjectSubscription.class);
    private final Set<TopicSubscriber<T, M>> topicSubscribers = new CopyOnWriteArraySet<TopicSubscriber<T, M>>();
    private final Set<Subscriber<Excerpt>> subscribers = new CopyOnWriteArraySet<Subscriber<Excerpt>>();
    private final Set<EventConsumer<T, M>> downstream = new CopyOnWriteArraySet<EventConsumer<T, M>>();
    private final SessionProvider sessionProvider;
    @Nullable
    private final Asset asset;
    private final Map<Subscriber, Subscriber> subscriptionDelegate = new IdentityHashMap<Subscriber, Subscriber>();
    private final Class<T> topicType;
    private Map<String, SubscriptionStat> subscriptionMonitoringMap = null;
    private EventLoop eventLoop;
    private KeyValueStore<T, M> kvStore;

    public QueueObjectSubscription(@NotNull RequestContext requestContext, @NotNull Asset asset) {
        this(requestContext.topicType(), requestContext.viewType(), asset);
    }

    public QueueObjectSubscription(Class topicType, @Nullable Class viewType, @Nullable Asset asset) {
        this.asset = asset;
        if (viewType != null && asset != null) {
            asset.addView(viewType, this);
        }
        this.sessionProvider = asset == null ? null : asset.findView(SessionProvider.class);
        this.eventLoop = asset.root().acquireView(EventLoop.class);
        this.topicType = topicType;
    }

    public void close() {
        this.notifyEndOfSubscription(this.topicSubscribers);
        this.notifyEndOfSubscription(this.subscribers);
        this.notifyEndOfSubscription(this.downstream);
    }

    @Override
    public void onEndOfSubscription() {
        throw new UnsupportedOperationException("todo");
    }

    private void notifyEndOfSubscription(@NotNull Set<? extends ISubscriber> subscribers) {
        subscribers.forEach(this::notifyEndOfSubscription);
        subscribers.clear();
    }

    private void notifyEndOfSubscription(@NotNull ISubscriber subscriber) {
        try {
            subscriber.onEndOfSubscription();
        }
        catch (Exception e) {
            LOG.error("", (Throwable)e);
        }
    }

    @Override
    public void setKvStore(KeyValueStore<T, M> kvStore) {
        this.kvStore = kvStore;
    }

    @Override
    public void notifyEvent(MapEvent<T, M> changeEvent) {
        throw new UnsupportedOperationException("todo");
    }

    @Override
    public int entrySubscriberCount() {
        return 0;
    }

    @Override
    public int topicSubscriberCount() {
        return this.topicSubscribers.size();
    }

    @Override
    public boolean hasSubscribers() {
        return !this.topicSubscribers.isEmpty() || !this.subscribers.isEmpty() || !this.downstream.isEmpty() || this.asset.hasChildren();
    }

    @Override
    public boolean needsPrevious() {
        return !this.subscribers.isEmpty() || !this.downstream.isEmpty();
    }

    @Override
    public void registerSubscriber(@NotNull RequestContext rc, @NotNull Subscriber subscriber, @NotNull Filter filter) {
        try {
            QueueView chronicleQueue = this.asset.acquireView(QueueView.class);
            this.eventLoop.addHandler(() -> {
                Object e = chronicleQueue.get(rc.name());
                if (e != null) {
                    subscriber.accept(e);
                }
                return true;
            });
        }
        catch (Exception e) {
            throw Jvm.rethrow((Throwable)e);
        }
    }

    @Override
    public void registerTopicSubscriber(@NotNull RequestContext rc, @NotNull TopicSubscriber<T, M> subscriber) {
        this.addToStats("topicSubscription");
        this.topicSubscribers.add(subscriber);
        AtomicBoolean terminate = new AtomicBoolean();
        QueueView chronicleQueue = this.asset.acquireView(QueueView.class);
        this.eventLoop.addHandler(() -> {
            if (terminate.get()) {
                throw new InvalidEventHandlerException();
            }
            chronicleQueue.get((eventName, m) -> {
                try {
                    subscriber.onMessage(this.toT((CharSequence)eventName), m);
                }
                catch (InvalidSubscriberException e) {
                    terminate.set(true);
                }
            });
            return true;
        });
    }

    private T toT(CharSequence eventName) {
        if (this.topicType == CharSequence.class) {
            return (T)eventName;
        }
        if (this.topicType == String.class) {
            return (T)eventName.toString();
        }
        if (this.topicType == WireKey.class) {
            return (T)((WireKey)() -> eventName.toString());
        }
        throw new UnsupportedOperationException("unable to convert " + eventName + " to type " + this.topicType);
    }

    @Override
    public void registerDownstream(@NotNull EventConsumer<T, M> subscription) {
        this.downstream.add(subscription);
    }

    public void unregisterDownstream(EventConsumer<T, M> subscription) {
        this.downstream.remove(subscription);
    }

    @Override
    public void unregisterSubscriber(@NotNull Subscriber subscriber) {
        Subscriber delegate = this.subscriptionDelegate.get(subscriber);
        Subscriber s = delegate != null ? delegate : subscriber;
        boolean subscription = this.subscribers.remove(s);
        if (subscription) {
            this.removeFromStats("subscription");
        }
        s.onEndOfSubscription();
    }

    @Override
    public int keySubscriberCount() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void registerKeySubscriber(@NotNull RequestContext rc, @NotNull Subscriber<T> subscriber, @NotNull Filter<T> filter) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void unregisterTopicSubscriber(@NotNull TopicSubscriber subscriber) {
        this.topicSubscribers.remove(subscriber);
        this.removeFromStats("topicSubscription");
        subscriber.onEndOfSubscription();
    }

    private Map getSubscriptionMap() {
        if (this.subscriptionMonitoringMap != null) {
            return this.subscriptionMonitoringMap;
        }
        Asset subscriptionAsset = this.asset.root().getAsset("proc/subscriptions");
        if (subscriptionAsset != null && subscriptionAsset.getView(MapView.class) != null) {
            this.subscriptionMonitoringMap = subscriptionAsset.getView(MapView.class);
        }
        return this.subscriptionMonitoringMap;
    }

    private void addToStats(String subType) {
        if (this.sessionProvider == null) {
            return;
        }
        SessionDetails sessionDetails = this.sessionProvider.get();
        if (sessionDetails != null) {
            String userId = sessionDetails.userId();
            Map subStats = this.getSubscriptionMap();
            if (subStats != null) {
                SubscriptionStat stat = (SubscriptionStat)subStats.get(userId + "~" + subType);
                if (stat == null) {
                    stat = new SubscriptionStat();
                    stat.setFirstSubscribed(LocalTime.now());
                }
                stat.setTotalSubscriptions(stat.getTotalSubscriptions() + 1);
                stat.setActiveSubscriptions(stat.getActiveSubscriptions() + 1);
                stat.setRecentlySubscribed(LocalTime.now());
                subStats.put(userId + "~" + subType, stat);
            }
        }
    }

    private void removeFromStats(String subType) {
        if (this.sessionProvider == null) {
            return;
        }
        SessionDetails sessionDetails = this.sessionProvider.get();
        if (sessionDetails != null) {
            String userId = sessionDetails.userId();
            Map subStats = this.getSubscriptionMap();
            if (subStats != null) {
                SubscriptionStat stat = (SubscriptionStat)subStats.get(userId + "~" + subType);
                if (stat == null) {
                    throw new AssertionError((Object)"There should be an active subscription");
                }
                stat.setActiveSubscriptions(stat.getActiveSubscriptions() - 1);
                stat.setRecentlySubscribed(LocalTime.now());
                subStats.put(userId + "~" + subType, stat);
            }
        }
    }
}

