package org.apache.hedwig.client.netty.impl.simple;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
import org.apache.hedwig.client.netty.impl.HChannelImpl;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.Either;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.class */
/**
 * Subscribe response handler that tracks a single subscription: one
 * {@link TopicSubscriber} together with its {@link ActiveSubscriber}.
 *
 * <p>The tracked pair is recorded by {@link #postHandleSuccessResponse} and cleared by
 * {@link #removeSubscription}. Every message handed to {@link #handleSubscribeMessage} is routed
 * to the tracked subscriber, whatever topic it carries.
 *
 * <p>Accessors of the tracked pair are {@code synchronized} on this handler so the two fields are
 * always read and written together.
 */
public class SimpleSubscribeResponseHandler extends AbstractSubscribeResponseHandler {
    private static final Logger logger = LoggerFactory.getLogger(SimpleSubscribeResponseHandler.class);

    /** Topic subscriber currently tracked by this handler, or {@code null} when there is none. */
    private volatile TopicSubscriber origTopicSubscriber;

    /** Active subscriber paired with {@link #origTopicSubscriber}, or {@code null} when there is none. */
    private volatile ActiveSubscriber origActiveSubscriber;

    /** The channel manager passed to the constructor, kept under its concrete type. */
    private final SimpleHChannelManager sChannelManager;

    /**
     * Active subscriber that remembers every delivered-but-not-yet-consumed message and pauses
     * reads on the subscribe channel once too many are outstanding.
     */
    static class SimpleActiveSubscriber extends ActiveSubscriber {
        /** Messages passed to {@link #unsafeDeliverMessage} that have not yet been consumed. */
        private final Set<PubSubProtocol.Message> outstandingMsgSet;

        /**
         * Creates the subscriber and sizes the outstanding-message set from
         * {@link ClientConfiguration#getMaximumOutstandingMessages()}.
         *
         * <p>All arguments are forwarded unchanged to the {@link ActiveSubscriber} constructor.
         */
        public SimpleActiveSubscriber(ClientConfiguration clientConfiguration, AbstractHChannelManager abstractHChannelManager, TopicSubscriber topicSubscriber, PubSubData pubSubData, PubSubProtocol.SubscriptionPreferences subscriptionPreferences, Channel channel, HChannel hChannel) {
            super(clientConfiguration, abstractHChannelManager, topicSubscriber, pubSubData, subscriptionPreferences, channel, hChannel);
            // Explicit type arguments: a raw ConcurrentHashMap here would make this an unchecked call.
            this.outstandingMsgSet = Collections.newSetFromMap(
                    new ConcurrentHashMap<PubSubProtocol.Message, Boolean>(clientConfiguration.getMaximumOutstandingMessages(), 1.0f));
        }

        /**
         * Records {@code message} as outstanding, throttles the channel if the configured limit
         * has been reached, and then delegates delivery to the superclass.
         *
         * @param message the message about to be delivered
         */
        @Override // org.apache.hedwig.client.netty.impl.ActiveSubscriber
        public void unsafeDeliverMessage(PubSubProtocol.Message message) {
            this.outstandingMsgSet.add(message);
            boolean limitReached = this.outstandingMsgSet.size() >= this.cfg.getMaximumOutstandingMessages();
            if (limitReached && this.channel.isReadable()) {
                if (SimpleSubscribeResponseHandler.logger.isDebugEnabled()) {
                    SimpleSubscribeResponseHandler.logger.debug("Too many outstanding messages ({}) so throttling the subscribe netty Channel", Integer.valueOf(this.outstandingMsgSet.size()));
                }
                // Stop reading from the channel until the consumer catches up.
                this.channel.setReadable(false);
            }
            super.unsafeDeliverMessage(message);
        }

        /**
         * Marks {@code message} as consumed and, if the channel is currently not readable and no
         * messages remain outstanding, makes the channel readable again.
         *
         * @param message the message that has been consumed
         */
        @Override // org.apache.hedwig.client.netty.impl.ActiveSubscriber
        public synchronized void messageConsumed(PubSubProtocol.Message message) {
            super.messageConsumed(message);
            this.outstandingMsgSet.remove(message);
            // Only resume once the backlog is fully drained, not merely below the limit.
            if (!this.channel.isReadable() && this.outstandingMsgSet.isEmpty()) {
                if (SimpleSubscribeResponseHandler.logger.isDebugEnabled()) {
                    SimpleSubscribeResponseHandler.logger.debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for {}", this.topicSubscriber);
                }
                this.channel.setReadable(true);
            }
        }

