package org.apache.dubbo.rpc.rocketmq;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import org.apache.dubbo.remoting.buffer.DynamicChannelBuffer;
import org.apache.dubbo.remoting.buffer.HeapChannelBuffer;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProtocolServer;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.rocketmq.codec.RocketMQCountCodec;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/* loaded from: input_file:org/apache/dubbo/rpc/rocketmq/RocketMQProtocol.class */
public class RocketMQProtocol extends AbstractProtocol {
    /** Protocol name; {@link #getDubboProtocol(ScopeModel)} uses it as the extension name to look up. */
    public static final String NAME = "rocketmq";
    /**
     * Declared default port.
     * NOTE(review): nothing in this class reads this constant, and {@link #getDefaultPort()}
     * returns 9876 instead — confirm which value is intended.
     */
    public static final int DEFAULT_PORT = 20880;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/dubbo/rpc/rocketmq/RocketMQProtocol$DubboMessageListenerConcurrently.class */
    public class DubboMessageListenerConcurrently implements MessageListenerConcurrently {
        // Codec used to decode request bodies and encode responses; created in the constructor.
        private RocketMQCountCodec rocketmqCountCodec;
        // Producer used to send reply messages; assigned by RocketMQProtocol.createServer.
        private DefaultMQProducer defaultMQProducer;
        // Owning server; supplies the executor that runs each message; assigned by RocketMQProtocol.createServer.
        private RocketMQProtocolServer rocketMQProtocolServer;

        /**
         * Creates the listener with a codec bound to the default framework model.
         * The producer and server references are injected afterwards by the enclosing protocol.
         */
        private DubboMessageListenerConcurrently() {
            FrameworkModel frameworkModel = FrameworkModel.defaultModel();
            this.rocketmqCountCodec = new RocketMQCountCodec(frameworkModel);
        }

