/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.pubsub;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Function;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.engine.api.pubsub.Reference;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.SubscriptionConsumer;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.map.ObjectSubscription;
import net.openhft.chronicle.engine.pubsub.SimpleSubscription;
import net.openhft.chronicle.engine.query.Filter;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SimpleSubscription} that forwards subscriber registration to the
 * {@link ObjectSubscription} view of its parent {@link Asset} and fans incoming
 * messages out to the subscribers held in its own local set.
 *
 * <p>NOTE(review): nothing in this class ever adds to {@link #subscribers};
 * {@link #registerSubscriber} only delegates to the {@code ObjectSubscription}.
 * As written, {@link #subscriberCount()}, {@link #notifyMessage(Object)} and
 * {@link #close()} therefore operate on a set this class never populates.
 * Confirm whether that is intentional before relying on them.
 *
 * @param <E> the type of message delivered to subscribers
 */
public class QueueSimpleSubscription<E> implements SimpleSubscription<E> {
    private static final Logger LOG = LoggerFactory.getLogger(QueueSimpleSubscription.class);

    /** Subscribers notified by {@link #notifyMessage(Object)} and {@link #close()}. */
    private final Set<Subscriber<E>> subscribers = new CopyOnWriteArraySet<>();
    private final Reference<E> currentValue;
    /** Converts a {@link BytesStore} payload into a message of type {@code E}. */
    private final Function<Object, E> valueReader;
    private final Asset parent;
    /** View acquired from {@link #parent}; registration and unregistration are delegated to it. */
    private final ObjectSubscription objectSubscription;
    private final String name;

    /**
     * Creates the subscription and acquires the {@link ObjectSubscription} view from {@code parent}.
     *
     * @param reference   reference stored as the current value holder
     * @param valueReader function applied to {@link BytesStore} payloads in {@link #notifyMessage(Object)}
     * @param parent      asset from which the {@code ObjectSubscription} view is acquired
     * @param name        name of this subscription, used in log messages
     */
    public QueueSimpleSubscription(Reference<E> reference, Function<Object, E> valueReader, Asset parent, String name) {
        this.currentValue = reference;
        this.valueReader = valueReader;
        this.parent = parent;
        this.objectSubscription = parent.acquireView(ObjectSubscription.class);
        this.name = name;
    }

    /**
     * Registers {@code subscriber} with the parent's {@link ObjectSubscription}.
     *
     * @param rc         request context passed through to the delegate
     * @param subscriber subscriber to register
     * @param filter     filter passed through to the delegate
     */
    @Override
    public void registerSubscriber(@NotNull RequestContext rc, @NotNull Subscriber<E> subscriber, @NotNull Filter<E> filter) {
        // NOTE(review): the subscriber is not added to the local set here - confirm this is intended.
        this.objectSubscription.registerSubscriber(rc, subscriber, filter);
    }

    /**
     * Removes {@code subscriber} from the local set and unregisters it from the
     * parent's {@link ObjectSubscription}.
     *
     * @param subscriber subscriber to remove
     */
    @Override
    public void unregisterSubscriber(@NotNull Subscriber subscriber) {
        this.subscribers.remove(subscriber);
        this.objectSubscription.unregisterSubscriber(subscriber);
    }

    /**
     * @return the same value as {@link #subscriberCount()}
     */
    @Override
    public int keySubscriberCount() {
        return this.subscriberCount();
    }

    /**
     * @return always {@code 0}
     */
    @Override
    public int entrySubscriberCount() {
        return 0;
    }

    /**
     * @return always {@code 0}
     */
    @Override
    public int topicSubscriberCount() {
        return 0;
    }

    /**
     * @return the number of subscribers in the local set
     */
    @Override
    public int subscriberCount() {
        return this.subscribers.size();
    }

    /**
     * Passes a message to {@code SubscriptionConsumer.notifyEachSubscriber} for the local subscribers.
     *
     * <p>If {@code e} is a {@link BytesStore} it is first converted with the value
     * reader; otherwise it is passed on as-is.
     *
     * @param e the raw message, either a {@code BytesStore} or an already-decoded value
     * @throws ClassCastException rethrown after logging when the message does not
     *                            match the type a subscriber expects
     */
    @Override
    public void notifyMessage(Object e) {
        try {
            // Unchecked by necessity: a non-BytesStore payload is trusted to already be an E.
            // A wrong type surfaces as the ClassCastException handled below.
            @SuppressWarnings("unchecked")
            E message = e instanceof BytesStore ? this.valueReader.apply(e) : (E) e;
            SubscriptionConsumer.notifyEachSubscriber(this.subscribers, s -> s.onMessage(message));
        } catch (ClassCastException cce) {
            // Log through the class logger rather than System.err, then let the caller see the failure.
            LOG.error("Is {} the correct ValueReader?", this.valueReader, cce);
            throw cce;
        }
    }

    /**
     * Calls {@code onEndOfSubscription()} on every local subscriber.
     *
     * <p>A failure in one subscriber is logged and does not prevent the remaining
     * subscribers from being called.
     */
    public void close() {
        for (Subscriber<E> subscriber : this.subscribers) {
            try {
                subscriber.onEndOfSubscription();
            } catch (Exception ex) {
                LOG.error("Subscriber failed in onEndOfSubscription for subscription {}", this.name, ex);
            }
        }
    }
}

