package org.apache.hedwig.client.netty;

import com.google.protobuf.ByteString;
import java.util.List;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.filter.ClientMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.SubscriptionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/netty/HedwigSubscriber.class */
/**
 * Netty-based implementation of the Hedwig {@link Subscriber} API.
 *
 * <p>Every operation is delegated to the {@link HChannelManager} obtained from the owning
 * {@link HedwigClientImpl}. The synchronous methods are built on top of their asynchronous
 * counterparts: they submit the asynchronous request and then block on the request's
 * {@link PubSubData} monitor until its {@code isDone} flag is set.
 *
 * <p>Throughout this class the first {@link ByteString} argument of a method is the topic and the
 * second one is the subscriber id (this is how they are logged and how {@link TopicSubscriber}
 * instances are built from them).
 */
public class HedwigSubscriber implements Subscriber {
    private static final Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);

    /** Client configuration; consulted for the default subscription message bound. */
    protected final ClientConfiguration cfg;

    /** Channel manager that all subscription operations are delegated to. */
    protected final HChannelManager channelManager;

    /**
     * Creates a subscriber bound to the configuration and channel manager of the given client.
     *
     * @param hedwigClientImpl client that supplies the configuration and the channel manager
     */
    public HedwigSubscriber(HedwigClientImpl hedwigClientImpl) {
        this.cfg = hedwigClientImpl.getConfiguration();
        this.channelManager = hedwigClientImpl.getHChannelManager();
    }

    /**
     * Registers a listener with the channel manager's subscription event emitter.
     *
     * @param subscriptionListener listener to add
     */
    @Override
    public void addSubscriptionListener(SubscriptionListener subscriptionListener) {
        this.channelManager.getSubscriptionEventEmitter().addSubscriptionListener(subscriptionListener);
    }

    /**
     * Unregisters a listener from the channel manager's subscription event emitter.
     *
     * @param subscriptionListener listener to remove
     */
    @Override
    public void removeSubscriptionListener(SubscriptionListener subscriptionListener) {
        this.channelManager.getSubscriptionEventEmitter().removeSubscriptionListener(subscriptionListener);
    }

    /**
     * Emits the debug trace shared by the synchronous and asynchronous subscribe/unsubscribe paths.
     *
     * @param prefix              leading text of the log line, up to and including "topic: "
     * @param byteString          topic
     * @param byteString2         subscriber id
     * @param operationType       operation being requested
     * @param subscriptionOptions subscription options; may be {@code null}, in which case the
     *                            option details are omitted from the message
     */
    private void logSubUnsubRequest(String prefix, ByteString byteString, ByteString byteString2, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscriptionOptions subscriptionOptions) {
        if (!logger.isDebugEnabled()) {
            return;
        }
        StringBuilder msg = new StringBuilder()
                .append(prefix).append(byteString.toStringUtf8())
                .append(", subscriberId: ").append(byteString2.toStringUtf8())
                .append(", operationType: ").append(operationType);
        if (null != subscriptionOptions) {
            msg.append(", createOrAttach: ").append(subscriptionOptions.getCreateOrAttach())
                    .append(", messageBound: ").append(subscriptionOptions.getMessageBound());
        }
        logger.debug(msg.toString());
    }

    /**
     * Performs a subscribe or unsubscribe request synchronously by submitting the asynchronous
     * request and blocking until it completes.
     *
     * @param byteString          topic
     * @param byteString2         subscriber id
     * @param operationType       operation to perform
     * @param subscriptionOptions subscription options; {@code null} is passed for unsubscribe
     * @throws PubSubException.CouldNotConnectException         if the request failed with this exception
     * @throws PubSubException.ClientAlreadySubscribedException if the request failed with this exception
     * @throws PubSubException.ClientNotSubscribedException     if the request failed with this exception
     * @throws PubSubException.ServiceDownException             if the request failed with this exception,
     *         failed with any other or no exception, or the waiting thread was interrupted
     */
    private void subUnsub(ByteString byteString, ByteString byteString2, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscriptionOptions subscriptionOptions) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException {
        logSubUnsubRequest("Calling a sync subUnsub request for topic: ", byteString, byteString2, operationType, subscriptionOptions);
        // This PubSubData only serves as the monitor/completion flag for the synchronous wait;
        // asyncSubUnsub builds its own PubSubData for the actual request.
        PubSubData pubSubData = new PubSubData(byteString, null, byteString2, operationType, subscriptionOptions, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            asyncSubUnsub(byteString, byteString2, pubSubCallback, null, operationType, subscriptionOptions);
            // Loop to guard against spurious wake-ups.
            // NOTE(review): presumably PubSubCallback sets isDone and notifies on pubSubData - confirm.
            while (!pubSubData.isDone) {
                try {
                    pubSubData.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt status so callers further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
                }
            }
            if (pubSubCallback.getIsCallSuccessful()) {
                return;
            }
            PubSubException failureException = pubSubCallback.getFailureException();
            if (failureException == null) {
                logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
                throw new PubSubException.ServiceDownException("Server ack response to SubUnsub request is not successful");
            }
            // Rethrow the declared exception types as-is; anything else is wrapped below.
            if (failureException instanceof PubSubException.CouldNotConnectException) {
                throw ((PubSubException.CouldNotConnectException) failureException);
            }
            if (failureException instanceof PubSubException.ClientAlreadySubscribedException) {
                throw ((PubSubException.ClientAlreadySubscribedException) failureException);
            }
            if (failureException instanceof PubSubException.ClientNotSubscribedException) {
                throw ((PubSubException.ClientNotSubscribedException) failureException);
            }
            if (failureException instanceof PubSubException.ServiceDownException) {
                throw ((PubSubException.ServiceDownException) failureException);
            }
            logger.error("Unexpected PubSubException thrown: ", failureException);
            throw new PubSubException.ServiceDownException(failureException);
        }
    }

    /**
     * Submits a subscribe or unsubscribe request to the channel manager.
     *
     * <p>For a SUBSCRIBE operation whose options carry no positive message bound, the bound from
     * the client configuration is applied when that one is positive.
     *
     * <p>This method is public in this file although it is only used internally here; the
     * modifier is kept so that any existing caller continues to compile.
     *
     * @param byteString          topic
     * @param byteString2         subscriber id
     * @param callback            callback handed to the submitted request
     * @param obj                 caller context handed to the submitted request
     * @param operationType       operation to perform
     * @param subscriptionOptions subscription options; must be non-null for SUBSCRIBE (a
     *                            {@code null} value results in a {@link NullPointerException}),
     *                            may be {@code null} for other operations
     */
    public void asyncSubUnsub(ByteString byteString, ByteString byteString2, Callback<PubSubProtocol.ResponseBody> callback, Object obj, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscriptionOptions subscriptionOptions) {
        logSubUnsubRequest("Calling a async subUnsub request for topic: ", byteString, byteString2, operationType, subscriptionOptions);
        PubSubProtocol.SubscriptionOptions effectiveOptions = subscriptionOptions;
        if (PubSubProtocol.OperationType.SUBSCRIBE.equals(operationType)
                && subscriptionOptions.getMessageBound() <= 0
                && this.cfg.getSubscriptionMessageBound() > 0) {
            // No usable bound supplied by the caller: fall back to the configured default.
            effectiveOptions = PubSubProtocol.SubscriptionOptions.newBuilder(subscriptionOptions)
                    .setMessageBound(this.cfg.getSubscriptionMessageBound())
                    .build();
        }
        this.channelManager.submitOp(new PubSubData(byteString, null, byteString2, operationType, effectiveOptions, callback, obj));
    }

    /**
     * Synchronously subscribes as a non-hub subscriber using only a create-or-attach mode.
     *
     * @param byteString     topic
     * @param byteString2    subscriber id
     * @param createOrAttach create-or-attach mode placed into the subscription options
     * @throws InvalidSubscriberIdException if the subscriber id is a hub subscriber id
     */
    @Override
    public void subscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        subscribe(byteString, byteString2, PubSubProtocol.SubscriptionOptions.newBuilder().setCreateOrAttach(createOrAttach).build(), false);
    }

    /**
     * Synchronously subscribes as a non-hub subscriber with the given options.
     *
     * @param byteString          topic
     * @param byteString2         subscriber id
     * @param subscriptionOptions subscription options
     * @throws InvalidSubscriberIdException if the subscriber id is a hub subscriber id
     */
    @Override
    public void subscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionOptions subscriptionOptions) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        subscribe(byteString, byteString2, subscriptionOptions, false);
    }

    /**
     * Synchronously subscribes after validating the subscriber id against the hub flag.
     *
     * @param byteString          topic
     * @param byteString2         subscriber id
     * @param subscriptionOptions subscription options
     * @param z                   {@code true} when the caller is a hub subscriber
     * @throws InvalidSubscriberIdException          if the subscriber id does not match the hub flag
     * @throws PubSubException.ServiceDownException  also thrown when the request unexpectedly
     *         fails with a {@code ClientNotSubscribedException}
     */
    protected void subscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionOptions subscriptionOptions, boolean z) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        if (!isValidSubscriberId(byteString2, z)) {
            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + byteString2.toStringUtf8() + ", isHub: " + z);
        }
        try {
            subUnsub(byteString, byteString2, PubSubProtocol.OperationType.SUBSCRIBE, subscriptionOptions);
        } catch (PubSubException.ClientNotSubscribedException e) {
            // Not a declared outcome of subscribe, so surface it as a service failure.
            logger.error("Unexpected Exception thrown: ", e);
            throw new PubSubException.ServiceDownException(e);
        }
    }

    /**
     * Asynchronously subscribes as a non-hub subscriber using only a create-or-attach mode.
     *
     * @param byteString     topic
     * @param byteString2    subscriber id
     * @param createOrAttach create-or-attach mode placed into the subscription options
     * @param callback       callback notified of the outcome
     * @param obj            caller context passed back to the callback
     */
    @Override
    public void asyncSubscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach, Callback<Void> callback, Object obj) {
        asyncSubscribe(byteString, byteString2, PubSubProtocol.SubscriptionOptions.newBuilder().setCreateOrAttach(createOrAttach).build(), callback, obj, false);
    }

    /**
     * Asynchronously subscribes as a non-hub subscriber with the given options.
     *
     * @param byteString          topic
     * @param byteString2         subscriber id
     * @param subscriptionOptions subscription options
     * @param callback            callback notified of the outcome
     * @param obj                 caller context passed back to the callback
     */
    @Override
    public void asyncSubscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionOptions subscriptionOptions, Callback<Void> callback, Object obj) {
        asyncSubscribe(byteString, byteString2, subscriptionOptions, callback, obj, false);
    }

    /**
     * Asynchronously subscribes after validating the subscriber id against the hub flag. An
     * invalid id is reported through {@code callback.operationFailed} as a
     * {@code ServiceDownException} wrapping an {@link InvalidSubscriberIdException}.
     *
     * @param byteString          topic
     * @param byteString2         subscriber id
     * @param subscriptionOptions subscription options
     * @param callback            callback notified of the outcome
     * @param obj                 caller context passed back to the callback
     * @param z                   {@code true} when the caller is a hub subscriber
     */
    protected void asyncSubscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionOptions subscriptionOptions, Callback<Void> callback, Object obj, boolean z) {
        if (!isValidSubscriberId(byteString2, z)) {
            callback.operationFailed(obj, new PubSubException.ServiceDownException(new InvalidSubscriberIdException("SubscriberId passed is not valid: " + byteString2.toStringUtf8() + ", isHub: " + z)));
            return;
        }
        asyncSubUnsub(byteString, byteString2, new VoidCallbackAdapter(callback), obj, PubSubProtocol.OperationType.SUBSCRIBE, subscriptionOptions);
    }

    /**
     * Synchronously unsubscribes as a non-hub subscriber.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @throws InvalidSubscriberIdException if the subscriber id is a hub subscriber id
     */
    @Override
    public void unsubscribe(ByteString byteString, ByteString byteString2) throws PubSubException.CouldNotConnectException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        unsubscribe(byteString, byteString2, false);
    }

    /**
     * Synchronously unsubscribes: validates the subscriber id, closes the local subscription and
     * then sends the UNSUBSCRIBE request.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param z           {@code true} when the caller is a hub subscriber
     * @throws InvalidSubscriberIdException          if the subscriber id does not match the hub flag
     * @throws PubSubException.ServiceDownException  also thrown when the request unexpectedly
     *         fails with a {@code ClientAlreadySubscribedException}
     */
    protected void unsubscribe(ByteString byteString, ByteString byteString2, boolean z) throws PubSubException.CouldNotConnectException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        if (!isValidSubscriberId(byteString2, z)) {
            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + byteString2.toStringUtf8() + ", isHub: " + z);
        }
        closeSubscription(byteString, byteString2);
        try {
            subUnsub(byteString, byteString2, PubSubProtocol.OperationType.UNSUBSCRIBE, null);
        } catch (PubSubException.ClientAlreadySubscribedException e) {
            // Not a declared outcome of unsubscribe, so surface it as a service failure.
            logger.error("Unexpected Exception thrown: ", e);
            throw new PubSubException.ServiceDownException(e);
        }
    }

    /**
     * Asynchronously unsubscribes as a non-hub subscriber.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param callback    callback notified of the outcome
     * @param obj         caller context passed back to the callback
     */
    @Override
    public void asyncUnsubscribe(ByteString byteString, ByteString byteString2, Callback<Void> callback, Object obj) {
        doAsyncUnsubscribe(byteString, byteString2, new VoidCallbackAdapter(callback), obj, false);
    }

    /**
     * Asynchronously unsubscribes, validating the subscriber id against the hub flag.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param callback    callback notified of the outcome
     * @param obj         caller context passed back to the callback
     * @param z           {@code true} when the caller is a hub subscriber
     */
    protected void asyncUnsubscribe(ByteString byteString, ByteString byteString2, Callback<Void> callback, Object obj, boolean z) {
        doAsyncUnsubscribe(byteString, byteString2, new VoidCallbackAdapter(callback), obj, z);
    }

    /**
     * Closes the subscription asynchronously and, once that succeeds, submits the UNSUBSCRIBE
     * request. A failed close, or an invalid subscriber id, is reported through
     * {@code callback.operationFailed} without sending the UNSUBSCRIBE request.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param callback    callback notified of the outcome
     * @param obj         caller context passed back to the callback
     * @param z           {@code true} when the caller is a hub subscriber
     */
    private void doAsyncUnsubscribe(final ByteString byteString, final ByteString byteString2, final Callback<PubSubProtocol.ResponseBody> callback, final Object obj, boolean z) {
        if (!isValidSubscriberId(byteString2, z)) {
            callback.operationFailed(obj, new PubSubException.ServiceDownException(new InvalidSubscriberIdException("SubscriberId passed is not valid: " + byteString2.toStringUtf8() + ", isHub: " + z)));
            return;
        }
        Callback<PubSubProtocol.ResponseBody> unsubscribeAfterClose = new Callback<PubSubProtocol.ResponseBody>() {
            @Override
            public void operationFinished(Object obj2, PubSubProtocol.ResponseBody responseBody) {
                // Close succeeded: now issue the actual unsubscribe with the caller's callback/context.
                HedwigSubscriber.this.asyncSubUnsub(byteString, byteString2, callback, obj, PubSubProtocol.OperationType.UNSUBSCRIBE, null);
            }

            @Override
            public void operationFailed(Object obj2, PubSubException pubSubException) {
                callback.operationFailed(obj, pubSubException);
            }
        };
        doAsyncCloseSubscription(byteString, byteString2, unsubscribeAfterClose, null);
    }

    /**
     * Checks that the subscriber id matches the hub flag: a hub caller must use a hub subscriber
     * id and a non-hub caller must not.
     *
     * @param byteString subscriber id to check
     * @param z          {@code true} when the caller is a hub subscriber
     * @return {@code true} if the id is acceptable for the given hub flag
     */
    private boolean isValidSubscriberId(ByteString byteString, boolean z) {
        return z == SubscriptionStateUtils.isHubSubscriber(byteString);
    }

    /**
     * Sends a consume for the given message sequence id through the subscription's response
     * handler.
     *
     * @param byteString   topic
     * @param byteString2  subscriber id
     * @param messageSeqId sequence id passed to the handler's {@code consume}
     * @throws PubSubException.ClientNotSubscribedException if there is no response handler for the
     *         topic subscriber or the handler does not hold the subscription
     */
    @Override
    public void consume(ByteString byteString, ByteString byteString2, PubSubProtocol.MessageSeqId messageSeqId) throws PubSubException.ClientNotSubscribedException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        logger.debug("Calling consume for {}, messageSeqId: {}.", topicSubscriber, messageSeqId);
        SubscribeResponseHandler handler = this.channelManager.getSubscribeResponseHandler(topicSubscriber);
        boolean subscribed = null != handler && handler.hasSubscription(topicSubscriber);
        if (!subscribed) {
            throw new PubSubException.ClientNotSubscribedException("Cannot send consume message since client is not subscribed to topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
        }
        handler.consume(topicSubscriber, messageSeqId);
    }

    /**
     * Reports whether the channel manager has a response handler holding this subscription.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @return {@code true} if a response handler exists and reports the subscription
     */
    @Override
    public boolean hasSubscription(ByteString byteString, ByteString byteString2) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        SubscribeResponseHandler handler = this.channelManager.getSubscribeResponseHandler(topicSubscriber);
        return null != handler && handler.hasSubscription(topicSubscriber);
    }

    /**
     * Not implemented: this method performs no lookup and always returns {@code null}.
     *
     * @param byteString unused
     * @return always {@code null}; callers must null-check the result
     */
    @Override
    public List<ByteString> getSubscriptionList(ByteString byteString) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        return null;
    }

    /**
     * Starts message delivery for the subscription via the channel manager.
     *
     * @param byteString     topic
     * @param byteString2    subscriber id
     * @param messageHandler handler passed to the channel manager
     */
    @Override
    public void startDelivery(ByteString byteString, ByteString byteString2, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        logger.debug("Starting delivery for {}.", topicSubscriber);
        this.channelManager.startDelivery(topicSubscriber, messageHandler);
    }

    /**
     * Starts message delivery with the handler wrapped in a {@link FilterableMessageHandler}
     * built from the given client-side filter.
     *
     * @param byteString          topic
     * @param byteString2         subscriber id
     * @param messageHandler      handler to wrap; must not be {@code null}
     * @param clientMessageFilter filter to wrap the handler with; must not be {@code null}
     * @throws NullPointerException if the handler or the filter is {@code null}
     */
    @Override
    public void startDeliveryWithFilter(ByteString byteString, ByteString byteString2, MessageHandler messageHandler, ClientMessageFilter clientMessageFilter) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        if (null == messageHandler || null == clientMessageFilter) {
            throw new NullPointerException("Null message handler or message filter is       provided.");
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        FilterableMessageHandler filteringHandler = new FilterableMessageHandler(messageHandler, clientMessageFilter);
        logger.debug("Starting delivery with filter for {}.", topicSubscriber);
        this.channelManager.startDelivery(topicSubscriber, filteringHandler);
    }

    /**
     * Stops message delivery for the subscription via the channel manager.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     */
    @Override
    public void stopDelivery(ByteString byteString, ByteString byteString2) throws PubSubException.ClientNotSubscribedException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        logger.debug("Stopping delivery for {}.", topicSubscriber);
        this.channelManager.stopDelivery(topicSubscriber);
    }

    /**
     * Closes the subscription synchronously by submitting the asynchronous close and blocking
     * until it completes.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @throws PubSubException.ServiceDownException if the close did not succeed or the waiting
     *         thread was interrupted
     */
    @Override
    public void closeSubscription(ByteString byteString, ByteString byteString2) throws PubSubException.ServiceDownException {
        // This PubSubData only serves as the monitor/completion flag for the synchronous wait.
        PubSubData pubSubData = new PubSubData(byteString, null, byteString2, null, null, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            doAsyncCloseSubscription(byteString, byteString2, pubSubCallback, null);
            // Loop to guard against spurious wake-ups.
            while (!pubSubData.isDone) {
                try {
                    pubSubData.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt status so callers further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
                }
            }
            if (!pubSubCallback.getIsCallSuccessful()) {
                String failureMessage = "Exception while trying to close the subscription for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8();
                PubSubException failureException = pubSubCallback.getFailureException();
                if (failureException != null) {
                    // The thrown exception carries only the message, so record the cause here.
                    logger.error(failureMessage, failureException);
                }
                throw new PubSubException.ServiceDownException(failureMessage);
            }
        }
    }

    /**
     * Closes the subscription asynchronously.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param callback    callback notified of the outcome
     * @param obj         caller context passed back to the callback
     */
    @Override
    public void asyncCloseSubscription(ByteString byteString, ByteString byteString2, Callback<Void> callback, Object obj) {
        doAsyncCloseSubscription(byteString, byteString2, new VoidCallbackAdapter(callback), obj);
    }

    /**
     * Stops delivery for the subscription (best effort) and then asks the channel manager to
     * close it asynchronously.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param callback    callback handed to the channel manager
     * @param obj         caller context handed to the channel manager
     */
    private void doAsyncCloseSubscription(ByteString byteString, ByteString byteString2, Callback<PubSubProtocol.ResponseBody> callback, Object obj) {
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        logger.debug("Stopping delivery for {} before closing subscription.", topicSubscriber);
        try {
            this.channelManager.stopDelivery(topicSubscriber);
        } catch (PubSubException.ClientNotSubscribedException ignored) {
            // Deliberately best-effort: the close request below is still issued.
            logger.debug("Ignoring ClientNotSubscribedException while stopping delivery for {}.", topicSubscriber);
        }
        logger.debug("Closing subscription asynchronously for {}.", topicSubscriber);
        this.channelManager.asyncCloseSubscription(topicSubscriber, callback, obj);
    }
}
