package org.apache.dolphinscheduler.remote.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/dolphinscheduler/remote/handler/NettyServerHandler.class */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    private final NettyRemotingServer nettyRemotingServer;
    private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();

    public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
        this.nettyRemotingServer = nettyRemotingServer;
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.channel().close();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        processReceived(channelHandlerContext.channel(), (Command) obj);
    }

    public void registerProcessor(CommandType commandType, NettyRequestProcessor nettyRequestProcessor) {
        registerProcessor(commandType, nettyRequestProcessor, null);
    }

    public void registerProcessor(CommandType commandType, NettyRequestProcessor nettyRequestProcessor, ExecutorService executorService) {
        ExecutorService executorService2 = executorService;
        if (executorService2 == null) {
            executorService2 = this.nettyRemotingServer.getDefaultExecutor();
        }
        this.processors.putIfAbsent(commandType, new Pair<>(nettyRequestProcessor, executorService2));
    }

    private void processReceived(final Channel channel, final Command command) {
        CommandType type = command.getType();
        if (CommandType.HEART_BEAT.equals(type)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel));
                return;
            }
            return;
        }
        final Pair<NettyRequestProcessor, ExecutorService> pair = this.processors.get(type);
        if (pair == null) {
            this.logger.warn("commandType {} not support", type);
            return;
        }
        try {
            pair.getRight().submit(new Runnable() { // from class: org.apache.dolphinscheduler.remote.handler.NettyServerHandler.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        ((NettyRequestProcessor) pair.getLeft()).process(channel, command);
                    } catch (Throwable th) {
                        NettyServerHandler.this.logger.error("process msg {} error", command, th);
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            this.logger.warn("thread pool is full, discard msg {} from {}", command, ChannelUtils.getRemoteAddress(channel));
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        this.logger.error("exceptionCaught : {}", th.getMessage(), th);
        channelHandlerContext.channel().close();
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        Channel channel = channelHandlerContext.channel();
        ChannelConfig config = channel.config();
        if (channel.isWritable()) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("{} is writable, to low water : {}", channel, Integer.valueOf(config.getWriteBufferLowWaterMark()));
            }
            config.setAutoRead(true);
        } else {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("{} is not writable, over high water level : {}", channel, Integer.valueOf(config.getWriteBufferHighWaterMark()));
            }
            config.setAutoRead(false);
        }
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            channelHandlerContext.channel().close();
        } else {
            super.userEventTriggered(channelHandlerContext, obj);
        }
    }
}