        /**
         * Hands every received message to the server's executor and acknowledges the batch right away.
         *
         * <p>The status returned does not reflect the outcome of the submitted tasks: each message is
         * processed asynchronously by {@code execute} and the batch is reported as consumed as soon as
         * all tasks have been submitted.
         *
         * @param list                       the batch of messages delivered by the consumer
         * @param consumeConcurrentlyContext consume context (not used)
         * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
         */
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (final MessageExt message : list) {
                Runnable task = () -> execute(message);
                this.rocketMQProtocolServer.getExecutorService().submit(task);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Processes one request message end to end: builds the channel, invokes the target service,
         * encodes the response and sends it back as a reply message.
         *
         * <p>This method runs on the server executor via {@code submit(...)}, and the returned future
         * is never inspected, so an exception escaping from here would vanish without a trace.
         * Everything is therefore wrapped in a catch-all that logs the failure.
         *
         * @param messageExt the request message received from RocketMQ
         */
        public void execute(MessageExt messageExt) {
            try {
                // The sender's host travels as a user property; the port is fixed to 9876.
                RpcContext.getContext().setRemoteAddress(messageExt.getUserProperty(RocketMQProtocolConstant.SEND_ADDRESS), 9876);
                String urlString = messageExt.getUserProperty(RocketMQProtocolConstant.URL_STRING);
                URL url = URL.valueOf(urlString);

                RocketMQChannel rocketMQChannel = new RocketMQChannel();
                rocketMQChannel.setRemoteAddress(RpcContext.getContext().getRemoteAddress());
                rocketMQChannel.setUrl(url);
                rocketMQChannel.setUrlString(urlString);
                rocketMQChannel.setMessageExt(messageExt);
                rocketMQChannel.setDefaultMQProducer(this.defaultMQProducer);
                rocketMQChannel.setRocketMQCountCodec(this.rocketmqCountCodec);

                // A null response means the request had already timed out: nothing to send back.
                Response response = invoke(messageExt, rocketMQChannel, url);
                if (Objects.isNull(response)) {
                    return;
                }
                // A null buffer means even the error response could not be encoded.
                ChannelBuffer buffer = createChannelBuffer(rocketMQChannel, response, url);
                if (Objects.isNull(buffer)) {
                    return;
                }
                sendMessage(messageExt, buffer, url, urlString);
            } catch (Exception e) {
                // Without this the executor's unread Future would swallow the failure silently.
                RocketMQProtocol.this.logger.error(
                        String.format("process request message fail, msgId is %s cause is %s", messageExt.getMsgId(), e.getMessage()), e);
            }
        }

        /**
         * Decodes the request carried by {@code messageExt}, invokes the exporter registered for the
         * message topic and wraps the outcome in a {@link Response}.
         *
         * @param messageExt the request message; must carry a {@code timeout} user property holding
         *                   an absolute deadline in epoch milliseconds (it is compared against
         *                   {@link System#currentTimeMillis()})
         * @param channel    the channel handed to the codec and used as the remote address source
         * @param url        the request URL, used for error reporting only
         * @return the response to send back (status {@link Response#OK} or
         *         {@link Response#BAD_REQUEST}), or {@code null} when the deadline has already
         *         passed and no reply should be sent
         */
        private Response invoke(MessageExt messageExt, Channel channel, URL url) {
            Response response = new Response();
            try {
                long deadline = Long.parseLong(messageExt.getUserProperty("timeout"));
                if (RocketMQProtocol.this.logger.isDebugEnabled()) {
                    RocketMQProtocol.this.logger.debug(String.format("reply message ext is : %s", messageExt));
                }
                // A reply message cannot be built without the CLUSTER property, so fail fast.
                if (Objects.isNull(messageExt.getProperty("CLUSTER"))) {
                    MQClientException mQClientException = new MQClientException(10007, "create reply message fail, requestMessage error, property[CLUSTER] is null.");
                    response.setErrorMessage(mQClientException.getMessage());
                    response.setStatus(Response.BAD_REQUEST);
                    RocketMQProtocol.this.logger.error(mQClientException);
                    return response;
                }

                Object decoded = this.rocketmqCountCodec.decode(channel, new HeapChannelBuffer(messageExt.getBody()));
                String topic = messageExt.getTopic();
                Invocation invocation = (Invocation) ((Request) decoded).getData();
                if (deadline < System.currentTimeMillis()) {
                    RocketMQProtocol.this.logger.warn(String.format("message timeoute time is %d invocation is %s ", deadline, invocation));
                    return null;
                }

                // Exporters are keyed by topic. Report a missing one explicitly instead of letting
                // a NullPointerException surface as an uninformative "cause is null" error.
                Exporter<?> exporter = (Exporter<?>) RocketMQProtocol.this.exporterMap.get(topic);
                if (exporter == null) {
                    String notFound = String.format("no exporter found for topic %s, url is %s", topic, url);
                    response.setErrorMessage(notFound);
                    response.setStatus(Response.BAD_REQUEST);
                    RocketMQProtocol.this.logger.error(notFound);
                    return response;
                }

                Invoker<?> invoker = exporter.getInvoker();
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                Result result = invoker.invoke(invocation);
                response.setStatus(Response.OK);
                response.setResult(result);
            } catch (Exception e) {
                // Decode, cast and invocation failures are all reported back to the caller.
                String format = String.format("data decode or invoke fail, url is %s cause is %s", url, e.getMessage());
                response.setErrorMessage(format);
                response.setStatus(Response.BAD_REQUEST);
                RocketMQProtocol.this.logger.error(format, e);
            }
            return response;
        }

        /**
         * Encodes {@code response} into a fresh buffer.
         *
         * <p>If encoding fails, the response is turned into an error response (status
         * {@link Response#BAD_REQUEST} with the failure message) and encoded again into a new buffer.
         *
         * @param channel  the channel handed to the codec
         * @param response the response to encode; mutated into an error response on encode failure
         * @param url      the request URL, used for error reporting only
         * @return the buffer holding the encoded response, or {@code null} if the error response
         *         could not be encoded either
         */
        private ChannelBuffer createChannelBuffer(Channel channel, Response response, URL url) {
            ChannelBuffer buffer = new DynamicChannelBuffer(2048);
            try {
                this.rocketmqCountCodec.encode(channel, buffer, response);
                return buffer;
            } catch (Exception e) {
                String format = String.format("encode fail, url is %s cause is %s", url, e.getMessage());
                response.setErrorMessage(format);
                response.setStatus(Response.BAD_REQUEST);
                RocketMQProtocol.this.logger.error(format, e);
            }

            // Second attempt: encode the error response into a clean buffer, since the first one
            // may contain a partially written payload.
            try {
                buffer = new DynamicChannelBuffer(2048);
                this.rocketmqCountCodec.encode(channel, buffer, response);
                return buffer;
            } catch (Exception e2) {
                // Report the cause of THIS failure, not the first one, and catch runtime failures
                // as well so that the caller reliably gets null.
                RocketMQProtocol.this.logger.error(String.format("encode exception response fail, url is %s cause is %s", url, e2.getMessage()), e2);
                return null;
            }
        }

        /**
         * Sends the encoded response back to the requester as a RocketMQ reply message.
         *
         * @param messageExt    the original request message the reply is created for
         * @param channelBuffer the buffer holding the encoded response
         * @param url           the request URL, used for error reporting only
         * @param str           the request URL string, echoed back as a user property
         * @return {@code true} if the reply was sent, {@code false} if creating or sending it failed
         */
        private boolean sendMessage(MessageExt messageExt, ChannelBuffer channelBuffer, URL url, String str) {
            try {
                // Copy only the bytes that were actually written. array() returns the backing array,
                // which for a growable buffer is usually larger than the encoded payload, so using it
                // directly would append unused trailing bytes to the reply body.
                byte[] body = new byte[channelBuffer.readableBytes()];
                channelBuffer.getBytes(channelBuffer.readerIndex(), body);

                Message reply = MessageUtil.createReplyMessage(messageExt, body);
                reply.putUserProperty(RocketMQProtocolConstant.SEND_ADDRESS, RocketMQProtocolConstant.LOCAL_ADDRESS.getHostString());
                reply.putUserProperty(RocketMQProtocolConstant.URL_STRING, str);
                // Send timeout is fixed at 3000 ms.
                SendResult sendResult = this.defaultMQProducer.send(reply, 3000L);
                if (RocketMQProtocol.this.logger.isDebugEnabled()) {
                    RocketMQProtocol.this.logger.debug(String.format("send result is : %s", sendResult));
                }
                return true;
            } catch (Exception e) {
                RocketMQProtocol.this.logger.error(String.format("send response fail, url is %s cause is %s", url, e.getMessage()), e);
                return false;
            }
        }
    }

