package org.apache.hedwig.client.netty.impl.simple;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.CleanupChannelMap;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.impl.HChannelImpl;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.class */
/**
 * Channel manager for the "simple" subscription channel scheme.
 *
 * <p>Subscription channels are tracked per {@link TopicSubscriber} in
 * {@link #topicSubscriber2Channel}; lookups by host address always return
 * {@code null} (see {@link #getSubscriptionChannel(InetSocketAddress)}). The
 * message handler registered for each topic subscriber is remembered in
 * {@link #topicSubscriber2MessageHandler} so that delivery can later be
 * restarted with the same handler.
 */
public class SimpleHChannelManager extends AbstractHChannelManager {
    private static final Logger logger = LoggerFactory.getLogger(SimpleHChannelManager.class);

    /**
     * Attempt budget for a SUBSCRIBE operation. The counter is decremented on
     * every retriable failure and the failure is surfaced to the caller once it
     * reaches zero, so a subscribe is resubmitted at most
     * {@code MAX_SUBSCRIBE_ATTEMPTS - 1} times.
     */
    private static final int MAX_SUBSCRIBE_ATTEMPTS = 5;

    /** Subscription channel currently associated with each topic subscriber. */
    protected final CleanupChannelMap<TopicSubscriber> topicSubscriber2Channel;

    /** Message handler with which delivery was started for each topic subscriber. */
    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler;

    /** Pipeline factory handed to every subscription channel created by this manager. */
    private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;

    /**
     * Creates the manager and its (initially empty) subscriber maps.
     *
     * @param clientConfiguration client configuration, forwarded to the superclass and
     *                            to the subscription pipeline factory
     * @param channelFactory      channel factory, forwarded to the superclass
     */
    public SimpleHChannelManager(ClientConfiguration clientConfiguration, ChannelFactory channelFactory) {
        super(clientConfiguration, channelFactory);
        this.topicSubscriber2MessageHandler = new ConcurrentHashMap<>();
        this.topicSubscriber2Channel = new CleanupChannelMap<>();
        this.subscriptionChannelPipelineFactory = new SimpleSubscriptionChannelPipelineFactory(clientConfiguration, this);
    }

    /**
     * Submits an operation through the superclass. SUBSCRIBE operations first get
     * their callback wrapped so that a {@link PubSubException.ServiceDownException}
     * whose cause is a {@link PubSubException.TopicBusyException} triggers a
     * resubmission instead of failing immediately, until the attempt budget
     * ({@link #MAX_SUBSCRIBE_ATTEMPTS}) is used up.
     *
     * @param pubSubData the operation to submit; its callback is replaced for SUBSCRIBE
     */
    @Override
    public void submitOp(final PubSubData pubSubData) {
        if (PubSubProtocol.OperationType.SUBSCRIBE.equals(pubSubData.operationType)) {
            final Callback<PubSubProtocol.ResponseBody> originalCallback = pubSubData.getCallback();
            final AtomicInteger attemptsLeft = new AtomicInteger(MAX_SUBSCRIBE_ATTEMPTS);
            pubSubData.setCallback(new Callback<PubSubProtocol.ResponseBody>() {
                @Override
                public void operationFinished(Object obj, PubSubProtocol.ResponseBody responseBody) {
                    originalCallback.operationFinished(obj, responseBody);
                }

                @Override
                public void operationFailed(Object obj, PubSubException pubSubException) {
                    boolean retriable = pubSubException instanceof PubSubException.ServiceDownException
                            && pubSubException.getCause() instanceof PubSubException.TopicBusyException;
                    // The budget is only consumed by retriable failures.
                    if (retriable && attemptsLeft.decrementAndGet() > 0) {
                        SimpleHChannelManager.logger.warn("TOPIC_DOWN from server using simple channel scheme.This could be due to the channel disconnection from a close not having been triggered on the server side. Retrying");
                        SimpleHChannelManager.super.submitOp(pubSubData);
                        return;
                    }
                    originalCallback.operationFailed(obj, pubSubException);
                }
            });
        }
        super.submitOp(pubSubData);
    }

