package com.baidu.cloud.starlight.core.rpc;

import com.baidu.cloud.starlight.api.common.Constants;
import com.baidu.cloud.starlight.api.exception.CodecException;
import com.baidu.cloud.starlight.api.exception.RpcException;
import com.baidu.cloud.starlight.api.exception.StarlightRpcException;
import com.baidu.cloud.starlight.api.extension.ExtensionLoader;
import com.baidu.cloud.starlight.api.model.MsgBase;
import com.baidu.cloud.starlight.api.model.Request;
import com.baidu.cloud.starlight.api.model.Response;
import com.baidu.cloud.starlight.api.model.RpcResponse;
import com.baidu.cloud.starlight.api.protocol.Protocol;
import com.baidu.cloud.starlight.api.rpc.Processor;
import com.baidu.cloud.starlight.api.rpc.RpcService;
import com.baidu.cloud.starlight.api.rpc.ServiceInvoker;
import com.baidu.cloud.starlight.api.rpc.ServiceRegistry;
import com.baidu.cloud.starlight.api.rpc.callback.RpcCallback;
import com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory;
import com.baidu.cloud.starlight.api.serialization.serializer.Serializer;
import com.baidu.cloud.starlight.api.transport.channel.RpcChannel;
import com.baidu.cloud.starlight.api.utils.GenericUtil;
import com.baidu.cloud.starlight.api.utils.LogUtils;
import com.baidu.cloud.starlight.transport.utils.TimerHolder;
import com.baidu.cloud.thirdparty.netty.util.Timeout;
import com.baidu.cloud.thirdparty.netty.util.TimerTask;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/baidu/cloud/starlight/core/rpc/ServerProcessor.class */
/**
 * Server-side {@link Processor}.
 *
 * <p>For every incoming {@link Request} it looks up the target {@link ServiceInvoker} in the
 * {@link ServiceRegistry}, optionally arms an invoke timeout, and hands the request to the thread
 * pool that the {@link ThreadPoolFactory} returns for the service. The result (or failure) is
 * encoded and written back to the originating {@link RpcChannel} exactly once.
 */
