package org.apache.hedwig.client.netty.impl;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/netty/impl/HChannelImpl.class */
public class HChannelImpl implements HChannel {
    private static Logger logger = LoggerFactory.getLogger(HChannelImpl.class);
    InetSocketAddress host;
    final AbstractHChannelManager channelManager;
    final ClientChannelPipelineFactory pipelineFactory;
    volatile Channel channel;
    volatile State state;
    volatile boolean closed;
    Queue<PubSubData> pendingOps;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/client/netty/impl/HChannelImpl$State.class */
    public enum State {
        DISCONNECTED,
        CONNECTING,
        CONNECTED
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HChannelImpl(InetSocketAddress inetSocketAddress, AbstractHChannelManager abstractHChannelManager) {
        this(inetSocketAddress, abstractHChannelManager, null);
    }

    public HChannelImpl(InetSocketAddress inetSocketAddress, AbstractHChannelManager abstractHChannelManager, ClientChannelPipelineFactory clientChannelPipelineFactory) {
        this(inetSocketAddress, null, abstractHChannelManager, clientChannelPipelineFactory);
        this.state = State.DISCONNECTED;
    }

    public HChannelImpl(InetSocketAddress inetSocketAddress, Channel channel, AbstractHChannelManager abstractHChannelManager, ClientChannelPipelineFactory clientChannelPipelineFactory) {
        this.closed = false;
        this.pendingOps = new ArrayDeque();
        this.host = inetSocketAddress;
        this.channel = channel;
        this.channelManager = abstractHChannelManager;
        this.pipelineFactory = clientChannelPipelineFactory;
        this.state = State.CONNECTED;
    }

    @Override // org.apache.hedwig.client.netty.HChannel
    public void submitOp(PubSubData pubSubData) {
        boolean z = false;
        if (null == this.channel || State.CONNECTED != this.state) {
            synchronized (this) {
                if (null == this.channel || State.CONNECTED != this.state) {
                    this.pendingOps.add(pubSubData);
                } else {
                    z = true;
                }
            }
            if (!z) {
                connect();
            }
        } else {
            z = true;
        }
        if (z) {
            executeOpAfterConnected(pubSubData);
        }
    }

    private void executeOpAfterConnected(PubSubData pubSubData) {
        writePubSubRequest(pubSubData, NetUtils.buildPubSubRequest(this.channelManager.nextTxnId(), pubSubData).build());
    }

    @Override // org.apache.hedwig.client.netty.HChannel
    public Channel getChannel() {
        return this.channel;
    }

