package org.apache.hedwig.client.netty.impl;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.handlers.AbstractResponseHandler;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty channel handler used by the Hedwig client to process
 * {@link PubSubProtocol.PubSubResponse} messages arriving on a channel.
 *
 * <p>Requests that are waiting for a response are tracked in a map keyed by
 * transaction id (see {@link #addTxn(long, PubSubData)}). An entry leaves the
 * map through exactly one of three paths: a matching response arrives
 * ({@link #messageReceived}), the request times out
 * ({@link #checkTimeoutRequests()}), or the channel is disconnected
 * ({@link #channelDisconnected}). Each path removes the entry atomically
 * before completing the request, so a request's callback is not failed by one
 * path after another path has already taken it.
 *
 * <p>A handler whose handler map contains an entry for
 * {@link PubSubProtocol.OperationType#SUBSCRIBE} is treated as serving a
 * subscription channel; otherwise delivered messages and subscription events
 * are rejected with an error log.
 */
@ChannelPipelineCoverage("all")
public class HChannelHandler extends SimpleChannelHandler {
    private static final Logger logger = LoggerFactory.getLogger(HChannelHandler.class);

    /** Requests awaiting a response, keyed by transaction id. */
    private final ConcurrentMap<Long, PubSubData> txn2PubSubData = new ConcurrentHashMap<Long, PubSubData>();

    /** Set by {@link #closeExplicitly()}; suppresses disconnect handling and the SSL handshake. */
    private volatile boolean channelClosedExplicitly = false;

    private final AbstractHChannelManager channelManager;
    private final ClientConfiguration cfg;

    /** Response handlers looked up by the operation type of the originating request. */
    private final Map<PubSubProtocol.OperationType, AbstractResponseHandler> handlers;

    /** The SUBSCRIBE entry of {@link #handlers}, or {@code null} if there is none. */
    private final SubscribeResponseHandler subHandler;

    /**
     * Creates a handler.
     *
     * @param clientConfiguration client configuration (read for the SSL flag and the ack timeout)
     * @param abstractHChannelManager channel manager notified of disconnects and topic ownership
     * @param map response handlers keyed by operation type; its SUBSCRIBE entry, if any,
     *            becomes the subscribe response handler
     */
    public HChannelHandler(ClientConfiguration clientConfiguration, AbstractHChannelManager abstractHChannelManager, Map<PubSubProtocol.OperationType, AbstractResponseHandler> map) {
        this.cfg = clientConfiguration;
        this.channelManager = abstractHChannelManager;
        this.handlers = map;
        this.subHandler = (SubscribeResponseHandler) map.get(PubSubProtocol.OperationType.SUBSCRIBE);
    }

    /**
     * @return the subscribe response handler, or {@code null} when no SUBSCRIBE handler was supplied
     */
    public SubscribeResponseHandler getSubscribeResponseHandler() {
        return this.subHandler;
    }

    /**
     * Stops tracking a pending request without invoking its callback.
     *
     * @param j transaction id of the request
     */
    public void removeTxn(long j) {
        this.txn2PubSubData.remove(Long.valueOf(j));
    }

    /**
     * Starts tracking a pending request.
     *
     * @param j transaction id of the request
     * @param pubSubData request data to hand to the response handler or to fail later
     */
    public void addTxn(long j, PubSubData pubSubData) {
        this.txn2PubSubData.put(Long.valueOf(j), pubSubData);
    }

    /**
     * Dispatches an incoming message.
     *
     * <p>Anything that is not a {@code PubSubResponse} is passed upstream. Delivered
     * messages and subscription events go to the subscribe handler. Every other
     * response is matched to its pending request by transaction id and passed to the
     * handler registered for that request's operation type.
     */
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        if (!(messageEvent.getMessage() instanceof PubSubProtocol.PubSubResponse)) {
            channelHandlerContext.sendUpstream(messageEvent);
            return;
        }
        PubSubProtocol.PubSubResponse response = (PubSubProtocol.PubSubResponse) messageEvent.getMessage();
        logger.debug("Response received from host: {}, response: {}.", VarArgs.va(NetUtils.getHostFromChannel(channelHandlerContext.getChannel()), response));

        // A response carrying a message is a delivery for a subscription, not a reply to a request.
        if (response.hasMessage()) {
            if (null == this.subHandler) {
                logger.error("Received message from a non-subscription channel : {}", response);
            } else {
                this.subHandler.handleSubscribeMessage(response);
            }
            return;
        }

        // Subscription events are likewise routed to the subscribe handler only.
        if (response.hasResponseBody()) {
            PubSubProtocol.ResponseBody responseBody = response.getResponseBody();
            if (responseBody.hasSubscriptionEvent()) {
                if (null == this.subHandler) {
                    logger.error("Received subscription event from a non-subscription channel : {}", response);
                    return;
                }
                PubSubProtocol.SubscriptionEventResponse subscriptionEvent = responseBody.getSubscriptionEvent();
                logger.debug("Received subscription event {} for (topic:{}, subscriber:{}).", VarArgs.va(subscriptionEvent.getEvent(), response.getTopic(), response.getSubscriberId()));
                this.subHandler.handleSubscriptionEvent(response.getTopic(), response.getSubscriberId(), subscriptionEvent.getEvent());
                return;
            }
        }

        // Removing the entry here means the timeout and disconnect paths can no longer fail it.
        PubSubData pubSubData = this.txn2PubSubData.remove(Long.valueOf(response.getTxnId()));
        if (pubSubData == null) {
            logger.error("PubSub Data was not found for PubSubResponse: {}", response);
            return;
        }

        // Record the responding host for the topic unless it reported it is not responsible for it.
        if (!response.getStatusCode().equals(PubSubProtocol.StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) {
            this.channelManager.storeTopic2HostMapping(pubSubData.topic, NetUtils.getHostFromChannel(channelHandlerContext.getChannel()));
        }
        logger.debug("Handling a {} response: {}, pubSubData: {}, host: {}.", VarArgs.va(pubSubData.operationType, response, pubSubData, channelHandlerContext.getChannel()));

        AbstractResponseHandler responseHandler = this.handlers.get(pubSubData.operationType);
        if (null != responseHandler) {
            responseHandler.handleResponse(response, pubSubData, channelHandlerContext.getChannel());
        } else {
            logger.error("Response received from server is for an unhandled operation {}, txnId: {}.", VarArgs.va(pubSubData.operationType, Long.valueOf(response.getTxnId())));
            pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.UnexpectedConditionException("Can't find response handler for operation " + pubSubData.operationType));
        }
    }

    /**
     * Fails every pending request whose write time is older than the configured
     * server ack response timeout.
     */
    public void checkTimeoutRequests() {
        long curTime = System.currentTimeMillis();
        long timeoutInterval = this.cfg.getServerAckResponseTimeout();
        for (PubSubData pending : this.txn2PubSubData.values()) {
            checkTimeoutRequest(pending, curTime, timeoutInterval);
        }
    }

    /**
     * Fails a single pending request if it has timed out and is still pending.
     *
     * @param pubSubData the pending request
     * @param curTime current time in milliseconds
     * @param timeoutInterval timeout in milliseconds, measured from the request's write time
     */
    private void checkTimeoutRequest(PubSubData pubSubData, long curTime, long timeoutInterval) {
        if (curTime <= pubSubData.requestWriteTime + timeoutInterval) {
            return;
        }
        // Only fail the request if this call is the one that removes it. If the mapping
        // is already gone, another path (response or disconnect) has taken the request
        // and failing it here would complete it a second time.
        if (!this.txn2PubSubData.remove(Long.valueOf(pubSubData.txnId), pubSubData)) {
            return;
        }
        logger.error("Current PubSubRequest has timed out for pubSubData: {}", pubSubData);
        pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.UncertainStateException("Server ack response never received so PubSubRequest has timed out!"));
    }

    /**
     * Notifies the channel manager of an unexpected disconnect and fails all
     * requests that were still pending on the channel.
     *
     * <p>Does nothing when the channel was closed explicitly, the channel manager is
     * closed, or the remote host cannot be determined from the channel.
     */
    public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        if (this.channelClosedExplicitly || this.channelManager.isClosed()) {
            return;
        }
        InetSocketAddress host = NetUtils.getHostFromChannel(channelHandlerContext.getChannel());
        if (host == null) {
            return;
        }
        logger.info("Channel {} was disconnected to host {}.", VarArgs.va(channelHandlerContext.getChannel(), host));
        if (null == this.subHandler) {
            this.channelManager.onNonSubscriptionChannelDisconnected(host, channelHandlerContext.getChannel());
        } else {
            this.channelManager.onSubscriptionChannelDisconnected(host, channelHandlerContext.getChannel());
        }

        // Remove entries one at a time instead of iterating and then clearing the map:
        // a bulk clear() would silently discard any request added after the iteration,
        // and iterating without removing could fail a request that another path has
        // already taken.
        for (Long txnId : this.txn2PubSubData.keySet()) {
            PubSubData pending = this.txn2PubSubData.remove(txnId);
            if (pending == null) {
                continue;
            }
            logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: {}", pending);
            pending.getCallback().operationFailed(pending.context, new PubSubException.UncertainStateException("Server ack response never received before server connection disconnected!"));
        }
    }

    /**
     * Starts the SSL handshake on a newly connected channel when SSL is enabled and
     * neither the channel nor the channel manager has been closed.
     */
    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        if (!this.cfg.isSSLEnabled() || this.channelClosedExplicitly || this.channelManager.isClosed()) {
            return;
        }
        logger.debug("Initiating the SSL handshake");
        channelHandlerContext.getPipeline().get(SslHandler.class).handshake(channelStateEvent.getChannel());
    }

    /**
     * Logs the exception and closes the channel it occurred on.
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        logger.error("Exception caught on client channel", exceptionEvent.getCause());
        exceptionEvent.getChannel().close();
    }

    /**
     * Marks the channel as closed on purpose, so that a subsequent disconnect is
     * not reported to the channel manager and pending requests are not failed by
     * {@link #channelDisconnected}.
     */
    public void closeExplicitly() {
        this.channelClosedExplicitly = true;
    }
}
