/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map.remote;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.PublisherHandler;
import net.openhft.chronicle.engine.server.internal.SubscriptionHandler;
import net.openhft.chronicle.engine.tree.TopologicalEvent;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractRemoteSubscription<E>
extends AbstractStatelessClient
implements SubscriptionCollection<E> {
    private static final Logger LOG = LoggerFactory.getLogger(MapWireHandler.class);
    final Map<Object, Long> subscribersToTid = new ConcurrentHashMap<Object, Long>();

    AbstractRemoteSubscription(@NotNull TcpChannelHub hub, long cid, @NotNull String csp) {
        super(hub, cid, csp);
    }

    @Override
    public void registerSubscriber(@NotNull RequestContext rc, @NotNull Subscriber<E> subscriber, @NotNull Filter<E> filter) {
        this.registerSubscriber0(rc, subscriber, filter);
    }

    @Override
    public void unregisterSubscriber(@NotNull Subscriber subscriber) {
        this.unregisterSubscriber0(subscriber);
    }

    void registerSubscriber0(final @NotNull RequestContext rc, final @NotNull Subscriber subscriber, final @NotNull Filter filter) {
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        Boolean bootstrap = rc.bootstrap();
        Boolean endSubscriptionAfterBootstrap = rc.endSubscriptionAfterBootstrap();
        String csp = this.csp;
        if (bootstrap != null) {
            csp = csp + "&bootstrap=" + bootstrap;
        }
        if (endSubscriptionAfterBootstrap != null) {
            csp = csp + "&endSubscriptionAfterBootstrap=" + endSubscriptionAfterBootstrap;
        }
        if (rc.throttlePeriodMs() > 0) {
            csp = csp + "&throttlePeriodMs=" + rc.throttlePeriodMs();
        }
        this.hub.subscribe((AsyncSubscription)new AbstractAsyncSubscription(this.hub, csp, this.getClass().getSimpleName()){
            {
                super(arg0, arg1, arg2);
                AbstractRemoteSubscription.this.subscribersToTid.put(subscriber, this.tid());
            }

            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName((WireKey)SubscriptionHandler.SubscriptionEventID.registerSubscriber).typeLiteral((CharSequence)ClassAliasPool.CLASS_ALIASES.nameFor(rc.elementType()));
                if (!filter.isEmpty()) {
                    wireOut.writeEventName(() -> "filter").object((Object)filter);
                }
            }

            public void onConsumer(@NotNull WireIn inWire) {
                inWire.readDocument(null, d -> {
                    StringBuilder eventName = Wires.acquireStringBuilder();
                    ValueIn valueIn = d.readEventName(eventName);
                    if (PublisherHandler.EventId.onEndOfSubscription.contentEquals(eventName)) {
                        subscriber.onEndOfSubscription();
                        AbstractRemoteSubscription.this.subscribersToTid.remove((Object)this);
                        AbstractRemoteSubscription.this.hub.unsubscribe(this.tid());
                    } else if (CoreFields.reply.contentEquals((CharSequence)eventName)) {
                        Class aClass = rc.elementType();
                        Object object = MapEvent.class.isAssignableFrom(aClass) || TopologicalEvent.class.isAssignableFrom(aClass) ? valueIn.typedMarshallable() : valueIn.object(rc.elementType());
                        AbstractRemoteSubscription.this.onEvent(object, subscriber);
                    }
                });
            }
        });
    }

    private void onEvent(@Nullable Object message, @NotNull Subscriber subscriber) {
        try {
            if (message != null) {
                subscriber.onMessage(message);
            }
        }
        catch (InvalidSubscriberException noLongerValid) {
            this.unregisterSubscriber(subscriber);
        }
    }

    void unregisterSubscriber0(@NotNull Subscriber subscriber) {
        Long tid = this.subscribersToTid.get(subscriber);
        if (tid == null) {
            LOG.warn("There is no subscription to unsubscribe");
            return;
        }
        this.hub.preventSubscribeUponReconnect(tid.longValue());
        if (!this.hub.isOpen()) {
            this.hub.unsubscribe(tid.longValue());
            return;
        }
        boolean success = this.hub.lock(() -> {
            this.writeMetaDataForKnownTID(tid);
            this.hub.outWire().writeDocument(false, wireOut -> wireOut.writeEventName((WireKey)SubscriptionHandler.SubscriptionEventID.unregisterSubscriber).text((CharSequence)""));
        });
        if (!success) {
            this.hub.unsubscribe(tid.longValue());
        }
    }

    @Override
    public int topicSubscriberCount() {
        return this.proxyReturnInt((WireKey)SubscriptionHandler.SubscriptionEventID.topicSubscriberCount);
    }

    @Override
    public int keySubscriberCount() {
        return this.proxyReturnInt((WireKey)SubscriptionHandler.SubscriptionEventID.keySubscriberCount);
    }

    @Override
    public int entrySubscriberCount() {
        return this.proxyReturnInt((WireKey)SubscriptionHandler.SubscriptionEventID.entrySubscriberCount);
    }
}

