从源码分析RocketMQ系列-Producer的SendResult的封装
导语
通过之前博客的Producer的SendResult来自哪里分析到发送的核心机制,了解了在发送之前被使用的几个Hook,以及发送消息的RequestHeader的封装,但是这些封装都被一个this.mQClientFactory.getMQClientAPIImpl().sendMessage()方法所吸收了。这个方法的内部我们还没有看到,下面就来继续探秘这块内容。
文章目录
- sendMessage()
- RemotingCommand 对象
- 开始发送消息
- 进入模式选择
- this.processSendResponse(brokerName, msg, response);
- 总结
sendMessage()
从之前的分析中可以知道,这个方法其实采用的是一个链式调用,一步一步分析会得到如下的一个调用链
this.mQClientFactory.getMQClientAPIImpl().sendMessage()
private MQClientInstance mQClientFactory;
public MQClientAPIImpl getMQClientAPIImpl() {
return mQClientAPIImpl;
}
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException
通过对上面的内容的解析,发现最终调用了 org.apache.rocketmq.client.impl.MQClientAPIImpl 类中的 sendMessage()方法,并且方法提供了很多的参数,到这里就是真正看到核心了么?先来打个问号。分析一下模块,从第一篇分析,可以看到到这里我们调用都还能在Client模块追踪到,但是当进入到这个方法之后就开始了新的旅程,这里来看看进入新的是从什么地方开始的。
从图中的方法可以看到进入方法后调用了一个RemotingCommand 的操作,从这里开始就进入到了正式的流转中。其实会发现这个地方调用的是Remoting模块的东西,也就是说调用的是与路由相关的东西。下面就来从最简单的开始进行分析
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
会看到传入的参数是之前封装好的请求头和请求标识,这个请求标识的值是10。而请求头是从上一篇博客分析的请求头分装而来。那么这createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader) 方法到底实现的内容是什么呢?进入方法之后发现什么有啥特殊的地方。就是设置了一些请求的参数。
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}
RemotingCommand 对象
在进入关键代码之前,遇到了RemotingCommand这个对象,这里就需要来深入的先聊聊这个对象了,首先这个对象的位置
整个对象是被放到了Remoting模块的Protocol包下,也就是说作为远程调用协议的内容存在,那么在一般的协议调用过程中所支持的操作在Remoting模块中都是有对应的支持关系这里先不做深入的研究。先来说说关于RemotingCommand的内容,从字面的意思就是封装了一个远程调用的命令集合,该类也没有继承或者被继承。既然没有被继承,也没有继承别人,就说明这个类是独立完成了一些功能,并且这类还可以提供外部的调用功能createRequestCommand()。
开始发送消息
上面的方法对于request创建之后,就进行了下面这个方法的调用,经过这个方法的调用之后,通过我们的理解会发现。到这里才是一个真正的请求所需要的所有的数据才被创建完成。一个请求头以及传递消息的请求体。
request.setBody(msg.getBody());
进入模式选择
通过上面这个调用,还是对模式的选择,这里还是以同步模式为例进入到同步模式进行查看。
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
还是通过一个时间的控制来保证数据高效的传递,这个该概念在之前的代码分析中有提到过,通过这种方式可以进行一些优势的操作。这些优势的操作在后续深入的分享中会提到。但是从这里的异常信息可以看出,抛出的是一个超时的异常。可以思考一下到底是为什么?
关键方法就是下面这个this.sendMessageSync()方法。也许有人会问,为啥这个地方要进行这么多的方法的封装。慢慢就会了解这样编码的好处,继续往下看,会发现这个方法的你不又调用了一个方法,之前提到了一个概念,就是关于RemotingCommand这个对象,这个对象没有被继承,也没有继承别人,那么它所独立完成的功能是什么呢?这里就可以看到,其实这个对象是作为整个的请求过程中的整体对象存在。意思就是说请求和响应是一样的对象。那么下面这个方法中有两个方法哪个方法才是我们需要找到的发送消息的方法呢?
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
从上面的代码中可以看到有这样一个方法this.remotingClient.invokeSync(addr, request, timeoutMillis);,并且有this.remotingClient 这个对象,private final RemotingClient remotingClient;会看到这里才是问题的关键。它是一个接口,既然是接口的话就会定义一些实现规则,那么这里定义的规则肯定就是与消息传递有关的规则,那么就来看看这个接口定义的规则有那些?
public interface RemotingClient extends RemotingService {
//根性Namespace服务地址列表
void updateNameServerAddressList(final List<String> addrs);
//获取服务地址列表
List<String> getNameServerAddressList();
//关键方法,在上面的代码中有的是用来返回一个Response对象用的
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
//异步调用方法
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
//单向调用模式
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
//注册处理器,注意器参数
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
//设置回调服务
void setCallbackExecutor(final ExecutorService callbackExecutor);
//获取执行的服务
ExecutorService getCallbackExecutor();
//Channel的可写性
boolean isChannelWritable(final String addr);
}
通过上面的代码可以看到,这个接口不但自己定义了一些规则,还从其他地方继承了一部分功能?那么继承过来的功能有那些呢?其实很简单,就是一个服务的启停和Hook的注入。
public interface RemotingService {
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
}
到这里这个接口所提供的所有的规则都已经展示到了,但是既然有这些规则就需要这些规则进行实现,从类的继承关系上来看。这类只有一个实现类。而且这个实现类是被Netty支持的,也就是说RocketMQ底层的协议实现是通过Netty来实现的。到这里至少可以确认一点,就是我们的RocketMQ的消息发送操作其实是由this.remotingClient.invokeSync()方法来实现的。
确定完消息发送的实现方法之后,之前提到的,其中还有一个方法this.processSendResponse(brokerName, msg, response);这个方法是用来返回SendResult的,而这里我们也是为了找到这个对象是如何返回的。对于消息调用之后进入的Netty在后续的分析中在继续跟进。
this.processSendResponse(brokerName, msg, response);
private SendResult processSendResponse(
final String brokerName,
final Message msg,
final RemotingCommand response
) throws MQBrokerException, RemotingCommandException
从上面代码可以看到,这方法是被private修饰的,也就是说这个地方实现的内容并不是是对所有类可见的,并且传入的参数分别是brokerName,Message和response。由于这里响应消息和请求消息是同样的数据结构,所以用同一个类RemotingCommand进行标识。
进入方法之后首先进行的是对消息码的一个确认,分别是刷盘超时、保存超时、保存不可用。这三种状态是怎么有的,或者说有了这三种状态之后会进行怎么样的操作这里都没有具体的实现以及操作。但是在如果返回成功之后,有一个发送状态的检查这里分别对发送状态以及响应状态进行了匹配。那么也就是说,出现了上面提到的三种状态之后,会继续向下检查,并且在这个方法中对于这些状态进行了封装,并没有把最原始的状态给返回了。更进一步的说就是无论如何在做这个操作之前,消息一定是响应了一个成功状态,如果没有这个成功状态就会进入到后续的操作中。也就是最后执行break语句。
在没到这一步之前,很多人可能被我的一个概念所误导了就是我们看到的Response和之前通过sendMessage()方法传入的Request都是RemotingCommand的对象,以为这里返回的ResponseHeader与RequestHeader 是同一个东西。其实从上面的代码中其实可以看到,这两个内容不是同一个内容。从消息返回成功开始,就对收到的消息进行了细化,到这里已经是最精细的消息了。就需要继续进行拆分,将其中的内容转换为人可以看懂的SendResult。那么就来看看都进行了那些操作。
1、响应头拆分
SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
从响应回来的消息体中将响应头先进行了分离,响应头信息和请求头信息一样,都是为了标识一些公共的信息而存在。
2、判断Namespace和Topic的存在关系
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
}
判断Topic在Namespace的存在关系,了解过RocketMQ架构的人都知道,所有的这些队列对应关系都被方法Namespace 上进行管理。所以得到消息之后首先要做的事情就是去验证合理性。
3、获取消息队列
MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
通过获取到的topic,brokerName,以及QueueID创建了一个新的消息队列。这个消息队列的左右在后面就会看到
4、Uniq添加
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
if (msg instanceof MessageBatch) {
StringBuilder sb = new StringBuilder();
for (Message message : (MessageBatch) msg) {
sb.append(sb.length() == 0 ? "" :",").append(MessageClientIDSetter.getUniqID(message));
}
uniqMsgId = sb.toString();
}
这里有个关键的点就是MessageClientIDSetter.getUniqID(msg)方法获取到了一个Uniq,这个是什么意思呢?进入方法体内部会看到其实它拿到的是Message中的一个UNIQ_KEY配置信息的Key值,为什么要获取这个,这个值在什么地方被传入的呢?这里先不做探究,后续会有跟进内容。看到如果是批量的消息话会对这个地方记性一个拼接。
public static String getUniqID(final Message msg) {
return msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
}
这里先大概的给大家一个概念,拿着这个配置去全局搜了一下,至少是在这些代码中没有看到关于对这个值的Set操作。
最后从设计文档中找了这样一段话,并且配上了这个图,会看的更加清楚。
IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是:$HOME\store\index${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W*4+2000W*20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。
其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4*500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。20*2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。
“按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。
5、结果封装
上面就是真正找到的SendResult 相关的封装,最后就是在Console上显示内容一样。然后一层层的进行返回。
总结
通过两次的分享,了解了RocketMQClient的消息被送入到Remoting之前都做了哪些操作,并且从这些操作中了解了关于整个的消息消费和生产的整个的关系模型。都是最为通俗易懂,用最简单的话描述整个的代码流程,当然在其中也留下了很多的疑问。对于这些疑问在后续的分享中会一一给大家填补上。对于这些问题呢?博主也是给了一定的思考方向的,有兴趣去读者可以按照博主的方向去思考一下,收获会很大!
还没有评论,来说两句吧...