package org.apache.hedwig.client.netty.impl.multiplex;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.CleanupChannelMap;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.impl.HChannelImpl;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.class */
/**
 * Channel manager that keeps one subscription channel per host address.
 *
 * <p>A topic subscriber's channel is found by mapping its topic to a host through
 * {@code topic2Host} and then looking that host up in {@link #subscriptionChannels}, so all
 * subscribers whose topics resolve to the same host use the same channel. Message handlers
 * registered through {@link #startDelivery(TopicSubscriber, MessageHandler)} are remembered in
 * {@link #topicSubscriber2MessageHandler} so that {@link #restartDelivery(TopicSubscriber)} can
 * reuse them.
 */
public class MultiplexHChannelManager extends AbstractHChannelManager {
    static final Logger logger = LoggerFactory.getLogger(MultiplexHChannelManager.class);

    /** Subscription channels keyed by the host address they are connected to. */
    protected final CleanupChannelMap<InetSocketAddress> subscriptionChannels;

    /** Channels keyed by topic subscriber; maintained by the store/remove methods below. */
    protected final CleanupChannelMap<TopicSubscriber> sub2Channels;

    /** Message handlers registered by {@code startDelivery} and dropped by {@code stopDelivery}. */
    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler;

    /** Pipeline factory handed to every subscription channel created by this manager. */
    private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;

    /**
     * Creates the manager with empty channel maps and its own subscription pipeline factory.
     *
     * @param clientConfiguration client configuration, also passed to the pipeline factory
     * @param channelFactory netty channel factory passed to the superclass
     */
    public MultiplexHChannelManager(ClientConfiguration clientConfiguration, ChannelFactory channelFactory) {
        super(clientConfiguration, channelFactory);
        // Typed diamond instead of a raw ConcurrentHashMap: avoids an unchecked conversion.
        this.topicSubscriber2MessageHandler = new ConcurrentHashMap<>();
        this.subscriptionChannels = new CleanupChannelMap<>();
        this.sub2Channels = new CleanupChannelMap<>();
        this.subscriptionChannelPipelineFactory =
                new MultiplexSubscriptionChannelPipelineFactory(clientConfiguration, this);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the pipeline factory used for subscription channels
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractHChannelManager
    public ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
        return this.subscriptionChannelPipelineFactory;
    }

