/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.brpc.server.handler;

import com.baidu.brpc.ChannelInfo;
import com.baidu.brpc.buffer.DynamicCompositeByteBuf;
import com.baidu.brpc.exceptions.BadSchemaException;
import com.baidu.brpc.exceptions.NotEnoughDataException;
import com.baidu.brpc.exceptions.RpcException;
import com.baidu.brpc.exceptions.TooBigDataException;
import com.baidu.brpc.protocol.Protocol;
import com.baidu.brpc.protocol.ProtocolManager;
import com.baidu.brpc.server.ChannelManager;
import com.baidu.brpc.server.RpcServer;
import com.baidu.brpc.server.handler.DecodeWorkTask;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.unix.Errors;
import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class RpcServerHandler
extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(RpcServerHandler.class);
    private RpcServer rpcServer;

    public RpcServerHandler(RpcServer rpcServer) {
        this.rpcServer = rpcServer;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ChannelInfo channelInfo = ChannelInfo.getOrCreateServerChannelInfo(ctx.channel());
        channelInfo.setProtocol(this.rpcServer.getProtocol());
    }

    public void channelRead0(ChannelHandlerContext ctx, Object in) throws Exception {
        ChannelInfo channelInfo = ChannelInfo.getServerChannelInfo(ctx.channel());
        ByteBuf msg = (ByteBuf)in;
        int len = msg.readableBytes();
        if (len > 0) {
            channelInfo.getRecvBuf().addBuffer(msg.retain());
            Runnable[] tasks = new DecodeWorkTask[64];
            int i = 0;
            while (channelInfo.getRecvBuf().readableBytes() > 0) {
                try {
                    Object packet = this.decodeHeader(ctx, channelInfo, channelInfo.getRecvBuf());
                    DecodeWorkTask task = new DecodeWorkTask(this.rpcServer, packet, channelInfo.getProtocol(), ctx);
                    tasks[i++] = task;
                    if (i != 64) continue;
                    this.rpcServer.getThreadPool().submit(tasks, 0, i);
                    i = 0;
                }
                catch (NotEnoughDataException ex1) {
                    break;
                }
                catch (TooBigDataException ex2) {
                    throw new RpcException(5, (Throwable)ex2);
                }
                catch (BadSchemaException ex3) {
                    throw new RpcException(5, (Throwable)ex3);
                }
            }
            if (i > 0) {
                this.rpcServer.getThreadPool().submit(tasks, 0, i);
            }
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (ctx.channel().isActive() && !(cause instanceof Errors.NativeIoException) && !(cause instanceof IOException)) {
            log.info("service exception, ex={}", (Object)cause.getMessage());
        }
        log.debug("meet exception, may be connection is closed, msg={}", (Object)cause.getMessage());
        log.debug("remove from channel map");
        ChannelManager.getInstance().removeChannel(ctx.channel());
        ctx.close();
    }

    private Object decodeHeader(ChannelHandlerContext ctx, ChannelInfo channelInfo, DynamicCompositeByteBuf compositeByteBuf) throws NotEnoughDataException, TooBigDataException, BadSchemaException {
        Protocol protocol = channelInfo.getProtocol();
        if (protocol != null) {
            return protocol.decode(ctx, compositeByteBuf, true);
        }
        ProtocolManager protocolManager = ProtocolManager.getInstance();
        List<Protocol> protocols = protocolManager.getCoexistenceProtocols();
        int protocolSize = protocolManager.getCoexistenceProtocolSize();
        for (int i = 0; i < protocolSize; ++i) {
            Protocol protocol1 = protocols.get(i);
            try {
                Object packet = protocol1.decode(ctx, compositeByteBuf, true);
                channelInfo.setProtocol(protocol1);
                return packet;
            }
            catch (BadSchemaException ex3) {
                continue;
            }
        }
        throw new BadSchemaException("bad schema");
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.debug("channel is in active, remove from channel map");
        ChannelManager.getInstance().removeChannel(ctx.channel());
        ctx.fireChannelInactive();
    }
}

