package org.apache.hedwig.client.netty.impl;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.class */
public abstract class AbstractSubscribeResponseHandler extends SubscribeResponseHandler {
    private static Logger logger = LoggerFactory.getLogger(AbstractSubscribeResponseHandler.class);
    protected final ReentrantReadWriteLock disconnectLock;
    protected final ConcurrentMap<TopicSubscriber, ActiveSubscriber> subscriptions;
    protected final AbstractHChannelManager aChannelManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$StatusCode;
        static final /* synthetic */ int[] $SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$SubscriptionEvent = new int[PubSubProtocol.SubscriptionEvent.values().length];

        static {
            try {
                $SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$SubscriptionEvent[PubSubProtocol.SubscriptionEvent.TOPIC_MOVED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$SubscriptionEvent[PubSubProtocol.SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$StatusCode = new int[PubSubProtocol.StatusCode.values().length];
            try {
                $SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$StatusCode[PubSubProtocol.StatusCode.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$StatusCode[PubSubProtocol.StatusCode.CLIENT_ALREADY_SUBSCRIBED.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$StatusCode[PubSubProtocol.StatusCode.SERVICE_DOWN.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$StatusCode[PubSubProtocol.StatusCode.NOT_RESPONSIBLE_FOR_TOPIC.ordinal()] = 4;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractSubscribeResponseHandler(ClientConfiguration clientConfiguration, HChannelManager hChannelManager) {
        super(clientConfiguration, hChannelManager);
        this.disconnectLock = new ReentrantReadWriteLock();
        this.subscriptions = new ConcurrentHashMap();
        this.aChannelManager = (AbstractHChannelManager) hChannelManager;
    }

    protected HChannelManager getHChannelManager() {
        return this.channelManager;
    }

    protected ClientConfiguration getConfiguration() {
        return this.cfg;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ActiveSubscriber getActiveSubscriber(TopicSubscriber topicSubscriber) {
        return this.subscriptions.get(topicSubscriber);
    }

    protected ActiveSubscriber createActiveSubscriber(ClientConfiguration clientConfiguration, AbstractHChannelManager abstractHChannelManager, TopicSubscriber topicSubscriber, PubSubData pubSubData, PubSubProtocol.SubscriptionPreferences subscriptionPreferences, Channel channel, HChannel hChannel) {
        return new ActiveSubscriber(clientConfiguration, abstractHChannelManager, topicSubscriber, pubSubData, subscriptionPreferences, channel, hChannel);
    }

    @Override // org.apache.hedwig.client.handlers.AbstractResponseHandler
    public void handleResponse(PubSubProtocol.PubSubResponse pubSubResponse, PubSubData pubSubData, Channel channel) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.", VarArgs.va(pubSubResponse, pubSubData, NetUtils.getHostFromChannel(channel)));
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$StatusCode[pubSubResponse.getStatusCode().ordinal()]) {
            case 1:
                TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
                PubSubProtocol.SubscriptionPreferences subscriptionPreferences = null;
                if (pubSubResponse.hasResponseBody()) {
                    PubSubProtocol.ResponseBody responseBody = pubSubResponse.getResponseBody();
                    if (responseBody.hasSubscribeResponse()) {
                        PubSubProtocol.SubscribeResponse subscribeResponse = responseBody.getSubscribeResponse();
                        if (subscribeResponse.hasPreferences()) {
                            subscriptionPreferences = subscribeResponse.getPreferences();
                            if (logger.isDebugEnabled()) {
                                logger.debug("Receive subscription preferences for {} : {}", VarArgs.va(topicSubscriber, SubscriptionStateUtils.toString(subscriptionPreferences)));
                            }
                        }
                    }
                }
                ActiveSubscriber activeSubscriber = null;
                this.disconnectLock.readLock().lock();
                try {
                    Either<PubSubProtocol.StatusCode, HChannel> handleSuccessResponse = handleSuccessResponse(topicSubscriber, pubSubData, channel);
                    PubSubProtocol.StatusCode left = handleSuccessResponse.left();
                    if (PubSubProtocol.StatusCode.SUCCESS == left) {
                        activeSubscriber = createActiveSubscriber(this.cfg, this.aChannelManager, topicSubscriber, pubSubData, subscriptionPreferences, channel, handleSuccessResponse.right());
                        left = addSubscription(topicSubscriber, activeSubscriber);
                    }
                    if (PubSubProtocol.StatusCode.SUCCESS != left) {
                        pubSubData.getCallback().operationFailed(pubSubData.context, PubSubException.create(left, "Client is already subscribed for " + topicSubscriber));
                        return;
                    } else {
                        postHandleSuccessResponse(topicSubscriber, activeSubscriber);
                        pubSubData.getCallback().operationFinished(pubSubData.context, null);
                        return;
                    }
                } finally {
                    this.disconnectLock.readLock().unlock();
                }
            case 2:
                pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.ClientAlreadySubscribedException("Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: " + pubSubData.subscriberId.toStringUtf8()));
                return;
            case 3:
                pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.ServiceDownException("Server responded with a SERVICE_DOWN status"));
                return;
            case 4:
                handleRedirectResponse(pubSubResponse, pubSubData, channel);
                return;
            default:
                logger.error("Unexpected error response from server for PubSubResponse: " + pubSubResponse);
                pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.ServiceDownException("Server responded with a status code of: " + pubSubResponse.getStatusCode(), PubSubException.create(pubSubResponse.getStatusCode(), "Original Exception")));
                return;
        }
    }

    protected abstract Either<PubSubProtocol.StatusCode, HChannel> handleSuccessResponse(TopicSubscriber topicSubscriber, PubSubData pubSubData, Channel channel);

    protected void postHandleSuccessResponse(TopicSubscriber topicSubscriber, ActiveSubscriber activeSubscriber) {
    }

    private PubSubProtocol.StatusCode addSubscription(TopicSubscriber topicSubscriber, ActiveSubscriber activeSubscriber) {
        return null != this.subscriptions.putIfAbsent(topicSubscriber, activeSubscriber) ? PubSubProtocol.StatusCode.CLIENT_ALREADY_SUBSCRIBED : PubSubProtocol.StatusCode.SUCCESS;
    }

    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void handleSubscribeMessage(PubSubProtocol.PubSubResponse pubSubResponse) {
        PubSubProtocol.Message message = pubSubResponse.getMessage();
        TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubResponse.getTopic(), pubSubResponse.getSubscriberId());
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a Subscribe message in response: {}, {}", VarArgs.va(pubSubResponse, topicSubscriber));
        }
        ActiveSubscriber activeSubscriber = getActiveSubscriber(topicSubscriber);
        if (null == activeSubscriber) {
            logger.error("Subscriber {} is not found receiving its message {}.", VarArgs.va(topicSubscriber, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
        } else {
            activeSubscriber.handleMessage(message);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void asyncMessageDeliver(TopicSubscriber topicSubscriber, PubSubProtocol.Message message) {
        ActiveSubscriber activeSubscriber = getActiveSubscriber(topicSubscriber);
        if (null == activeSubscriber) {
            logger.error("Subscriber {} is not found delivering its message {}.", VarArgs.va(topicSubscriber, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
        } else {
            activeSubscriber.asyncMessageDeliver(message);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void messageConsumed(TopicSubscriber topicSubscriber, PubSubProtocol.Message message) {
        ActiveSubscriber activeSubscriber = getActiveSubscriber(topicSubscriber);
        if (null == activeSubscriber) {
            logger.warn("Subscriber {} is not found consumed its message {}.", VarArgs.va(topicSubscriber, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Message has been successfully consumed by the client app : {}, {}", VarArgs.va(message, topicSubscriber));
        }
        activeSubscriber.messageConsumed(message);
    }

    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void handleSubscriptionEvent(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionEvent subscriptionEvent) {
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        ActiveSubscriber activeSubscriber = getActiveSubscriber(topicSubscriber);
        if (null == activeSubscriber) {
            logger.warn("No subscription {} found receiving subscription event {}.", VarArgs.va(topicSubscriber, subscriptionEvent));
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Received subscription event {} for ({}).", VarArgs.va(subscriptionEvent, topicSubscriber));
        }
        processSubscriptionEvent(activeSubscriber, subscriptionEvent);
    }

    protected void processSubscriptionEvent(ActiveSubscriber activeSubscriber, PubSubProtocol.SubscriptionEvent subscriptionEvent) {
        switch (AnonymousClass1.$SwitchMap$org$apache$hedwig$protocol$PubSubProtocol$SubscriptionEvent[subscriptionEvent.ordinal()]) {
            case 1:
            case 2:
                resubscribeIfNecessary(activeSubscriber, subscriptionEvent);
                return;
            default:
                logger.error("Receive unknown subscription event {} for {}.", VarArgs.va(subscriptionEvent, activeSubscriber.getTopicSubscriber()));
                return;
        }
    }

    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        ActiveSubscriber activeSubscriber = getActiveSubscriber(topicSubscriber);
        if (null == activeSubscriber) {
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Start delivering message for {} using message handler {}", VarArgs.va(topicSubscriber, messageHandler));
        }
        activeSubscriber.startDelivery(messageHandler);
    }

    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void stopDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException {
        ActiveSubscriber activeSubscriber = getActiveSubscriber(topicSubscriber);
        if (null == activeSubscriber) {
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Stop delivering messages for {}", topicSubscriber);
        }
        activeSubscriber.stopDelivery();
    }

    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public boolean hasSubscription(TopicSubscriber topicSubscriber) {
        return this.subscriptions.containsKey(topicSubscriber);
    }

    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void consume(TopicSubscriber topicSubscriber, PubSubProtocol.MessageSeqId messageSeqId) {
        ActiveSubscriber activeSubscriber = getActiveSubscriber(topicSubscriber);
        if (null == activeSubscriber) {
            logger.warn("Subscriber {} is not found consuming message {}.", VarArgs.va(topicSubscriber, MessageIdUtils.msgIdToReadableString(messageSeqId)));
        } else {
            activeSubscriber.consume(messageSeqId);
        }
    }

    @Override // org.apache.hedwig.client.handlers.SubscribeResponseHandler
    public void onChannelDisconnected(InetSocketAddress inetSocketAddress, Channel channel) {
        this.disconnectLock.writeLock().lock();
        try {
            onDisconnect(inetSocketAddress);
            this.disconnectLock.writeLock().unlock();
        } catch (Throwable th) {
            this.disconnectLock.writeLock().unlock();
            throw th;
        }
    }

    private void onDisconnect(InetSocketAddress inetSocketAddress) {
        Iterator<ActiveSubscriber> it = this.subscriptions.values().iterator();
        while (it.hasNext()) {
            onDisconnect(it.next(), inetSocketAddress);
        }
    }

    private void onDisconnect(ActiveSubscriber activeSubscriber, InetSocketAddress inetSocketAddress) {
        logger.info("Subscription channel for ({}) is disconnected.", activeSubscriber);
        resubscribeIfNecessary(activeSubscriber, PubSubProtocol.SubscriptionEvent.TOPIC_MOVED);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean removeSubscription(TopicSubscriber topicSubscriber, ActiveSubscriber activeSubscriber) {
        return this.subscriptions.remove(topicSubscriber, activeSubscriber);
    }

    protected void resubscribeIfNecessary(ActiveSubscriber activeSubscriber, PubSubProtocol.SubscriptionEvent subscriptionEvent) {
        if (removeSubscription(activeSubscriber.getTopicSubscriber(), activeSubscriber)) {
            activeSubscriber.resubscribeIfNecessary(subscriptionEvent);
        }
    }
}