    /**
     * Wraps an existing netty channel in an {@link HChannelImpl} and stores it under the host
     * derived from that channel.
     *
     * @param channel netty channel to wrap
     * @return whatever {@code subscriptionChannels.addChannel} returns for that host
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractHChannelManager
    protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
        HChannel wrapped = new HChannelImpl(host, channel, this, getSubscriptionChannelPipelineFactory());
        return storeSubscriptionChannel(host, wrapped);
    }

    /**
     * Creates an {@link HChannelImpl} for the given host (without a netty channel argument) and
     * stores it under that host.
     *
     * @param inetSocketAddress host the channel is for
     * @return whatever {@code subscriptionChannels.addChannel} returns for that host
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractHChannelManager
    protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress inetSocketAddress) {
        HChannel created = new HChannelImpl(inetSocketAddress, this, getSubscriptionChannelPipelineFactory());
        return storeSubscriptionChannel(inetSocketAddress, created);
    }

    /** Adds {@code hChannel} to the per-host map and returns the map's result. */
    private HChannel storeSubscriptionChannel(InetSocketAddress inetSocketAddress, HChannel hChannel) {
        return this.subscriptionChannels.addChannel(inetSocketAddress, hChannel);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @param inetSocketAddress host to look up
     * @return the subscription channel stored for that host, as returned by the channel map
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractHChannelManager
    public HChannel getSubscriptionChannel(InetSocketAddress inetSocketAddress) {
        return this.subscriptionChannels.getChannel(inetSocketAddress);
    }

    /**
     * Looks up the subscription channel for a topic subscriber without creating one.
     *
     * @param topicSubscriber subscriber whose topic is resolved to a host
     * @return the stored channel for the topic's host, or {@code null} when the topic has no
     *         host in {@code topic2Host}
     */
    protected HChannel getSubscriptionChannel(TopicSubscriber topicSubscriber) {
        InetSocketAddress host = this.topic2Host.get(topicSubscriber.getTopic());
        return null == host ? null : getSubscriptionChannel(host);
    }

    /**
     * Looks up the subscription channel for a topic subscriber, creating and storing one for
     * the topic's host when the lookup yields {@code null}.
     *
     * @param topicSubscriber subscriber whose topic is resolved to a host
     * @return the channel for the topic's host, or {@code null} when the topic has no host in
     *         {@code topic2Host}
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractHChannelManager
    protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber topicSubscriber) {
        InetSocketAddress host = this.topic2Host.get(topicSubscriber.getTopic());
        if (null == host) {
            return null;
        }
        HChannel existing = getSubscriptionChannel(host);
        return null != existing ? existing : createAndStoreSubscriptionChannel(host);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a disconnect of a subscription channel.
     *
     * <p>Nothing happens unless the channel currently stored for the host wraps exactly the
     * netty channel that disconnected. In that case the stored channel is removed and, if the
     * removal succeeded, the channel's subscribe response handler is notified.
     *
     * @param inetSocketAddress host the channel was connected to
     * @param channel netty channel that disconnected
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractHChannelManager
    public void onSubscriptionChannelDisconnected(InetSocketAddress inetSocketAddress, Channel channel) {
        HChannel stored = this.subscriptionChannels.getChannel(inetSocketAddress);
        if (null == stored) {
            return;
        }
        Channel storedNettyChannel = stored.getChannel();
        // Ignore the event when the stored entry does not wrap the channel that disconnected.
        if (null == storedNettyChannel || !storedNettyChannel.equals(channel)) {
            return;
        }
        logger.info("Subscription Channel {} disconnected from {}.", VarArgs.va(channel, inetSocketAddress));
        if (!this.subscriptionChannels.removeChannel(inetSocketAddress, stored)) {
            return;
        }
        try {
            HChannelImpl.getHChannelHandlerFromChannel(channel)
                    .getSubscribeResponseHandler()
                    .onChannelDisconnected(inetSocketAddress, channel);
        } catch (NoResponseHandlerException e) {
            logger.warn("No Channel Handler found for channel {} when it disconnected.", channel);
        }
    }

    /**
     * Finds the subscribe response handler attached to a topic subscriber's channel.
     *
     * @param topicSubscriber subscriber to look up
     * @return the handler, or {@code null} when there is no stored channel, the stored channel
     *         has no netty channel, or the netty channel has no channel handler
     */
    @Override // org.apache.hedwig.client.netty.HChannelManager
    public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
        HChannel hChannel = getSubscriptionChannel(topicSubscriber);
        if (null == hChannel) {
            return null;
        }
        Channel nettyChannel = hChannel.getChannel();
        if (null == nettyChannel) {
            return null;
        }
        try {
            return HChannelImpl.getHChannelHandlerFromChannel(nettyChannel).getSubscribeResponseHandler();
        } catch (NoResponseHandlerException e) {
            logger.warn("No Channel Handler found for channel {}, topic subscriber {}.", nettyChannel, topicSubscriber);
            return null;
        }
    }

    /**
     * Starts delivery for a subscriber with a new message handler.
     *
     * @param topicSubscriber subscriber to deliver to
     * @param messageHandler handler to register; a {@code null} handler is not registered
     * @throws PubSubException.ClientNotSubscribedException if no subscription is known
     * @throws AlreadyStartDeliveryException if a handler is already registered for the subscriber
     */
    @Override // org.apache.hedwig.client.netty.HChannelManager
    public void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        startDelivery(topicSubscriber, messageHandler, false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts delivery again using whatever handler is currently registered for the subscriber
     * (which may be {@code null}).
     *
     * @param topicSubscriber subscriber to deliver to
     * @throws PubSubException.ClientNotSubscribedException if no subscription is known
     * @throws AlreadyStartDeliveryException declared for the shared implementation
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractHChannelManager
    public void restartDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        startDelivery(topicSubscriber, null, true);
    }

