package org.apache.hedwig.client.netty.impl;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.handlers.MessageConsumeCallback;
import org.apache.hedwig.client.netty.CleanupChannelMap;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.SubscriptionEventEmitter;
import org.apache.hedwig.client.ssl.SslClientContextFactory;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/netty/impl/AbstractHChannelManager.class */
public abstract class AbstractHChannelManager implements HChannelManager {
    private static Logger logger = LoggerFactory.getLogger(AbstractHChannelManager.class);
    private static final Set<ByteString> EMPTY_TOPIC_SET = new HashSet();
    private final ClientConfiguration cfg;
    protected final ChannelFactory socketFactory;
    private final ClientChannelPipelineFactory nonSubscriptionChannelPipelineFactory;
    private SslClientContextFactory sslFactory;
    private final HChannel defaultServerChannel;
    private final MessageConsumeCallback consumeCb;
    private final SubscriptionEventEmitter eventEmitter;
    protected boolean closed = false;
    protected final ReentrantReadWriteLock closedLock = new ReentrantReadWriteLock();
    protected final AtomicLong globalCounter = new AtomicLong();
    protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap();
    protected final ConcurrentMap<InetSocketAddress, Set<ByteString>> host2Topics = new ConcurrentHashMap();
    protected final CleanupChannelMap<InetSocketAddress> host2NonSubscriptionChannels = new CleanupChannelMap<>();
    private final Timer clientTimer = new Timer(true);

    /* loaded from: input_file:org/apache/hedwig/client/netty/impl/AbstractHChannelManager$PubSubRequestTimeoutTask.class */
    class PubSubRequestTimeoutTask extends TimerTask {
        PubSubRequestTimeoutTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            if (AbstractHChannelManager.this.isClosed()) {
                return;
            }
            AbstractHChannelManager.logger.debug("Running the PubSubRequest Timeout Task");
            Iterator<HChannel> it = AbstractHChannelManager.this.host2NonSubscriptionChannels.getChannels().iterator();
            while (it.hasNext()) {
                try {
                    HChannelImpl.getHChannelHandlerFromChannel(it.next().getChannel()).checkTimeoutRequests();
                } catch (NoResponseHandlerException e) {
                }
            }
            AbstractHChannelManager.this.checkTimeoutRequestsOnSubscriptionChannels();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractHChannelManager(ClientConfiguration clientConfiguration, ChannelFactory channelFactory) {
        this.sslFactory = null;
        this.cfg = clientConfiguration;
        this.socketFactory = channelFactory;
        this.nonSubscriptionChannelPipelineFactory = new NonSubscriptionChannelPipelineFactory(clientConfiguration, this);
        this.defaultServerChannel = new DefaultServerChannel(clientConfiguration.getDefaultServerHost(), this);
        if (clientConfiguration.isSSLEnabled()) {
            this.sslFactory = new SslClientContextFactory(clientConfiguration);
        }
        this.consumeCb = new MessageConsumeCallback(clientConfiguration, this);
        this.eventEmitter = new SubscriptionEventEmitter();
        this.clientTimer.schedule(new PubSubRequestTimeoutTask(), 0L, clientConfiguration.getTimeoutThreadRunInterval());
    }

    @Override // org.apache.hedwig.client.netty.HChannelManager
    public SubscriptionEventEmitter getSubscriptionEventEmitter() {
        return this.eventEmitter;
    }

    public MessageConsumeCallback getConsumeCallback() {
        return this.consumeCb;
    }