    /**
     * Returns the pipeline factory used for subscription channels.
     *
     * <p>Visibility was widened from {@code protected} by the decompiler; it is
     * kept {@code public} so existing callers keep compiling.
     *
     * @return the subscription channel pipeline factory created in the constructor
     */
    @Override
    public ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
        return this.subscriptionChannelPipelineFactory;
    }

    /**
     * Wraps an existing netty channel in a new {@link HChannelImpl}. Despite the
     * inherited name, this implementation does not add the result to
     * {@link #topicSubscriber2Channel}.
     *
     * @param channel the netty channel to wrap
     * @return a new channel wrapper bound to the channel's host
     */
    @Override
    protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
        return new HChannelImpl(NetUtils.getHostFromChannel(channel), channel, this,
                getSubscriptionChannelPipelineFactory());
    }

    /**
     * Creates a new {@link HChannelImpl} for the given host. Despite the inherited
     * name, this implementation does not add the result to
     * {@link #topicSubscriber2Channel}.
     *
     * @param inetSocketAddress the host the channel is for
     * @return a new channel wrapper for that host
     */
    @Override
    protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress inetSocketAddress) {
        return new HChannelImpl(inetSocketAddress, this, getSubscriptionChannelPipelineFactory());
    }

    /**
     * Wraps {@code channel} and tries to install it as the subscription channel of
     * {@code topicSubscriber}, replacing the channel recorded in
     * {@code pubSubData} as the original channel for a resubscribe.
     *
     * <p>Visibility was widened from {@code protected} by the decompiler; it is
     * kept {@code public} so existing callers keep compiling.
     *
     * @param topicSubscriber the subscriber the channel belongs to
     * @param pubSubData      the subscribe request, source of the channel to replace
     * @param channel         the netty channel to wrap and store
     * @return {@code (true, newChannel)} when the replacement succeeded, otherwise
     *         {@code (false, null)}
     */
    public Either<Boolean, HChannel> storeSubscriptionChannel(TopicSubscriber topicSubscriber, PubSubData pubSubData, Channel channel) {
        HChannel newChannel = new HChannelImpl(NetUtils.getHostFromChannel(channel), channel, this,
                getSubscriptionChannelPipelineFactory());
        boolean replaced = this.topicSubscriber2Channel.replaceChannel(
                topicSubscriber, pubSubData.getOriginalChannelForResubscribe(), newChannel);
        if (replaced) {
            return Either.of(Boolean.valueOf(replaced), newChannel);
        }
        return Either.of(Boolean.valueOf(replaced), null);
    }

    /**
     * Host-based lookup is not supported by this manager.
     *
     * @param inetSocketAddress ignored
     * @return always {@code null}
     */
    @Override
    protected HChannel getSubscriptionChannel(InetSocketAddress inetSocketAddress) {
        return null;
    }

    /**
     * Finds the channel to use for a topic subscriber: the channel already stored
     * for it if there is one, otherwise a freshly created channel to the host
     * currently mapped to the subscriber's topic.
     *
     * @param topicSubscriber the subscriber to look up
     * @return the channel to use, or {@code null} when no channel is stored and the
     *         topic has no known host
     */
    @Override
    protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber topicSubscriber) {
        HChannel stored = this.topicSubscriber2Channel.getChannel(topicSubscriber);
        if (stored != null) {
            return stored;
        }
        InetSocketAddress host = this.topic2Host.get(topicSubscriber.getTopic());
        if (host == null) {
            return null;
        }
        HChannel byHost = getSubscriptionChannel(host);
        return byHost != null ? byHost : createAndStoreSubscriptionChannel(host);
    }

    /**
     * Forwards a subscription channel disconnect to the subscribe response handler
     * attached to that channel. A channel without a handler is only logged.
     *
     * <p>Visibility was widened from {@code protected} by the decompiler; it is
     * kept {@code public} so existing callers keep compiling.
     *
     * @param inetSocketAddress the host the channel was connected to
     * @param channel           the channel that disconnected
     */
    @Override
    public void onSubscriptionChannelDisconnected(InetSocketAddress inetSocketAddress, Channel channel) {
        logger.info("Subscription Channel {} disconnected from {}.", VarArgs.va(channel, inetSocketAddress));
        try {
            HChannelImpl.getHChannelHandlerFromChannel(channel)
                    .getSubscribeResponseHandler()
                    .onChannelDisconnected(inetSocketAddress, channel);
        } catch (NoResponseHandlerException e) {
            logger.warn("No Channel Handler found for channel {} when it disconnected.", channel);
        }
    }

    /**
     * Looks up the subscribe response handler attached to the channel stored for a
     * topic subscriber.
     *
     * @param topicSubscriber the subscriber to look up
     * @return the handler, or {@code null} when no channel is stored, the stored
     *         wrapper has no underlying netty channel, or the channel has no handler
     */
    @Override
    public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
        HChannel hChannel = this.topicSubscriber2Channel.getChannel(topicSubscriber);
        if (hChannel == null) {
            return null;
        }
        Channel channel = hChannel.getChannel();
        if (channel == null) {
            return null;
        }
        try {
            return HChannelImpl.getHChannelHandlerFromChannel(channel).getSubscribeResponseHandler();
        } catch (NoResponseHandlerException e) {
            logger.warn("No Channel Handler found for channel {}, topic subscriber {}.", channel, topicSubscriber);
            return null;
        }
    }

    /**
     * Starts delivery for a subscribed topic subscriber with the given handler.
     *
     * @param topicSubscriber the subscriber to deliver to
     * @param messageHandler  the handler to register
     * @throws PubSubException.ClientNotSubscribedException if there is no active subscription
     * @throws AlreadyStartDeliveryException if a handler is already registered
     */
    @Override
    public void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        startDelivery(topicSubscriber, messageHandler, false);
    }

    /**
     * Restarts delivery for a topic subscriber, reusing whatever message handler is
     * currently registered for it.
     *
     * <p>Visibility was widened from {@code protected} by the decompiler; it is
     * kept {@code public} so existing callers keep compiling.
     *
     * @param topicSubscriber the subscriber to restart delivery for
     * @throws PubSubException.ClientNotSubscribedException if there is no active subscription
     * @throws AlreadyStartDeliveryException declared for signature compatibility
     */
    @Override
    public void restartDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        startDelivery(topicSubscriber, null, true);
    }

    /**
     * Shared implementation of start and restart.
     *
     * @param topicSubscriber the subscriber to deliver to
     * @param messageHandler  handler to register; ignored when {@code restart} is true
     * @param restart         when true, the previously registered handler (possibly
     *                        {@code null}) is passed on instead of {@code messageHandler}
     */
    private void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler, boolean restart) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        SubscribeResponseHandler responseHandler = getSubscribeResponseHandler(topicSubscriber);
        if (responseHandler == null || !responseHandler.hasSubscription(topicSubscriber)) {
            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }

        MessageHandler registered = this.topicSubscriber2MessageHandler.get(topicSubscriber);
        MessageHandler effective;
        if (restart) {
            effective = registered;
        } else {
            if (registered != null) {
                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
            }
            // putIfAbsent closes the window between the get() above and the registration.
            if (messageHandler != null
                    && this.topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler) != null) {
                throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
            }
            effective = messageHandler;
        }
        responseHandler.startDelivery(topicSubscriber, effective);
    }

    /**
     * Stops delivery for a subscribed topic subscriber and forgets its registered
     * message handler.
     *
     * @param topicSubscriber the subscriber to stop delivery for
     * @throws PubSubException.ClientNotSubscribedException if there is no active subscription
     */
    @Override
    public void stopDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException {
        SubscribeResponseHandler responseHandler = getSubscribeResponseHandler(topicSubscriber);
        if (responseHandler == null || !responseHandler.hasSubscription(topicSubscriber)) {
            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        this.topicSubscriber2MessageHandler.remove(topicSubscriber);
        responseHandler.stopDelivery(topicSubscriber);
    }

    /**
     * Removes the channel stored for a topic subscriber and closes it
     * asynchronously.
     *
     * <p>The callback is completed successfully right away when no channel is
     * stored or the stored wrapper has no underlying netty channel. Otherwise it is
     * completed from the close future: success on a successful close, a
     * {@link PubSubException.ServiceDownException} failure otherwise.
     *
     * @param topicSubscriber the subscriber whose subscription channel is closed
     * @param callback        notified of the outcome
     * @param obj             context object passed back to the callback
     */
    @Override
    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber, final Callback<PubSubProtocol.ResponseBody> callback, final Object obj) {
        HChannel removed = this.topicSubscriber2Channel.removeChannel(topicSubscriber);
        if (removed == null) {
            logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for {}", topicSubscriber);
            callback.operationFinished(obj, (PubSubProtocol.ResponseBody) null);
            return;
        }
        Channel channel = removed.getChannel();
        if (channel == null) {
            callback.operationFinished(obj, (PubSubProtocol.ResponseBody) null);
            return;
        }
        try {
            HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
        } catch (NoResponseHandlerException e) {
            // Argument order follows the placeholders: "<subscriber>'s channel <channel>".
            logger.warn("No Channel Handler found when closing {}'s channel {}.", topicSubscriber, channel);
        }
        channel.close().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    SimpleHChannelManager.logger.error("Failed to close the subscription channel for {}", topicSubscriber);
                    callback.operationFailed(obj, new PubSubException.ServiceDownException("Failed to close the subscription channel for " + topicSubscriber));
                    return;
                }
                callback.operationFinished(obj, (PubSubProtocol.ResponseBody) null);
            }
        });
    }

    /**
     * Asks the handler of every stored subscription channel to check its pending
     * requests for timeouts. Channels without a handler are skipped.
     */
    @Override
    protected void checkTimeoutRequestsOnSubscriptionChannels() {
        // NOTE(review): presumably guards against being invoked before this class's
        // constructor has assigned the field (e.g. from work started by the
        // superclass constructor) - confirm against AbstractHChannelManager.
        if (this.topicSubscriber2Channel == null) {
            return;
        }
        for (HChannel hChannel : this.topicSubscriber2Channel.getChannels()) {
            try {
                HChannelImpl.getHChannelHandlerFromChannel(hChannel.getChannel()).checkTimeoutRequests();
            } catch (NoResponseHandlerException ignored) {
                // Best effort: a channel without a handler has nothing to time out,
                // so move on to the next channel.
            }
        }
    }

    /** Closes the map of subscription channels. */
    @Override
    protected void closeSubscriptionChannels() {
        this.topicSubscriber2Channel.close();
    }
}