    private void writePubSubRequest(PubSubData pubSubData, PubSubProtocol.PubSubRequest pubSubRequest) {
        if (this.closed || null == this.channel || State.CONNECTED != this.state) {
            retryOrFailOp(pubSubData);
            return;
        }
        try {
            getHChannelHandlerFromChannel(this.channel).addTxn(pubSubData.txnId, pubSubData);
            logger.debug("Writing a {} request to host: {} for pubSubData: {}.", VarArgs.va(pubSubData.operationType, this.host, pubSubData));
            this.channel.write(pubSubRequest).addListener(new WriteCallback(pubSubData, this.channelManager));
        } catch (NoResponseHandlerException e) {
            logger.warn("No Channel Handler found for channel {} when writing request. It might already disconnect.", this.channel);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void retryOrFailOp(PubSubData pubSubData) {
        ByteString copyFromUtf8 = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(this.host));
        if (pubSubData.connectFailedServers != null && pubSubData.connectFailedServers.contains(copyFromUtf8)) {
            logger.error("Error connecting to host {} more than once so fail the request: {}", VarArgs.va(this.host, pubSubData));
            pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.CouldNotConnectException("Could not connect to host: " + this.host));
            return;
        }
        logger.error("Retry to connect to default hub server again for pubSubData: {}", pubSubData);
        if (pubSubData.connectFailedServers == null) {
            pubSubData.connectFailedServers = new LinkedList();
        }
        pubSubData.connectFailedServers.add(copyFromUtf8);
        this.channelManager.submitOpToDefaultServer(pubSubData);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onChannelConnected(ChannelFuture channelFuture) {
        synchronized (this) {
            if (this.closed) {
                channelFuture.getChannel().close();
                return;
            }
            this.state = State.CONNECTED;
            this.channel = channelFuture.getChannel();
            this.host = NetUtils.getHostFromChannel(this.channel);
            Queue<PubSubData> queue = this.pendingOps;
            this.pendingOps = new ArrayDeque();
            Iterator<PubSubData> it = queue.iterator();
            while (it.hasNext()) {
                executeOpAfterConnected(it.next());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onChannelConnectFailure() {
        Queue<PubSubData> queue;
        synchronized (this) {
            this.state = State.DISCONNECTED;
            this.channel = null;
            queue = this.pendingOps;
            this.pendingOps = new ArrayDeque();
        }
        Iterator<PubSubData> it = queue.iterator();
        while (it.hasNext()) {
            retryOrFailOp(it.next());
        }
    }

    private void connect() {
        synchronized (this) {
            if (State.CONNECTING == this.state || State.CONNECTED == this.state) {
                return;
            }
            this.state = State.CONNECTING;
            connect(this.host, this.pipelineFactory).addListener(new ChannelFutureListener() { // from class: org.apache.hedwig.client.netty.impl.HChannelImpl.1
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (HChannelImpl.this.closed) {
                        channelFuture.getChannel().close();
                        return;
                    }
                    if (channelFuture.isSuccess()) {
                        HChannelImpl.logger.debug("Connected to server {}.", HChannelImpl.this.host);
                        HChannelImpl.this.onChannelConnected(channelFuture);
                    } else {
                        HChannelImpl.logger.error("Error connecting to host {}.", HChannelImpl.this.host);
                        channelFuture.getChannel().close();
                        HChannelImpl.this.onChannelConnectFailure();
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ChannelFuture connect(InetSocketAddress inetSocketAddress, ClientChannelPipelineFactory clientChannelPipelineFactory) {
        logger.debug("Connecting to host {} ...", inetSocketAddress);
        ClientBootstrap clientBootstrap = new ClientBootstrap(this.channelManager.getChannelFactory());
        clientBootstrap.setPipelineFactory(clientChannelPipelineFactory);
        clientBootstrap.setOption("tcpNoDelay", true);
        clientBootstrap.setOption("keepAlive", true);
        return clientBootstrap.connect(inetSocketAddress);
    }

    @Override // org.apache.hedwig.client.netty.HChannel
    public void close(boolean z) {
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (null == this.channel) {
                return;
            }
            try {
                getHChannelHandlerFromChannel(this.channel).closeExplicitly();
            } catch (NoResponseHandlerException e) {
                logger.warn("No channel handler found for channel {} when closing it.", this.channel);
            }
            if (z) {
                this.channel.close().awaitUninterruptibly();
            } else {
                this.channel.close();
            }
            this.channel = null;
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("[HChannel: host - ").append(this.host).append(", channel - ").append(this.channel).append(", pending reqs - ").append(this.pendingOps.size()).append(", closed - ").append(this.closed).append("]");
        return sb.toString();
    }

    @Override // org.apache.hedwig.client.netty.HChannel
    public void close() {
        close(false);
    }

    public static HChannelHandler getHChannelHandlerFromChannel(Channel channel) throws NoResponseHandlerException {
        if (null == channel) {
            throw new NoResponseHandlerException("Received a null value for the channel. Cannot retrieve the response handler");
        }
        HChannelHandler last = channel.getPipeline().getLast();
        if (null == last) {
            throw new NoResponseHandlerException("Could not retrieve the response handler from the channel's pipeline.");
        }
        return last;
    }
}