    public SslClientContextFactory getSslFactory() {
        return this.sslFactory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ChannelFactory getChannelFactory() {
        return this.socketFactory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ClientChannelPipelineFactory getNonSubscriptionChannelPipelineFactory() {
        return this.nonSubscriptionChannelPipelineFactory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory();

    @Override // org.apache.hedwig.client.netty.HChannelManager
    public void schedule(TimerTask timerTask, long j) {
        this.closedLock.readLock().lock();
        try {
            if (this.closed) {
                logger.warn("Task {} is not scheduled due to the channel manager is closed.", timerTask);
                this.closedLock.readLock().unlock();
            } else {
                this.clientTimer.schedule(timerTask, j);
                this.closedLock.readLock().unlock();
            }
        } catch (Throwable th) {
            this.closedLock.readLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.hedwig.client.netty.HChannelManager
    public void submitOpAfterDelay(final PubSubData pubSubData, final long j) {
        this.closedLock.readLock().lock();
        try {
            if (this.closed) {
                pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.ServiceDownException("Client has been closed."));
                this.closedLock.readLock().unlock();
            } else {
                this.clientTimer.schedule(new TimerTask() { // from class: org.apache.hedwig.client.netty.impl.AbstractHChannelManager.1
                    @Override // java.util.TimerTask, java.lang.Runnable
                    public void run() {
                        AbstractHChannelManager.logger.debug("Submit request {} in {} ms later.", VarArgs.va(pubSubData, Long.valueOf(j)));
                        AbstractHChannelManager.this.submitOp(pubSubData);
                    }
                }, j);
                this.closedLock.readLock().unlock();
            }
        } catch (Throwable th) {
            this.closedLock.readLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.hedwig.client.netty.HChannelManager
    public void submitOp(PubSubData pubSubData) {
        HChannel nonSubscriptionChannelByTopic = (PubSubProtocol.OperationType.PUBLISH.equals(pubSubData.operationType) || PubSubProtocol.OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) ? getNonSubscriptionChannelByTopic(pubSubData.topic) : getSubscriptionChannelByTopicSubscriber(new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId));
        if (null == nonSubscriptionChannelByTopic) {
            nonSubscriptionChannelByTopic = this.defaultServerChannel;
        }
        nonSubscriptionChannelByTopic.submitOp(pubSubData);
    }

    @Override // org.apache.hedwig.client.netty.HChannelManager
    public void redirectToHost(PubSubData pubSubData, InetSocketAddress inetSocketAddress) {
        HChannel nonSubscriptionChannel;
        logger.debug("Submit operation {} to host {}.", VarArgs.va(pubSubData, inetSocketAddress));
        if (PubSubProtocol.OperationType.PUBLISH.equals(pubSubData.operationType) || PubSubProtocol.OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) {
            nonSubscriptionChannel = getNonSubscriptionChannel(inetSocketAddress);
            if (null == nonSubscriptionChannel) {
                nonSubscriptionChannel = createAndStoreNonSubscriptionChannel(inetSocketAddress);
            }
        } else {
            nonSubscriptionChannel = getSubscriptionChannel(inetSocketAddress);
            if (null == nonSubscriptionChannel) {
                nonSubscriptionChannel = createAndStoreSubscriptionChannel(inetSocketAddress);
            }
        }
        if (null == nonSubscriptionChannel) {
            nonSubscriptionChannel = this.defaultServerChannel;
        }
        nonSubscriptionChannel.submitOp(pubSubData);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void submitOpThruChannel(PubSubData pubSubData, Channel channel) {
        logger.debug("Submit operation {} to thru channel {}.", VarArgs.va(pubSubData, channel));
        ((PubSubProtocol.OperationType.PUBLISH.equals(pubSubData.operationType) || PubSubProtocol.OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) ? createAndStoreNonSubscriptionChannel(channel) : createAndStoreSubscriptionChannel(channel)).submitOp(pubSubData);
    }

    @Override // org.apache.hedwig.client.netty.HChannelManager
    public void submitOpToDefaultServer(PubSubData pubSubData) {
        logger.debug("Submit operation {} to default server {}.", VarArgs.va(pubSubData, this.defaultServerChannel));
        this.defaultServerChannel.submitOp(pubSubData);
    }

    private HChannel createAndStoreNonSubscriptionChannel(Channel channel) {
        InetSocketAddress hostFromChannel = NetUtils.getHostFromChannel(channel);
        return storeNonSubscriptionChannel(hostFromChannel, new HChannelImpl(hostFromChannel, channel, this, getNonSubscriptionChannelPipelineFactory()));
    }

    private HChannel createAndStoreNonSubscriptionChannel(InetSocketAddress inetSocketAddress) {
        return storeNonSubscriptionChannel(inetSocketAddress, new HChannelImpl(inetSocketAddress, this, getNonSubscriptionChannelPipelineFactory()));
    }

    private HChannel storeNonSubscriptionChannel(InetSocketAddress inetSocketAddress, HChannel hChannel) {
        return this.host2NonSubscriptionChannels.addChannel(inetSocketAddress, hChannel);
    }

    private HChannel getNonSubscriptionChannel(InetSocketAddress inetSocketAddress) {
        return this.host2NonSubscriptionChannels.getChannel(inetSocketAddress);
    }

    private HChannel getNonSubscriptionChannelByTopic(ByteString byteString) {
        InetSocketAddress inetSocketAddress = this.topic2Host.get(byteString);
        if (null == inetSocketAddress) {
            return null;
        }
        HChannel nonSubscriptionChannel = getNonSubscriptionChannel(inetSocketAddress);
        if (null == nonSubscriptionChannel) {
            nonSubscriptionChannel = createAndStoreNonSubscriptionChannel(inetSocketAddress);
        }
        return nonSubscriptionChannel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onNonSubscriptionChannelDisconnected(InetSocketAddress inetSocketAddress, Channel channel) {
        Channel channel2;
        HChannel channel3 = this.host2NonSubscriptionChannels.getChannel(inetSocketAddress);
        if (null == channel3 || null == (channel2 = channel3.getChannel()) || !channel2.equals(channel)) {
            return;
        }
        logger.info("NonSubscription Channel {} to {} disconnected.", VarArgs.va(channel, inetSocketAddress));
        if (this.host2NonSubscriptionChannels.removeChannel(inetSocketAddress, channel3)) {
            clearAllTopicsForHost(inetSocketAddress);
        }
    }

    protected abstract HChannel createAndStoreSubscriptionChannel(Channel channel);

    protected abstract HChannel createAndStoreSubscriptionChannel(InetSocketAddress inetSocketAddress);

    protected abstract HChannel getSubscriptionChannel(InetSocketAddress inetSocketAddress);

    protected abstract HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber topicSubscriber);

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void onSubscriptionChannelDisconnected(InetSocketAddress inetSocketAddress, Channel channel);

    private void sendConsumeRequest(final TopicSubscriber topicSubscriber, final PubSubProtocol.MessageSeqId messageSeqId, final Channel channel) {
        PubSubProtocol.PubSubRequest.Builder buildConsumeRequest = NetUtils.buildConsumeRequest(nextTxnId(), topicSubscriber, messageSeqId);
        logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}", VarArgs.va(NetUtils.getHostFromChannel(channel), messageSeqId, topicSubscriber));
        channel.write(buildConsumeRequest.build()).addListener(new ChannelFutureListener() { // from class: org.apache.hedwig.client.netty.impl.AbstractHChannelManager.2
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    return;
                }
                AbstractHChannelManager.logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}", VarArgs.va(NetUtils.getHostFromChannel(channel), messageSeqId, topicSubscriber));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void storeTopic2HostMapping(ByteString byteString, InetSocketAddress inetSocketAddress) {
        InetSocketAddress putIfAbsent = this.topic2Host.putIfAbsent(byteString, inetSocketAddress);
        if (null == putIfAbsent || !putIfAbsent.equals(inetSocketAddress)) {
            if (null == putIfAbsent) {
                logger.debug("Storing info for topic: {}, host: {}.", VarArgs.va(byteString.toStringUtf8(), inetSocketAddress));
            } else if (!this.topic2Host.replace(byteString, putIfAbsent, inetSocketAddress)) {
                logger.warn("Ownership of topic: {} has been changed from {} to {} when storeing host: {}", VarArgs.va(byteString.toStringUtf8(), putIfAbsent, this.topic2Host.get(byteString), inetSocketAddress));
                return;
            } else {
                logger.debug("Storing info for topic: {}, old host: {}, new host: {}.", VarArgs.va(byteString.toStringUtf8(), putIfAbsent, inetSocketAddress));
                clearHostForTopic(byteString, putIfAbsent);
            }
            Set<ByteString> set = this.host2Topics.get(inetSocketAddress);
            if (null == set) {
                HashSet hashSet = new HashSet();
                set = this.host2Topics.putIfAbsent(inetSocketAddress, hashSet);
                if (null == set) {
                    set = hashSet;
                }
            }
            synchronized (set) {
                if (inetSocketAddress.equals(this.topic2Host.get(byteString))) {
                    set.add(byteString);
                }
            }
        }
    }

    protected void clearAllTopicsForHost(InetSocketAddress inetSocketAddress) {
        logger.debug("Clearing all topics for host: {}", inetSocketAddress);
        Set<ByteString> set = this.host2Topics.get(inetSocketAddress);
        if (null != set) {
            synchronized (set) {
                for (ByteString byteString : set) {
                    logger.debug("Removing mapping for topic: {} from host: {}.", VarArgs.va(byteString.toStringUtf8(), inetSocketAddress));
                    this.topic2Host.remove(byteString, inetSocketAddress);
                }
            }
            this.host2Topics.remove(inetSocketAddress, set);
        }
    }

    public void clearHostForTopic(ByteString byteString, InetSocketAddress inetSocketAddress) {
        boolean remove;
        logger.debug("Clearing topic: {} from host: {}.", VarArgs.va(byteString.toStringUtf8(), inetSocketAddress));
        if (this.topic2Host.remove(byteString, inetSocketAddress)) {
            logger.debug("Removed topic to host mapping for topic: {} and host: {}.", VarArgs.va(byteString.toStringUtf8(), inetSocketAddress));
        }
        Set<ByteString> set = this.host2Topics.get(inetSocketAddress);
        if (null != set) {
            synchronized (set) {
                remove = set.remove(byteString);
            }
            if (remove) {
                logger.debug("Removed topic: {} from host: {}.", byteString.toStringUtf8(), inetSocketAddress);
                if (set.isEmpty()) {
                    this.host2Topics.remove(inetSocketAddress, EMPTY_TOPIC_SET);
                }
            }
        }
    }

    @Override // org.apache.hedwig.client.netty.HChannelManager
    public long nextTxnId() {
        return this.globalCounter.incrementAndGet();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void restartDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException;

    protected abstract void checkTimeoutRequestsOnSubscriptionChannels();

    @Override // org.apache.hedwig.client.netty.HChannelManager
    public boolean isClosed() {
        this.closedLock.readLock().lock();
        try {
            boolean z = this.closed;
            this.closedLock.readLock().unlock();
            return z;
        } catch (Throwable th) {
            this.closedLock.readLock().unlock();
            throw th;
        }
    }

    protected abstract void closeSubscriptionChannels();

    @Override // org.apache.hedwig.client.netty.HChannelManager
    public void close() {
        logger.info("Shutting down the channels manager.");
        this.closedLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.closedLock.writeLock().unlock();
            this.clientTimer.cancel();
            this.host2NonSubscriptionChannels.close();
            closeSubscriptionChannels();
            this.topic2Host.clear();
            this.host2Topics.clear();
        } finally {
            this.closedLock.writeLock().unlock();
        }
    }
}