    /**
     * Shared implementation of start and restart.
     *
     * @param topicSubscriber subscriber to deliver to
     * @param messageHandler handler to register; ignored when {@code restart} is true
     * @param restart when true, reuse the registered handler instead of registering a new one
     */
    private void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler, boolean restart) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        SubscribeResponseHandler responseHandler = requireSubscribedHandler(topicSubscriber);
        MessageHandler registered = this.topicSubscriber2MessageHandler.get(topicSubscriber);
        MessageHandler effective;
        if (restart) {
            effective = registered;
        } else {
            if (null != registered) {
                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
            }
            // putIfAbsent closes the window between the get above and the registration here:
            // a non-null result means another caller registered a handler in between.
            if (messageHandler != null
                    && null != this.topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
                throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
            }
            effective = messageHandler;
        }
        responseHandler.startDelivery(topicSubscriber, effective);
    }

    /**
     * Stops delivery for a subscriber and forgets its registered message handler.
     *
     * @param topicSubscriber subscriber to stop delivering to
     * @throws PubSubException.ClientNotSubscribedException if no subscription is known
     */
    @Override // org.apache.hedwig.client.netty.HChannelManager
    public void stopDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException {
        SubscribeResponseHandler responseHandler = requireSubscribedHandler(topicSubscriber);
        this.topicSubscriber2MessageHandler.remove(topicSubscriber);
        responseHandler.stopDelivery(topicSubscriber);
    }

    /**
     * Returns the subscribe response handler for a subscriber, or logs and throws when the
     * handler is missing or does not report a subscription for it.
     */
    private SubscribeResponseHandler requireSubscribedHandler(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException {
        SubscribeResponseHandler responseHandler = getSubscribeResponseHandler(topicSubscriber);
        if (null == responseHandler || !responseHandler.hasSubscription(topicSubscriber)) {
            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        return responseHandler;
    }

    /**
     * Asks the subscriber's response handler to close the subscription. When no handler with
     * that subscription is found, a warning is logged and the callback is completed right away
     * with a {@code null} result.
     *
     * @param topicSubscriber subscriber whose subscription should be closed
     * @param callback callback to complete
     * @param obj context object passed through to the callback
     */
    @Override // org.apache.hedwig.client.netty.HChannelManager
    public void asyncCloseSubscription(TopicSubscriber topicSubscriber, Callback<PubSubProtocol.ResponseBody> callback, Object obj) {
        SubscribeResponseHandler responseHandler = getSubscribeResponseHandler(topicSubscriber);
        if (null == responseHandler || !responseHandler.hasSubscription(topicSubscriber)) {
            logger.warn("Trying to close a subscription when we don't have a subscription channel cached for {}", topicSubscriber);
            callback.operationFinished(obj, (PubSubProtocol.ResponseBody) null);
            return;
        }
        responseHandler.asyncCloseSubscription(topicSubscriber, callback, obj);
    }

    /**
     * Asks the channel handler of every stored subscription channel to check its timed-out
     * requests.
     */
    @Override // org.apache.hedwig.client.netty.impl.AbstractHChannelManager
    protected void checkTimeoutRequestsOnSubscriptionChannels() {
        // The field is assigned after super(...) returns, so it is still null if this method
        // runs during superclass construction.
        // NOTE(review): presumably a timeout task started by the superclass - confirm.
        if (null == this.subscriptionChannels) {
            return;
        }
        for (HChannel hChannel : this.subscriptionChannels.getChannels()) {
            try {
                HChannelImpl.getHChannelHandlerFromChannel(hChannel.getChannel()).checkTimeoutRequests();
            } catch (NoResponseHandlerException ignored) {
                // Deliberately ignored: without a channel handler there is nothing to ask, so
                // skip this channel and keep checking the remaining ones.
            }
        }
    }

    /** Closes the per-host subscription channel map. */
    @Override // org.apache.hedwig.client.netty.impl.AbstractHChannelManager
    protected void closeSubscriptionChannels() {
        this.subscriptionChannels.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Replaces the channel recorded for a topic subscriber, expecting the current entry to be
     * the request's original channel for resubscribe.
     *
     * @param topicSubscriber subscriber whose entry is replaced
     * @param pubSubData request supplying the expected original channel
     * @param hChannel new channel to record
     * @return the replace result paired with {@code hChannel} on success, or with {@code null}
     *         when the replace did not happen
     */
    public Either<Boolean, HChannel> storeSubscriptionChannel(TopicSubscriber topicSubscriber, PubSubData pubSubData, HChannel hChannel) {
        boolean replaced = this.sub2Channels.replaceChannel(
                topicSubscriber, pubSubData.getOriginalChannelForResubscribe(), hChannel);
        return Either.of(replaced, replaced ? hChannel : null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Removes the given channel from the per-subscriber map.
     *
     * @param topicSubscriber subscriber whose entry is removed
     * @param hChannel channel expected to be recorded for the subscriber
     * @return the result of the underlying {@code removeChannel} call
     */
    public boolean removeSubscriptionChannel(TopicSubscriber topicSubscriber, HChannel hChannel) {
        return this.sub2Channels.removeChannel(topicSubscriber, hChannel);
    }
}
