/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.clogproxy.client.connection;

import com.google.protobuf.InvalidProtocolBufferException;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.connection.ClientStream;
import com.oceanbase.clogproxy.client.connection.ConnectionFactory;
import com.oceanbase.clogproxy.client.connection.ConnectionParams;
import com.oceanbase.clogproxy.client.connection.StreamContext;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.common.packet.CompressType;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto;
import com.oceanbase.clogproxy.common.util.NetworkUtil;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import org.apache.commons.lang3.Conversion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty inbound handler for the client side of a LogProxy connection.
 *
 * <p>When the channel becomes active the handler sends a handshake request built from the
 * {@link ConnectionParams} stored in the channel's {@link StreamContext}. Incoming bytes are
 * accumulated in {@link #buffer} and decoded as a sequence of packets, each consisting of a
 * {@value #HEAD_LENGTH}-byte header (2-byte protocol version, 1-byte header type, 4-byte payload
 * length) followed by a protobuf payload of that length. Decoded log records are put on the
 * stream's record queue.
 *
 * <p>All decode state is kept in plain instance fields without synchronization, so an instance
 * must only be driven by one thread at a time.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);

    /** Byte sequence written at the very start of every handshake request. */
    private static final byte[] MAGIC_STRING = new byte[]{120, 105, 53, 51, 103, 93, 113};
    private static final String CLIENT_IP = NetworkUtil.getLocalIp();

    /** Size of a packet header: short version + byte type + int length. */
    private static final int HEAD_LENGTH = 7;

    /** Client version string reported in handshake requests. */
    private static final String CLIENT_VERSION = "1.0.1";

    private ClientStream stream;
    private ConnectionParams params;
    private BlockingQueue<StreamContext.TransferPacket> recordQueue;

    /** What the decoder expects to find next in {@link #buffer}. */
    private HandshakeStateV1 state = HandshakeStateV1.PB_HEAD;
    private final ByteToMessageDecoder.Cumulator cumulator = ByteToMessageDecoder.MERGE_CUMULATOR;

    /** Bytes received but not yet decoded; {@code null} when nothing is pending. */
    ByteBuf buffer;

    /** {@code false} once the channel went inactive or failed; stops the decode loop. */
    private boolean poolFlag = true;

    /** {@code true} when {@link #buffer} is the message of the current read, not a cumulation. */
    private boolean first;
    private int numReads = 0;

    /** Set by a decode step that needs more bytes than are currently buffered. */
    private boolean dataNotEnough = false;

    /** Payload length taken from the most recently decoded header. */
    private int dataLength = 0;
    LZ4Factory factory = LZ4Factory.fastestInstance();
    LZ4FastDecompressor fastDecompressor = this.factory.fastDecompressor();

    /** Makes the decoder expect a packet header next. */
    protected void resetState() {
        this.state = HandshakeStateV1.PB_HEAD;
    }

    /**
     * Appends the received bytes to the pending buffer and decodes as many complete packets as
     * possible.
     *
     * <p>Messages that are not a {@link ByteBuf} are not decoded; an {@link IdleStateEvent}
     * additionally triggers a reconnect of the stream.
     *
     * @param ctx the handler context, used to obtain the allocator for cumulation
     * @param msg the inbound message
     * @throws Exception if a packet cannot be decoded
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
            if (msg instanceof IdleStateEvent && this.stream != null) {
                this.stream.triggerReconnect();
            }
            return;
        }

        this.dataNotEnough = false;
        ByteBuf data = (ByteBuf) msg;
        this.first = this.buffer == null;
        this.buffer = this.first ? data : this.cumulator.cumulate(ctx.alloc(), this.buffer, data);

        while (this.poolFlag && this.buffer.isReadable() && !this.dataNotEnough) {
            switch (this.state) {
                case PB_HEAD: {
                    this.handleHeader();
                    break;
                }
                case CLIENT_HANDSHAKE_RESPONSE: {
                    this.handleHandshakeResponse();
                    break;
                }
                case ERROR_RESPONSE: {
                    this.handleErrorResponse();
                    break;
                }
                case STATUS: {
                    this.handleServerStatus();
                    break;
                }
                case RECORD: {
                    this.handleRecord();
                    break;
                }
            }
        }

        if (this.buffer != null && !this.buffer.isReadable()) {
            // Everything was consumed: give the buffer back instead of keeping it around.
            this.numReads = 0;
            this.buffer.release();
            this.buffer = null;
        } else if (++this.numReads >= ClientConf.NETTY_DISCARD_AFTER_READS) {
            // Partial data stays buffered; periodically drop the already-read prefix.
            this.numReads = 0;
            this.discardSomeReadBytes();
        }
    }

    /**
     * Decodes a packet header, validates it and selects the state for the payload that follows.
     * Flags {@link #dataNotEnough} when fewer than {@link #HEAD_LENGTH} bytes are buffered.
     */
    private void handleHeader() {
        if (this.buffer.readableBytes() < HEAD_LENGTH) {
            this.dataNotEnough = true;
            return;
        }
        short version = this.buffer.readShort();
        byte type = this.buffer.readByte();
        this.dataLength = this.buffer.readInt();
        this.checkHeader(version, type, this.dataLength);

        HeaderType headerType = HeaderType.codeOf((int) type);
        if (headerType == HeaderType.HANDSHAKE_RESPONSE_CLIENT) {
            this.state = HandshakeStateV1.CLIENT_HANDSHAKE_RESPONSE;
        } else if (headerType == HeaderType.ERROR_RESPONSE) {
            this.state = HandshakeStateV1.ERROR_RESPONSE;
        } else if (headerType == HeaderType.DATA_CLIENT) {
            this.state = HandshakeStateV1.RECORD;
        } else if (headerType == HeaderType.STATUS) {
            this.state = HandshakeStateV1.STATUS;
        }
        // NOTE(review): any other known header type leaves the state at PB_HEAD, so its payload
        // would be interpreted as the next header. Confirm the server never sends such types.
    }

    /**
     * Reads the payload announced by the last header.
     *
     * @return the payload bytes, or {@code null} (with {@link #dataNotEnough} set) when the
     *     payload has not been fully received yet
     */
    private byte[] readPayload() {
        if (this.buffer.readableBytes() < this.dataLength) {
            this.dataNotEnough = true;
            return null;
        }
        byte[] payload = new byte[this.dataLength];
        this.buffer.readBytes(payload);
        return payload;
    }

    /** Parses and logs the handshake response, then expects the next header. */
    private void handleHandshakeResponse() throws InvalidProtocolBufferException {
        byte[] payload = this.readPayload();
        if (payload == null) {
            return;
        }
        LogProxyProto.ClientHandshakeResponse response = LogProxyProto.ClientHandshakeResponse.parseFrom(payload);
        logger.info("Connected to LogProxyServer, ip:{}, version:{}", response.getIp(), response.getVersion());
        this.state = HandshakeStateV1.PB_HEAD;
    }

    /**
     * Parses an error response and reports it.
     *
     * @throws LogProxyClientException always, with {@link ErrorCode#NO_AUTH}, once the payload
     *     has been fully received
     */
    private void handleErrorResponse() throws InvalidProtocolBufferException {
        byte[] payload = this.readPayload();
        if (payload == null) {
            return;
        }
        LogProxyProto.ErrorResponse response = LogProxyProto.ErrorResponse.parseFrom(payload);
        logger.error("LogProxy refused handshake request: {}", response.toString());
        throw new LogProxyClientException(ErrorCode.NO_AUTH, "LogProxy refused handshake request: " + response.toString());
    }

    /** Parses a runtime status packet, logs it at debug level, then expects the next header. */
    private void handleServerStatus() throws InvalidProtocolBufferException {
        byte[] payload = this.readPayload();
        if (payload == null) {
            return;
        }
        LogProxyProto.RuntimeStatus response = LogProxyProto.RuntimeStatus.parseFrom(payload);
        logger.debug("server status: {}", response.toString());
        this.state = HandshakeStateV1.PB_HEAD;
    }

    /** Decodes a record packet once it has been fully received, then expects the next header. */
    private void handleRecord() {
        if (this.buffer.readableBytes() < this.dataLength) {
            this.dataNotEnough = true;
            return;
        }
        this.parseDataNew();
        this.state = HandshakeStateV1.PB_HEAD;
    }

    /**
     * Validates the fields of a packet header.
     *
     * @param version protocol version code from the header
     * @param type header type code from the header
     * @param length payload length from the header
     * @throws LogProxyClientException if the version or type is unknown, or the length is not
     *     positive
     */
    private void checkHeader(int version, int type, int length) {
        if (ProtocolVersion.codeOf(version) == null) {
            logger.error("unsupported protocol version: {}", version);
            throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "unsupported protocol version: " + version);
        }
        if (HeaderType.codeOf(type) == null) {
            logger.error("unsupported header type: {}", type);
            throw new LogProxyClientException(ErrorCode.E_HEADER_TYPE, "unsupported header type: " + type);
        }
        if (length <= 0) {
            logger.error("data length equals 0");
            throw new LogProxyClientException(ErrorCode.E_LEN, "data length equals 0");
        }
    }

    /**
     * Reads one record packet from the buffer, decompresses it if it is LZ4-compressed and hands
     * the resulting bytes to {@link #parseRecord(byte[])}.
     *
     * @throws LogProxyClientException if the protobuf payload is invalid or the decompressor
     *     result does not match the length announced in the packet
     */
    private void parseDataNew() {
        try {
            byte[] packet = new byte[this.dataLength];
            this.buffer.readBytes(packet, 0, this.dataLength);
            LogProxyProto.RecordData recordData = LogProxyProto.RecordData.parseFrom(packet);
            int compressType = recordData.getCompressType();
            int compressedLen = recordData.getCompressedLen();
            int rawLen = recordData.getRawLen();
            byte[] rawData = recordData.getRecords().toByteArray();
            if (compressType == CompressType.LZ4.code()) {
                // NOTE(review): "compressedLen" is used as the size of the decompressed output
                // and "rawLen" is compared with the decompressor's return value; the field names
                // look inverted relative to that usage - confirm against the server.
                byte[] decompressed = new byte[compressedLen];
                int result = this.fastDecompressor.decompress(rawData, 0, decompressed, 0, compressedLen);
                if (result != rawLen) {
                    throw new LogProxyClientException(ErrorCode.E_LEN, "decompressed length [" + result + "] is not expected [" + rawLen + "]");
                }
                this.parseRecord(decompressed);
            } else {
                this.parseRecord(rawData);
            }
        }
        catch (InvalidProtocolBufferException e) {
            throw new LogProxyClientException(ErrorCode.E_PARSE, "Failed to read PB packet", e);
        }
    }

    /**
     * Splits {@code bytes} into individual records, parses each into a {@link LogMessage} and
     * puts it on the record queue.
     *
     * <p>Each record occupies {@code 8 + n} bytes, where {@code n} is the int decoded by
     * {@link Conversion#byteArrayToInt} from the four bytes at offset 4 of the record.
     *
     * <p>Queue insertion is retried until it succeeds, even if the thread is interrupted; the
     * interrupt status is restored before this method returns.
     *
     * @param bytes concatenated, uncompressed records
     * @throws LogProxyClientException with {@link ErrorCode#E_PARSE} if a record cannot be read
     *     or parsed
     */
    private void parseRecord(byte[] bytes) throws LogProxyClientException {
        boolean interrupted = false;
        try {
            int offset = 0;
            while (offset < bytes.length) {
                LogMessage logMessage;
                int recordLength;
                try {
                    // Read inside the try so that truncated input surfaces as E_PARSE as well.
                    recordLength = Conversion.byteArrayToInt(bytes, offset + 4, 0, 0, 4);
                    logMessage = new LogMessage(false);
                    byte[] data = new byte[recordLength + 8];
                    System.arraycopy(bytes, offset, data, 0, data.length);
                    logMessage.parse(data);
                }
                catch (Exception e) {
                    throw new LogProxyClientException(ErrorCode.E_PARSE, e);
                }
                offset += 8 + recordLength;

                if (ClientConf.IGNORE_UNKNOWN_RECORD_TYPE) {
                    // NOTE(review): this skips every record while the flag is set; nothing here
                    // inspects the record type. Confirm that this is the intended behaviour.
                    logger.debug("Unsupported record type: {}", logMessage);
                    continue;
                }
                interrupted |= this.enqueue(logMessage);
            }
        }
        finally {
            if (interrupted) {
                // Do not lose the interrupt request that arrived while waiting on the queue.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Puts a record on the queue, retrying until it is accepted.
     *
     * @param logMessage the record to enqueue
     * @return {@code true} if the thread was interrupted at least once while waiting
     */
    private boolean enqueue(LogMessage logMessage) {
        boolean interrupted = false;
        while (true) {
            try {
                this.recordQueue.put(new StreamContext.TransferPacket(logMessage));
                return interrupted;
            }
            catch (InterruptedException e) {
                // Remember the interrupt and retry; the caller restores the flag afterwards.
                interrupted = true;
            }
        }
    }

    /**
     * Drops part of the already-read bytes of the pending buffer, unless the buffer is the
     * message of the current read or is referenced elsewhere.
     */
    protected final void discardSomeReadBytes() {
        if (this.buffer != null && !this.first && this.buffer.refCnt() == 1) {
            this.buffer.discardSomeReadBytes();
        }
    }

    /** Releases the pending buffer, if any, and clears the read counter. */
    private void releaseBuffer() {
        if (this.buffer != null) {
            this.buffer.release();
            this.buffer = null;
        }
        this.numReads = 0;
    }

    /** Returns the client description for logging, or {@code null} before the handler is bound. */
    private Object clientInfo() {
        return this.params == null ? null : this.params.info();
    }

    /**
     * Binds the handler to the channel's {@link StreamContext} and sends the handshake request.
     *
     * @param ctx the handler context of the channel that became active
     */
    public void channelActive(ChannelHandlerContext ctx) {
        this.poolFlag = true;
        StreamContext context = (StreamContext) ctx.channel().attr(ConnectionFactory.CONTEXT_KEY).get();
        this.stream = context.stream();
        this.params = context.getParams();
        this.recordQueue = context.recordQueue();
        logger.info("ClientId: {} connecting LogProxy: {}", this.params.info(), NetworkUtil.parseRemoteAddress(ctx.channel()));
        ctx.channel().writeAndFlush(this.generateConnectRequest(this.params.getProtocolVersion()));
    }

    /**
     * Builds the V2 handshake request: magic bytes, protocol version, header type, payload length
     * and a protobuf-encoded {@code ClientHandshakeRequest}.
     *
     * @return a newly allocated buffer holding the request
     */
    public ByteBuf generateConnectRequestV2() {
        LogProxyProto.ClientHandshakeRequest handShake = LogProxyProto.ClientHandshakeRequest.newBuilder()
                .setLogType(this.params.getLogType().code())
                .setIp(CLIENT_IP)
                .setId(this.params.getClientId())
                .setVersion(CLIENT_VERSION)
                .setEnableMonitor(this.params.isEnableMonitor())
                .setConfiguration(this.params.getConfigurationString())
                .build();
        byte[] packetBytes = handShake.toByteArray();
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(MAGIC_STRING.length + 2 + 1 + 4 + packetBytes.length);
        byteBuf.writeBytes(MAGIC_STRING);
        byteBuf.writeShort(ProtocolVersion.V2.code());
        byteBuf.writeByte(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
        byteBuf.writeInt(packetBytes.length);
        byteBuf.writeBytes(packetBytes);
        return byteBuf;
    }

    /**
     * Builds the handshake request for the given protocol version.
     *
     * <p>For {@link ProtocolVersion#V2} this delegates to {@link #generateConnectRequestV2()}.
     * Otherwise the request is: magic bytes, 2-byte version, 4-byte header type, 1-byte log type,
     * followed by client ip, client id, client version and configuration, each written as a
     * 4-byte length and the string's bytes. Strings are encoded as UTF-8 and the length prefix is
     * the encoded byte count, so prefix and content always agree.
     *
     * @param version the protocol version to use
     * @return a newly allocated buffer holding the request
     */
    public ByteBuf generateConnectRequest(ProtocolVersion version) {
        if (version == ProtocolVersion.V2) {
            return this.generateConnectRequestV2();
        }
        byte[] ip = CLIENT_IP.getBytes(StandardCharsets.UTF_8);
        byte[] clientId = this.params.getClientId().getBytes(StandardCharsets.UTF_8);
        byte[] clientVersion = CLIENT_VERSION.getBytes(StandardCharsets.UTF_8);
        byte[] configuration = this.params.getConfigurationString().getBytes(StandardCharsets.UTF_8);

        int size = MAGIC_STRING.length + 2 + 4 + 1
                + 4 + ip.length
                + 4 + clientId.length
                + 4 + clientVersion.length
                + 4 + configuration.length;
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(size);
        byteBuf.writeBytes(MAGIC_STRING);
        byteBuf.writeShort(ProtocolVersion.V0.code());
        byteBuf.writeInt(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
        byteBuf.writeByte(this.params.getLogType().code());
        writeLengthPrefixed(byteBuf, ip);
        writeLengthPrefixed(byteBuf, clientId);
        writeLengthPrefixed(byteBuf, clientVersion);
        writeLengthPrefixed(byteBuf, configuration);
        return byteBuf;
    }

    /** Writes {@code value} preceded by its length as a 4-byte int. */
    private static void writeLengthPrefixed(ByteBuf target, byte[] value) {
        target.writeInt(value.length);
        target.writeBytes(value);
    }

    /**
     * Stops decoding, closes the channel, drops any partially received packet and asks the stream
     * to reconnect.
     *
     * @param ctx the handler context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.poolFlag = false;
        logger.info("Connect broken of ClientId: {} with LogProxy: {}", this.clientInfo(), NetworkUtil.parseRemoteAddress(ctx.channel()));
        ctx.channel().disconnect();
        ctx.close();
        // A half-received packet can never be completed on this connection: free it and start
        // from a clean header state.
        this.releaseBuffer();
        this.resetState();
        if (this.stream != null) {
            this.stream.triggerReconnect();
        }
    }

    /**
     * Stops decoding, releases pending data, closes the channel and notifies the stream.
     *
     * <p>A {@link LogProxyClientException} whose {@code needStop()} is true stops the stream and
     * is forwarded to it; any other kind of throwable triggers a reconnect.
     *
     * @param ctx the handler context of the failing channel
     * @param cause the error that was raised
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        this.poolFlag = false;
        this.resetState();
        this.releaseBuffer();
        logger.error("Exception occurred ClientId: {}, with LogProxy: {}", this.clientInfo(), NetworkUtil.parseRemoteAddress(ctx.channel()), cause);
        ctx.channel().disconnect();
        ctx.close();
        if (this.stream == null) {
            return;
        }
        if (cause instanceof LogProxyClientException) {
            LogProxyClientException clientException = (LogProxyClientException) cause;
            if (clientException.needStop()) {
                this.stream.stop();
                this.stream.triggerException(clientException);
            }
        } else {
            this.stream.triggerReconnect();
        }
    }

    /** Decoder states: what kind of data is expected next in the inbound buffer. */
    static enum HandshakeStateV1 {
        PB_HEAD,
        CLIENT_HANDSHAKE_RESPONSE,
        RECORD,
        ERROR_RESPONSE,
        STATUS;

    }
}