        /**
         * Starts delivery through the superclass and then makes the channel readable; a failure
         * to change readability is logged, not thrown.
         *
         * @param messageHandler handler passed on to the superclass
         * @throws AlreadyStartDeliveryException propagated from the superclass
         * @throws PubSubException.ClientNotSubscribedException propagated from the superclass
         */
        @Override // org.apache.hedwig.client.netty.impl.ActiveSubscriber
        public synchronized void startDelivery(MessageHandler messageHandler) throws AlreadyStartDeliveryException, PubSubException.ClientNotSubscribedException {
            super.startDelivery(messageHandler);
            setReadableLoggingFailure(true, "Unable to make subscriber Channel readable in startDelivery call for {}");
        }

        /**
         * Stops delivery through the superclass and then makes the channel not readable; a
         * failure to change readability is logged, not thrown.
         */
        @Override // org.apache.hedwig.client.netty.impl.ActiveSubscriber
        public synchronized void stopDelivery() {
            super.stopDelivery();
            setReadableLoggingFailure(false, "Unable to make subscriber Channel not readable in stopDelivery call for {}");
        }

        /**
         * Requests the given readability on the channel and logs {@code failureFormat} (with the
         * topic subscriber as its argument) at error level if the request does not succeed.
         */
        private void setReadableLoggingFailure(boolean readable, final String failureFormat) {
            this.channel.setReadable(readable).addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (!channelFuture.isSuccess()) {
                        SimpleSubscribeResponseHandler.logger.error(failureFormat, SimpleActiveSubscriber.this.topicSubscriber);
                    }
                }
            });
        }
    }

    /**
     * Creates the handler.
     *
     * @param clientConfiguration client configuration, forwarded to the superclass
     * @param hChannelManager channel manager; must be a {@link SimpleHChannelManager}, otherwise a
     *     {@link ClassCastException} is thrown
     */
    public SimpleSubscribeResponseHandler(ClientConfiguration clientConfiguration, HChannelManager hChannelManager) {
        super(clientConfiguration, hChannelManager);
        this.sChannelManager = (SimpleHChannelManager) hChannelManager;
    }

    /** Builds a {@link SimpleActiveSubscriber} from the given arguments. */
    @Override // org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler
    protected ActiveSubscriber createActiveSubscriber(ClientConfiguration clientConfiguration, AbstractHChannelManager abstractHChannelManager, TopicSubscriber topicSubscriber, PubSubData pubSubData, PubSubProtocol.SubscriptionPreferences subscriptionPreferences, Channel channel, HChannel hChannel) {
        return new SimpleActiveSubscriber(clientConfiguration, abstractHChannelManager, topicSubscriber, pubSubData, subscriptionPreferences, channel, hChannel);
    }

    /**
     * Returns the tracked active subscriber if {@code topicSubscriber} equals the tracked topic
     * subscriber.
     *
     * @return the tracked active subscriber, or {@code null} when nothing is tracked or the
     *     argument does not match
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler
    protected synchronized ActiveSubscriber getActiveSubscriber(TopicSubscriber topicSubscriber) {
        TopicSubscriber tracked = this.origTopicSubscriber;
        if (tracked != null && tracked.equals(topicSubscriber)) {
            return this.origActiveSubscriber;
        }
        return null;
    }

    /** Returns the tracked active subscriber, or {@code null} when there is none. */
    private synchronized ActiveSubscriber getActiveSubscriber() {
        return this.origActiveSubscriber;
    }

    /**
     * Reports whether {@code topicSubscriber} equals the topic subscriber tracked by this handler.
     *
     * @return {@code false} when nothing is tracked or the argument does not match
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler, org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public synchronized boolean hasSubscription(TopicSubscriber topicSubscriber) {
        TopicSubscriber tracked = this.origTopicSubscriber;
        return tracked != null && tracked.equals(topicSubscriber);
    }

    /**
     * Clears the tracked subscription and delegates removal to the superclass.
     *
     * <p>If a subscription is tracked and it does not equal {@code topicSubscriber}, nothing is
     * changed and {@code false} is returned. When nothing is tracked, the call still reaches the
     * superclass.
     *
     * @return {@code false} on a mismatch, otherwise the superclass result
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler
    public synchronized boolean removeSubscription(TopicSubscriber topicSubscriber, ActiveSubscriber activeSubscriber) {
        TopicSubscriber tracked = this.origTopicSubscriber;
        if (tracked != null && !tracked.equals(topicSubscriber)) {
            return false;
        }
        this.origTopicSubscriber = null;
        this.origActiveSubscriber = null;
        return super.removeSubscription(topicSubscriber, activeSubscriber);
    }

    /**
     * Handles a subscribe response. For any status other than {@code SUCCESS}, the channel's
     * handler is first told the close is explicit and the channel is closed; the response is then
     * always passed on to the superclass.
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler, org.apache.hedwig.client.handlers.AbstractResponseHandler
    public void handleResponse(PubSubProtocol.PubSubResponse pubSubResponse, PubSubData pubSubData, Channel channel) throws Exception {
        boolean succeeded = pubSubResponse.getStatusCode().equals(PubSubProtocol.StatusCode.SUCCESS);
        if (!succeeded) {
            // Flag the close as explicit before closing (presumably so the channel handler does
            // not treat it as an unexpected disconnect; confirm against HChannelHandler).
            HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
            channel.close();
        }
        super.handleResponse(pubSubResponse, pubSubData, channel);
    }

    /**
     * Hands the message carried by {@code pubSubResponse} to the tracked active subscriber, or
     * logs an error if no subscriber is tracked.
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler, org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void handleSubscribeMessage(PubSubProtocol.PubSubResponse pubSubResponse) {
        PubSubProtocol.Message message = pubSubResponse.getMessage();
        ActiveSubscriber subscriber = getActiveSubscriber();
        if (subscriber != null) {
            subscriber.handleMessage(message);
            return;
        }
        logger.error("No Subscriber is alive receiving its message {}.", MessageIdUtils.msgIdToReadableString(message.getMsgId()));
    }

    /**
     * Asks the channel manager to store the subscription channel and maps the outcome to a status.
     *
     * @return {@code SUCCESS} with the stored channel when storing succeeded; otherwise
     *     {@code RESUBSCRIBE_EXCEPTION} for a resubscribe request or
     *     {@code CLIENT_ALREADY_SUBSCRIBED} for any other request, each with a {@code null} channel
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler
    protected Either<PubSubProtocol.StatusCode, HChannel> handleSuccessResponse(TopicSubscriber topicSubscriber, PubSubData pubSubData, Channel channel) {
        Either<Boolean, HChannel> stored = this.sChannelManager.storeSubscriptionChannel(topicSubscriber, pubSubData, channel);
        if (stored.left().booleanValue()) {
            return Either.of(PubSubProtocol.StatusCode.SUCCESS, stored.right());
        }
        PubSubProtocol.StatusCode failure;
        if (pubSubData.isResubscribeRequest()) {
            failure = PubSubProtocol.StatusCode.RESUBSCRIBE_EXCEPTION;
        } else {
            failure = PubSubProtocol.StatusCode.CLIENT_ALREADY_SUBSCRIBED;
        }
        return Either.of(failure, (HChannel) null);
    }

    /** Records the given pair as the subscription tracked by this handler. */
    @Override // org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler
    protected synchronized void postHandleSuccessResponse(TopicSubscriber topicSubscriber, ActiveSubscriber activeSubscriber) {
        this.origTopicSubscriber = topicSubscriber;
        this.origActiveSubscriber = activeSubscriber;
    }

    /**
     * Completes immediately: invokes {@code callback} with {@code obj} as context and a
     * {@code null} result. No state is changed and no channel is closed here.
     */
    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void asyncCloseSubscription(TopicSubscriber topicSubscriber, Callback<PubSubProtocol.ResponseBody> callback, Object obj) {
        callback.operationFinished(obj, (PubSubProtocol.ResponseBody) null);
    }
}
