package com.dianping.cat.network;

import com.dianping.cat.component.ComponentContext;
import com.dianping.cat.component.lifecycle.Initializable;
import com.dianping.cat.component.lifecycle.LogEnabled;
import com.dianping.cat.component.lifecycle.Logger;
import com.dianping.cat.message.internal.ByteBufQueue;
import com.dianping.cat.netty.buffer.ByteBuf;
import com.dianping.cat.netty.channel.Channel;
import com.dianping.cat.netty.channel.ChannelHandler;
import com.dianping.cat.netty.channel.ChannelHandlerContext;
import com.dianping.cat.netty.channel.ChannelInboundHandlerAdapter;
import com.dianping.cat.support.Threads;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@ChannelHandler.Sharable
/* loaded from: input_file:com/dianping/cat/network/MessageTransporter.class */
public class MessageTransporter extends ChannelInboundHandlerAdapter implements Initializable, LogEnabled, Threads.Task {
    private ByteBufQueue m_queue;
    private ByteBuf m_buf;
    private Logger m_logger;
    private List<Channel> m_channels = new CopyOnWriteArrayList();
    private AtomicBoolean m_enabled = new AtomicBoolean(true);
    private CountDownLatch m_latch = new CountDownLatch(1);

    @Override // com.dianping.cat.netty.channel.ChannelInboundHandlerAdapter, com.dianping.cat.netty.channel.ChannelInboundHandler
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        Channel channel = channelHandlerContext.channel();
        this.m_channels.add(channel);
        this.m_logger.info("Connected to CAT server %s, %s", channel.remoteAddress(), channel);
        super.channelActive(channelHandlerContext);
    }

    @Override // com.dianping.cat.netty.channel.ChannelInboundHandlerAdapter, com.dianping.cat.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        Channel channel = channelHandlerContext.channel();
        this.m_channels.remove(channel);
        this.m_logger.info("Disconnected from CAT server %s, %s", channel.remoteAddress(), channel);
        super.channelInactive(channelHandlerContext);
    }

    @Override // com.dianping.cat.component.lifecycle.LogEnabled
    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    public List<Channel> getActiveChannels() {
        return this.m_channels;
    }

    @Override // com.dianping.cat.support.Threads.Task
    public String getName() {
        return getClass().getSimpleName();
    }

    @Override // com.dianping.cat.component.lifecycle.Initializable
    public void initialize(ComponentContext componentContext) {
        this.m_queue = (ByteBufQueue) componentContext.lookup(ByteBufQueue.class);
    }

    private ByteBuf next() throws InterruptedException {
        if (this.m_buf == null) {
            this.m_buf = this.m_queue.poll();
        }
        return this.m_buf;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.m_enabled.get()) {
            try {
                ByteBuf next = next();
                if (next == null || !write(next)) {
                    TimeUnit.MILLISECONDS.sleep(5L);
                } else {
                    this.m_buf = null;
                }
            } catch (InterruptedException e) {
                return;
            } finally {
                this.m_latch.countDown();
            }
        }
        if (!this.m_enabled.get()) {
            ByteBuf next2 = next();
            while (next2 != null) {
                if (!write(next2)) {
                    break;
                } else {
                    next2 = next();
                }
            }
        }
    }

    @Override // com.dianping.cat.support.Threads.Task
    public void shutdown() {
        this.m_enabled.set(false);
        try {
            this.m_latch.await();
        } catch (InterruptedException e) {
        }
    }

    private boolean write(ByteBuf byteBuf) {
        if (this.m_channels.isEmpty()) {
            return false;
        }
        Channel channel = this.m_channels.get(0);
        if (!channel.isActive() || !channel.isWritable()) {
            return false;
        }
        channel.writeAndFlush(byteBuf);
        return true;
    }
}