    /**
     * Looks up the {@link Protocol} extension registered under {@link #NAME} in the given scope.
     *
     * @param scopeModel the scope whose extension loader is queried
     * @return the extension cast to {@code RocketMQProtocol}
     */
    public static RocketMQProtocol getDubboProtocol(ScopeModel scopeModel) {
        Protocol protocol = scopeModel.getExtensionLoader(Protocol.class).getExtension(NAME, false);
        return (RocketMQProtocol) protocol;
    }

    /**
     * Returns the default port of this protocol.
     *
     * <p>NOTE(review): this returns the literal 9876 rather than the class's {@code DEFAULT_PORT}
     * constant (20880) — presumably deliberate, but worth confirming.
     *
     * @return 9876
     */
    public int getDefaultPort() {
        final int port = 9876;
        return port;
    }

    /**
     * Exports {@code invoker}: creates its exporter, opens (or reuses) the server for the URL's
     * address and subscribes the server's push consumer to the exporter's key as the topic.
     *
     * <p>When the URL parameter {@code groupModel} equals {@code "select"}, the subscription uses a
     * SQL message selector built from the URL's {@code group}/{@code version}; otherwise it
     * subscribes to all messages of the topic ({@code "*"}).
     *
     * <p>Opening the server and subscribing are handled in two separate steps so that a
     * subscription failure is logged and reported once, with its own message, instead of being
     * re-wrapped as a client-creation failure.
     *
     * @param invoker the invoker to export
     * @param <T>     the service type
     * @return the exporter for {@code invoker}
     * @throws RpcException if the server cannot be opened or the topic subscription fails
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        RocketMQExporter rocketMQExporter = new RocketMQExporter(invoker, url, this.exporterMap);
        String key = rocketMQExporter.getKey();

        RocketMQProtocolServer server;
        try {
            server = openServer(url, "provider");
        } catch (Exception e) {
            String format = String.format("create rocketmq client fail, url is %s , topic is %s, cause is %s", url, key, e.getMessage());
            this.logger.error(format, e);
            throw new RpcException(format, e);
        }

        try {
            if ("select".equals(url.getParameter("groupModel"))) {
                server.getDefaultMQPushConsumer().subscribe(key, createMessageSelector(url));
            } else {
                server.getDefaultMQPushConsumer().subscribe(key, "*");
            }
        } catch (Exception e) {
            String format = String.format("topic subscirbe fail, topic is %s, cause is %s", key, e.getMessage());
            this.logger.error(format, e);
            throw new RpcException(format, e);
        }
        return rocketMQExporter;
    }

    /**
     * Builds a SQL message selector from the URL's {@code group} and {@code version} parameters.
     *
     * <p>The expression is {@code group=<group>}, {@code version=<version>}, or both joined with
     * {@code " and "}, depending on which parameters are present.
     *
     * <p>NOTE(review): the parameter values are appended verbatim, without quoting or escaping.
     * Confirm that the resulting expression is what the broker's SQL filter expects for string
     * values, and that these URL parameters cannot carry untrusted input.
     *
     * @param url the provider URL
     * @return the selector for the built expression
     * @throws IllegalArgumentException if both {@code group} and {@code version} are absent
     */
    private MessageSelector createMessageSelector(URL url) {
        String group = url.getParameter("group");
        String version = url.getParameter("version");
        if (group == null && version == null) {
            throw new IllegalArgumentException("group and version must not both be null");
        }

        StringBuilder expression = new StringBuilder();
        if (group != null) {
            expression.append("group").append("=").append(group);
        }
        if (version != null) {
            // Only the group clause can precede this one, so a non-empty builder means "join".
            if (expression.length() > 0) {
                expression.append(" and ");
            }
            expression.append("version").append("=").append(version);
        }
        return MessageSelector.bySql(expression.toString());
    }

