package com.baidu.cloud.starlight.core.rpc;

import com.baidu.cloud.starlight.api.common.Constants;
import com.baidu.cloud.starlight.api.exception.CodecException;
import com.baidu.cloud.starlight.api.exception.StarlightRpcException;
import com.baidu.cloud.starlight.api.extension.ExtensionLoader;
import com.baidu.cloud.starlight.api.model.MsgBase;
import com.baidu.cloud.starlight.api.model.Request;
import com.baidu.cloud.starlight.api.model.Response;
import com.baidu.cloud.starlight.api.protocol.Protocol;
import com.baidu.cloud.starlight.api.rpc.LocalContext;
import com.baidu.cloud.starlight.api.rpc.Processor;
import com.baidu.cloud.starlight.api.rpc.ServiceRegistry;
import com.baidu.cloud.starlight.api.rpc.callback.RpcCallback;
import com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory;
import com.baidu.cloud.starlight.api.serialization.serializer.Serializer;
import com.baidu.cloud.starlight.api.transport.channel.RpcChannel;
import com.baidu.cloud.starlight.api.utils.GenericUtil;
import com.baidu.cloud.starlight.api.utils.LogUtils;
import java.lang.reflect.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side {@link Processor}.
 *
 * <p>Every {@link Response} handed to {@link #process(MsgBase, RpcChannel)} is wrapped in a task and
 * submitted to the default thread pool of the configured {@link ThreadPoolFactory}. The task looks up
 * the pending {@link RpcCallback} on the channel, decodes the response body when there is one, and
 * completes the callback with either the response or an error.
 */
public class ClientProcessor implements Processor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientProcessor.class);

    /** Logged and thrown when a {@link Request} (instead of a {@link Response}) reaches this processor. */
    private static final String REQUEST_NOT_SUPPORTED_MSG =
            "Received Request message in client side, but is not supported currently";

    private ThreadPoolFactory threadPoolFactory;

    /**
     * Unit of work that completes the callback registered for one received {@link Response}.
     *
     * <p>Declared {@code static} because it uses no state of the enclosing {@link ClientProcessor}.
     */
    private static final class ClientProcessTask implements Runnable {
        private final Response response;
        private final RpcChannel context;

        ClientProcessTask(Response response, RpcChannel rpcChannel) {
            this.response = response;
            this.context = rpcChannel;
        }

        /**
         * Runs the task with the channel's class loader (when one is registered) installed as the
         * thread context class loader, and puts the previous loader back afterwards so the pooled
         * worker thread does not carry it over to unrelated tasks.
         */
        @Override
        public void run() {
            Thread worker = Thread.currentThread();
            ClassLoader previousLoader = worker.getContextClassLoader();
            try {
                bindChannelClassLoader(worker);
                recordThreadWaitCost();
                completeCallback();
            } finally {
                worker.setContextClassLoader(previousLoader);
            }
        }

        /** Installs the class loader stored in {@link LocalContext} for this channel, or logs when absent. */
        private void bindChannelClassLoader(Thread worker) {
            String channelId = this.context.channel().id().asLongText();
            ClassLoader channelLoader = (ClassLoader) LocalContext
                    .getContext(Constants.LOCAL_CONTEXT_THREAD_CLASSLOADER_KEY)
                    .get(channelId);
            if (channelLoader != null) {
                worker.setContextClassLoader(channelLoader);
            } else {
                LOGGER.error("Class Loader related to channel {} is null, plz check", channelId);
            }
        }

        /** Attaches the time elapsed since the submit timestamp, if {@code process} recorded one. */
        private void recordThreadWaitCost() {
            Object submittedAt = this.response.getNoneAdditionKv().get(Constants.BEFORE_THREAD_EXECUTE_TIME_KEY);
            if (submittedAt instanceof Long) {
                long waited = System.currentTimeMillis() - ((Long) submittedAt).longValue();
                LogUtils.addLogTimeAttachment(this.response, Constants.WAIT_FOR_THREAD_COST, waited);
            }
        }

        /**
         * Removes the callback registered for the response id, copies the request's type information
         * onto the response, decodes the body and notifies the callback.
         */
        private void completeCallback() {
            RpcCallback callback = this.context.removeCallback(this.response.getId());
            if (callback == null) {
                // No callback is registered for this id any more; only emit the additional log.
                LogUtils.timeoutReqAdditionalLog(this.response);
                return;
            }

            Request request = callback.getRequest();
            this.response.setRequest(request);

            Class<?> returnType = request.getReturnType();
            if (returnType == null) {
                callback.onError(new StarlightRpcException(StarlightRpcException.BAD_REQUEST,
                        "The returnType in the request message is empty. Cannot deserialize the response data!"));
                return;
            }
            this.response.setReturnType(returnType);

            Type genericReturnType = request.getGenericReturnType();
            if (genericReturnType == null) {
                callback.onError(new StarlightRpcException(StarlightRpcException.BAD_REQUEST,
                        "The genericReturnType in the request message is empty. Cannot deserialize the response data!"));
                return;
            }
            this.response.setGenericReturnType(genericReturnType);

            if (GenericUtil.isGenericCall(request)) {
                GenericUtil.markGeneric(this.response);
            }

            try {
                if (hasDecodableBody()) {
                    Protocol protocol = (Protocol) ExtensionLoader.getInstance(Protocol.class)
                            .getExtension(this.response.getProtocolName());
                    if (protocol == null) {
                        throw new StarlightRpcException(StarlightRpcException.BAD_REQUEST,
                                "The response's protocol information is not found, protocol {"
                                        + this.response.getProtocolName() + "}");
                    }
                    long decodeStart = System.currentTimeMillis();
                    LogUtils.addLogTimeAttachment(this.response, Constants.BEFORE_DECODE_BODY_TIME_KEY, decodeStart);
                    protocol.getDecoder().decodeBody(this.response);
                    LogUtils.addLogTimeAttachment(this.response, Constants.DECODE_BODY_COST,
                            System.currentTimeMillis() - decodeStart);
                }
                callback.onResponse(this.response);
            } catch (Exception e) {
                // Failures raised while decoding or inside onResponse are reported through onError.
                callback.onError(withDeserializeHint(e));
            }
        }

        /** Returns {@code true} for a success-status response that carries a non-empty body. */
        private boolean hasDecodableBody() {
            if (this.response.getStatus() != Constants.SUCCESS_CODE.intValue()) {
                return false;
            }
            byte[] body = this.response.getBodyBytes();
            return body != null && body.length > 0;
        }

        /**
         * Returns the exception to report to the callback. A {@link CodecException} is rebuilt with
         * {@link Serializer#DESERIALIZE_ERROR_MSG} appended to its message; anything else is returned as is.
         */
        private static Exception withDeserializeHint(Exception failure) {
            if (!(failure instanceof CodecException)) {
                return failure;
            }
            CodecException original = (CodecException) failure;
            CodecException hinted = new CodecException(original.getCode(),
                    original.getMessage() + " " + Serializer.DESERIALIZE_ERROR_MSG);
            // The (code, message) constructor takes no cause, so keep the original exception and its
            // stack trace reachable for diagnostics by attaching it as a suppressed exception.
            hinted.addSuppressed(original);
            return hinted;
        }
    }

    /**
     * Creates a processor that runs its tasks on the default thread pool of the given factory.
     *
     * @param threadPoolFactory factory supplying the thread pool used by this processor
     */
    public ClientProcessor(ThreadPoolFactory threadPoolFactory) {
        this.threadPoolFactory = threadPoolFactory;
    }

    /**
     * Not supported by this processor.
     *
     * @throws StarlightRpcException always
     */
    @Override
    public ServiceRegistry getRegistry() {
        throw new StarlightRpcException("Client side is not supported ServiceRegistry currently");
    }

    /**
     * Submits a received response for asynchronous handling on the default thread pool.
     *
     * @param msgBase    the received message; must not be a {@link Request}
     * @param rpcChannel the channel the message arrived on
     * @throws StarlightRpcException with code {@code BAD_REQUEST} when {@code msgBase} is a {@link Request}
     */
    @Override
    public void process(MsgBase msgBase, RpcChannel rpcChannel) {
        if (msgBase instanceof Request) {
            LOGGER.error(REQUEST_NOT_SUPPORTED_MSG);
            throw new StarlightRpcException(StarlightRpcException.BAD_REQUEST, REQUEST_NOT_SUPPORTED_MSG);
        }
        ClientProcessTask task = new ClientProcessTask((Response) msgBase, rpcChannel);
        // Stamp the submit time so the task can report how long it waited for a worker thread.
        LogUtils.addLogTimeAttachment(msgBase, Constants.BEFORE_THREAD_EXECUTE_TIME_KEY, System.currentTimeMillis());
        this.threadPoolFactory.defaultThreadPool().execute(task);
    }

    /** Does nothing; this processor performs no cleanup of its own. */
    @Override
    public void close() {
    }

    /**
     * Replaces the factory whose default thread pool runs this processor's tasks.
     *
     * @param threadPoolFactory the new factory
     */
    @Override
    public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
        this.threadPoolFactory = threadPoolFactory;
    }

    /**
     * Returns the number of tasks queued in the default thread pool.
     *
     * @param str ignored by this implementation
     */
    @Override
    public Integer waitTaskCount(String str) {
        return Integer.valueOf(this.threadPoolFactory.defaultThreadPool().getQueue().size());
    }

    /**
     * Returns the active thread count of the default thread pool.
     *
     * @param str ignored by this implementation
     */
    @Override
    public Integer processingCount(String str) {
        return Integer.valueOf(this.threadPoolFactory.defaultThreadPool().getActiveCount());
    }

    /**
     * Returns the completed task count of the default thread pool.
     *
     * @param str ignored by this implementation
     */
    @Override
    public Long completeCount(String str) {
        return Long.valueOf(this.threadPoolFactory.defaultThreadPool().getCompletedTaskCount());
    }

    /** Returns the queued task count plus the active thread count of the default thread pool. */
    @Override
    public Integer allWaitTaskCount() {
        return Integer.valueOf(waitTaskCount(null).intValue() + this.threadPoolFactory.defaultThreadPool().getActiveCount());
    }
}