public class ServerProcessor implements Processor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerProcessor.class);
    private final ServiceRegistry serviceRegistry;
    private ThreadPoolFactory threadPoolFactory;

    /**
     * Unit of work executed on a service thread pool: resolves the target method, decodes the
     * request body and invokes the service. Failures are reported through the callback.
     */
    private static class ServerProcessTask implements Runnable {
        private final Request request;
        private final RpcCallback callback;
        private final ServiceInvoker serviceInvoker;

        public ServerProcessTask(Request request, RpcCallback rpcCallback, ServiceInvoker serviceInvoker) {
            this.request = request;
            this.callback = rpcCallback;
            this.serviceInvoker = serviceInvoker;
        }

        @Override // java.lang.Runnable
        public void run() {
            // Record how long the request waited between submission and a thread picking it up.
            Object submittedAt = this.request.getNoneAdditionKv().get(Constants.BEFORE_THREAD_EXECUTE_TIME_KEY);
            if (submittedAt instanceof Long) {
                LogUtils.addLogTimeAttachment(this.request, Constants.WAIT_FOR_THREAD_COST,
                    System.currentTimeMillis() - ((Long) submittedAt).longValue());
            }

            RpcService rpcService = this.serviceInvoker.getRpcService();
            if (GenericUtil.isGenericCall(this.request)) {
                GenericUtil.markGeneric(this.request);
            } else {
                Method method = rpcService.getMethod(this.request.getMethodName());
                if (method == null) {
                    this.callback.onError(new StarlightRpcException(StarlightRpcException.METHOD_NOT_FOUND_EXCEPTION, "The called method {" + this.request.getMethodName() + "} does not exist"));
                    return;
                }
                // The decoder needs the method signature to deserialize the body.
                this.request.setParamsTypes(method.getParameterTypes());
                this.request.setGenericParamsTypes(method.getGenericParameterTypes());
                this.request.setReturnType(method.getReturnType());
            }

            try {
                Protocol protocol = (Protocol) ExtensionLoader.getInstance(Protocol.class).getExtension(this.request.getProtocolName());
                if (protocol == null) {
                    throw new StarlightRpcException(StarlightRpcException.BAD_REQUEST, "The request's protocol information is not found");
                }
                long decodeStart = System.currentTimeMillis();
                LogUtils.addLogTimeAttachment(this.request, Constants.BEFORE_DECODE_BODY_TIME_KEY, decodeStart);
                protocol.getDecoder().decodeBody(this.request);
                LogUtils.addLogTimeAttachment(this.request, Constants.DECODE_BODY_COST, System.currentTimeMillis() - decodeStart);
                LogUtils.addLogTimeAttachment(this.request, Constants.BEFORE_SERVER_FILTER_EXEC_TIME_KEY, System.currentTimeMillis());
                this.serviceInvoker.invoke(this.request, this.callback);
            } catch (Exception e) {
                Throwable failure = e;
                if (e instanceof CodecException) {
                    // Append the deserialization hint so the peer sees it in the error message.
                    CodecException codecException = (CodecException) e;
                    failure = new CodecException(codecException.getCode(), codecException.getMessage() + " " + Serializer.DESERIALIZE_ERROR_MSG);
                }
                this.callback.onError(failure);
            }
        }
    }

    /**
     * @param serviceRegistry registry used to look up the invoker of a requested service
     * @param threadPoolFactory factory providing the thread pool each service executes on
     */
    public ServerProcessor(ServiceRegistry serviceRegistry, ThreadPoolFactory threadPoolFactory) {
        this.serviceRegistry = serviceRegistry;
        this.threadPoolFactory = threadPoolFactory;
    }

    @Override // com.baidu.cloud.starlight.api.rpc.Processor
    public ServiceRegistry getRegistry() {
        return this.serviceRegistry;
    }

    /**
     * Dispatches an incoming request to the thread pool of the targeted service.
     *
     * <p>If the service is unknown an error response is sent immediately. If the service config
     * declares a positive invoke timeout, a timer is armed that answers with a time-out error
     * unless a response or error has been delivered first.
     *
     * @param msgBase the received message; must be a {@link Request}
     * @param rpcChannel channel the response is written to
     * @throws StarlightRpcException if {@code msgBase} is a {@link Response}
     */
    @Override // com.baidu.cloud.starlight.api.rpc.Processor
    public void process(MsgBase msgBase, final RpcChannel rpcChannel) {
        if (msgBase instanceof Response) {
            LOGGER.error("Received Response message in server side, but is not supported currently");
            throw new StarlightRpcException(StarlightRpcException.BAD_REQUEST, "Received Response message in server side, but is not supported currently");
        }
        final Request request = (Request) msgBase;
        final RpcCallback rpcCallback = new RpcCallback() { // from class: com.baidu.cloud.starlight.core.rpc.ServerProcessor.1
            // Written by the dispatching thread, read by the timer and worker threads.
            private volatile Timeout timeout;
            // Guards the single reply: the timer thread and a worker thread may race to answer,
            // so the claim must be an atomic test-and-set rather than a check followed by a write.
            private final AtomicBoolean executed = new AtomicBoolean(false);

            @Override // com.baidu.cloud.starlight.api.rpc.callback.Callback
            public void onResponse(Response response) {
                cancelTimeout();
                if (!this.executed.compareAndSet(false, true)) {
                    return;
                }
                encodeAndSend(response);
            }

            @Override // com.baidu.cloud.starlight.api.rpc.callback.Callback
            public void onError(Throwable th) {
                cancelTimeout();
                if (!this.executed.compareAndSet(false, true)) {
                    return;
                }
                RpcResponse rpcResponse = new RpcResponse(request.getId());
                rpcResponse.setProtocolName(request.getProtocolName());
                if (th instanceof RpcException) {
                    rpcResponse.setStatus(((RpcException) th).getCode().intValue());
                } else {
                    rpcResponse.setStatus(StarlightRpcException.INTERNAL_SERVER_ERROR.intValue());
                }
                rpcResponse.setErrorMsg(th.getMessage());
                encodeAndSend(rpcResponse);
            }

            @Override // com.baidu.cloud.starlight.api.rpc.callback.RpcCallback
            public void addTimeout(Timeout timeout) {
                this.timeout = timeout;
            }

            @Override // com.baidu.cloud.starlight.api.rpc.callback.RpcCallback
            public Request getRequest() {
                return request;
            }

            /** Cancels the pending invoke timeout, if one was registered and is not yet cancelled. */
            private void cancelTimeout() {
                Timeout pending = this.timeout;
                if (pending != null && !pending.isCancelled()) {
                    pending.cancel();
                }
            }

            /** Binds the response to the request, encodes its body and writes it to the channel. */
            private void encodeAndSend(Response response) {
                response.setRequest(request);
                Protocol protocol = (Protocol) ExtensionLoader.getInstance(Protocol.class).getExtension(request.getProtocolName());
                if (protocol == null) {
                    // Without a protocol the body cannot be encoded; log instead of failing with an NPE.
                    LOGGER.error("Unable to send response for request {}: protocol {} is not found",
                        request.getId(), request.getProtocolName());
                    return;
                }
                long encodeStart = System.currentTimeMillis();
                LogUtils.addLogTimeAttachment(response, Constants.BEFORE_ENCODE_BODY_TIME_KEY, encodeStart);
                protocol.getEncoder().encodeBody(response);
                LogUtils.addLogTimeAttachment(response, Constants.ENCODE_BODY_COST, System.currentTimeMillis() - encodeStart);
                LogUtils.addLogTimeAttachment(response, Constants.BEFORE_IO_THREAD_EXECUTE_TIME_KEY, System.currentTimeMillis());
                rpcChannel.send(response);
            }
        };

        ServiceInvoker invoker = this.serviceRegistry.discover(request.getServiceName());
        if (invoker == null) {
            rpcCallback.onError(new StarlightRpcException(StarlightRpcException.SERVICE_NOT_FOUND_EXCEPTION, "Service {" + request.getServiceName() + "} not found in provider"));
            return;
        }

        RpcService rpcService = invoker.getRpcService();
        Integer invokeTimeoutMills = null;
        if (rpcService.getServiceConfig() != null) {
            invokeTimeoutMills = rpcService.getServiceConfig().getInvokeTimeoutMills();
        }
        if (invokeTimeoutMills != null && invokeTimeoutMills.intValue() > 0) {
            rpcCallback.addTimeout(TimerHolder.getTimer().newTimeout(new TimerTask() { // from class: com.baidu.cloud.starlight.core.rpc.ServerProcessor.2
                @Override
                public void run(Timeout timeout) throws Exception {
                    rpcCallback.onError(new StarlightRpcException(StarlightRpcException.TIME_OUT_EXCEPTION, "Call service {" + request.getServiceName() + "} method {" + request.getMethodName() + "} time out"));
                }
            }, invokeTimeoutMills.intValue(), TimeUnit.MILLISECONDS));
        }

        ServerProcessTask serverProcessTask = new ServerProcessTask(request, rpcCallback, invoker);
        // Timestamp read back by the task to compute the time spent waiting for a thread.
        LogUtils.addLogTimeAttachment(msgBase, Constants.BEFORE_THREAD_EXECUTE_TIME_KEY, System.currentTimeMillis());
        this.threadPoolFactory.getThreadPool(rpcService).execute(serverProcessTask);
    }

    /** Closes the thread pool factory, if one is set. */
    @Override // com.baidu.cloud.starlight.api.rpc.Processor
    public void close() {
        if (this.threadPoolFactory != null) {
            this.threadPoolFactory.close();
        }
    }

    @Override // com.baidu.cloud.starlight.api.rpc.Processor
    public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
        this.threadPoolFactory = threadPoolFactory;
    }

    /**
     * @param str name of the service
     * @return number of tasks queued in the service's thread pool, or {@code null} if the service
     *     (or its {@link RpcService}) cannot be resolved
     */
    @Override // com.baidu.cloud.starlight.api.rpc.Processor
    public Integer waitTaskCount(String str) {
        ThreadPoolExecutor pool = threadPoolOf(str);
        if (pool == null) {
            return null;
        }
        return Integer.valueOf(pool.getQueue().size());
    }

    /**
     * @param str name of the service
     * @return number of actively executing tasks in the service's thread pool, or {@code null} if
     *     the service (or its {@link RpcService}) cannot be resolved
     */
    @Override // com.baidu.cloud.starlight.api.rpc.Processor
    public Integer processingCount(String str) {
        ThreadPoolExecutor pool = threadPoolOf(str);
        if (pool == null) {
            return null;
        }
        return Integer.valueOf(pool.getActiveCount());
    }

    /**
     * @param str name of the service
     * @return number of completed tasks of the service's thread pool, or {@code null} if the
     *     service (or its {@link RpcService}) cannot be resolved
     */
    @Override // com.baidu.cloud.starlight.api.rpc.Processor
    public Long completeCount(String str) {
        ThreadPoolExecutor pool = threadPoolOf(str);
        if (pool == null) {
            return null;
        }
        return Long.valueOf(pool.getCompletedTaskCount());
    }

    /**
     * Sums queued plus active tasks over the default thread pool and every distinct-from-default
     * pool of the services registered in {@link RpcServiceRegistry}.
     *
     * @return the total of queued and active tasks
     */
    @Override // com.baidu.cloud.starlight.api.rpc.Processor
    public Integer allWaitTaskCount() {
        ThreadPoolExecutor defaultPool = this.threadPoolFactory.defaultThreadPool();
        int total = defaultPool.getQueue().size() + defaultPool.getActiveCount();
        for (ServiceInvoker invoker : RpcServiceRegistry.getInstance().rpcServices()) {
            ThreadPoolExecutor pool = this.threadPoolFactory.getThreadPool(invoker.getRpcService());
            // Services that fall back to the default pool were already counted above.
            if (pool != defaultPool) {
                total += pool.getQueue().size() + pool.getActiveCount();
            }
        }
        return Integer.valueOf(total);
    }

    /**
     * Resolves the thread pool serving the named service.
     *
     * @return the pool, or {@code null} when the service is not registered, has no
     *     {@link RpcService}, or the factory returns no pool for it
     */
    private ThreadPoolExecutor threadPoolOf(String serviceName) {
        ServiceInvoker invoker = this.serviceRegistry.discover(serviceName);
        if (invoker == null) {
            return null;
        }
        RpcService rpcService = invoker.getRpcService();
        if (rpcService == null) {
            return null;
        }
        return this.threadPoolFactory.getThreadPool(rpcService);
    }
}
