从源码分析RocketMQ系列-Producer的invokeSync()方法

╰+哭是因爲堅強的太久メ 2023-05-28 15:52 109阅读 0赞

导语
  在之前的博客中通过对于Producer中SendResult的跟踪找到了在Client模块下的所有的封装以及消费的过程,深入到对接Remoting模块的接口中对消息的封装以及发送回收等。但是对于具体后续操作还是没有跟进,这篇博客就从this.remotingClient.invokeSync(addr, request, timeoutMillis);方法开始进入的Message的发送、消费、持久化等功能的探讨。

文章目录

    • 从invokeSync()方法开始
    • 总结

从invokeSync()方法开始

  在之前的分析中可以看到invokeSync()方法是来自于RemotingClient接口,并且对这个接口也有一定的说明,了解了这个接口只有一个实现类NettyRemotingClient,从类名上就可以看出来使用的是Netty的支持。那么对于invokeSync()方法的支持是什么样呢?方法总体如下图所示,下面就来一步步的分析该方法
在这里插入图片描述

1、创建一个Channel

  对于一个Channel的管理与创建,在之前分享RabbitMQ的时候简单的提到过,这里由于涉及到了Netty,而Netty本身就是支持了NIO的操作,既然支持了NIO的操作。这里就需要知道Channel的创建逻辑。

  1. final Channel channel = this.getAndCreateChannel(addr);

  代码本身只创建了一个Channel并且通过调用了一个this.getAndCreateChannel(addr);方法来进行获取,那么按照之前创建Channel的逻辑,首先需要建立一个Connection,然后在利用这个Connection去复用一些Channel。那么猜测this.getAndCreateChannel(addr);这个方法中一定有关于Connection的创建逻辑。进入该方法。

  1. private Channel getAndCreateChannel(final String addr) throws InterruptedException {
  2. if (null == addr) {
  3. return getAndCreateNameserverChannel();
  4. }
  5. ChannelWrapper cw = this.channelTables.get(addr);
  6. if (cw != null && cw.isOK()) {
  7. return cw.getChannel();
  8. }
  9. return this.createChannel(addr);
  10. }

  从方法体重可以看到还需要进行新的判断,并且有ChannelWrapper这样一个封装对象。这样从上面的逻辑来看真正实现第一次创建的是通过this.createChannel(addr);方法来进行操作的。果然在这个方法中找到了如下的一个操作,用过Netty的都知道这个操作是什么意思?这个操作就是为NIO操作创建一个Bootstrap。对于这块内容在后面逻辑中整合进行分析。这里首先知道这个方法是提供了故意而Channel的连接。至于其他的操作,都放到后续的对于Netty使用以及对于NIO详细说明来分析。这里主要是分析Message的流转。

  1. if (createNewConnection) {
  2. ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
  3. log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
  4. cw = new ChannelWrapper(channelFuture);
  5. this.channelTables.put(addr, cw);
  6. }

2、调用一个前置的RPCHook

  上这个分析就是创建了一个Channel的工作,这个Channel就是需要通信的信道。一般的操作都是由这个Channel来完成对于网络请求的发送和接收操作。

  1. doBeforeRpcHooks(addr, request);
  2. protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
  3. if (rpcHooks.size() > 0) {
  4. for (RPCHook rpcHook: rpcHooks) {
  5. rpcHook.doBeforeRequest(addr, request);
  6. }
  7. }
  8. }

  在很多的框架代码中有一个统一的规则就是将实际的工作交给一个do开头的方法来进行操作。这里在消息请求到Consumer之前先做了一个前置的Hook,这里主要完成的工作就是对这个Request进行一个属性的扩展操作。会看到其实在调用发送方法之前对于Request添加了很多的扩展属性。
在这里插入图片描述

3、调用发送方法
  为了实现方法的解耦操作,这里使用了下面这个方法对Request消息进行发送。这个Request就是通过Client端进行了封装的操作。这里调用的同步模式,所以先分析关于同步调用的方法

  1. RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);

  既然怒到这个方法之后最为重要的一个点就是ResponseFuture对象,在之前的博客中有一篇是关于分析Future模式的,对于Future模式,简单的说就是我们想要的结果。那么这里是对这个结果进行了封装,但是实际上通过Netty调用的Future并不是这个。关于这方面的知识可以了解关于Netty的相关知识这里不做过多的说明,有需要的话后续的分析中会提到。
在这里插入图片描述
  到这里所有的消息都通过 channel.writeAndFlush(request).addListener(new ChannelFutureListener())操作发送到了Channel中开水进入到Broker进行消息进步操作。后续的操作就是从ResponseFuture对象中,获取到我们预期的RemotingCommand Response。并且对这个Response进行了检查。

4、调用后置的RPCHook
  这个调用的深入会发现这个方法并没有被实现,是个空方法没有方法体。

  1. doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);

总结

  从invokeSync()方法开始,描述了整个的消息核心发送过程,调用很清晰,实现的功能也比较简单。理解这个里需要对Netty有关的知识和NIO有关的知识做一定的了解。后续的分享中也会涉及到有关知识的讨论。

发表评论

表情:
评论列表 (有 0 条评论,109人围观)

还没有评论,来说两句吧...

相关阅读