    /**
     * Returns the server registered for the URL's address, creating and registering it on first use.
     *
     * <p>The lookup is attempted first without locking; on a miss it is repeated while holding this
     * protocol's monitor, and the server is created only if it is still absent.
     *
     * @param url the URL whose address keys the server map
     * @param str the model passed to the new server ({@code "provider"} or {@code "consumer"} at the call sites)
     * @return the existing or newly created server
     */
    private RocketMQProtocolServer openServer(URL url, String str) {
        String address = url.getAddress();
        ProtocolServer cached = (ProtocolServer) this.serverMap.get(address);
        if (cached == null) {
            synchronized (this) {
                boolean absent = ((ProtocolServer) this.serverMap.get(address)) == null;
                if (absent) {
                    this.serverMap.put(address, createServer(url, address, str));
                }
                cached = (ProtocolServer) this.serverMap.get(address);
            }
        }
        return (RocketMQProtocolServer) cached;
    }

    /**
     * Creates a new protocol server, wires a message listener to it and initialises it from {@code url}.
     *
     * <p>NOTE(review): the listener's producer is read from the server before {@code reset(url)} is
     * called. If the server only creates its producer inside {@code reset}, the listener would be
     * holding {@code null} — confirm against {@code RocketMQProtocolServer}. Statement order is kept
     * exactly as it was.
     *
     * @param url  the URL used to initialise the server
     * @param str  the server address (not used here)
     * @param str2 the model set on the server
     * @return the initialised server
     */
    private ProtocolServer createServer(URL url, String str, String str2) {
        RocketMQProtocolServer server = new RocketMQProtocolServer();
        server.setModel(str2);

        DubboMessageListenerConcurrently listener = new DubboMessageListenerConcurrently();
        listener.defaultMQProducer = server.getDefaultMQProducer();
        listener.rocketMQProtocolServer = server;

        server.setMessageListenerConcurrently(listener);
        server.reset(url);
        return server;
    }

    /**
     * Creates the consumer-side invoker for {@code cls}, backed by the server opened for the URL's address.
     *
     * @param cls the service interface
     * @param url the reference URL
     * @param <T> the service type
     * @return a new {@code RocketMQInvoker}
     * @throws RpcException if opening the server or creating the invoker fails; the failure is logged first
     */
    protected <T> Invoker<T> protocolBindingRefer(Class<T> cls, URL url) throws RpcException {
        try {
            RocketMQProtocolServer server = openServer(url, "consumer");
            return new RocketMQInvoker(cls, url, server);
        } catch (Exception e) {
            String message = String.format("protocol binding refer fail, url is %s , cause is %s ", url, e.getMessage());
            this.logger.error(message, e);
            throw new RpcException(message, e);
        }
    }
}
