3 | Netty源码:什么是Reactor模型

一时失言乱红尘 2023-07-03 06:07 113阅读 0赞

今天是2020年2月2号,感觉是一个比较特殊的日子,今天就来一篇记录型的博客吧,哈哈

起初很好奇,到底什么叫Reactor模式,这个名词感觉特别高大上,然后看描述,虽然能看懂描述,但是却不是特别明白到底是什么意思。这个时候主要是没有形成一种直观的印象,直观的印象就是比如说苹果,再给你看个实物,你就能把苹果与关联起来。在学习netty的时候,也遇到了Reactor模式,于是有了机会来形成一种比较直观的印象。

什么是Reactor模式?

定义看起来很抽象,但是其实很好理解。**它是一种开发模式,模式的核心流程:注册感兴趣的事件 -> 扫描是否有感兴趣的事件发生 -> 事件发生后做出相应的处理。**仅此而已。使用BIO开发的时候,每有一个新的请求过来了,都会新开一个线程,然后在新的线程里面进行业务处理,这种处理方式就是Thread-Per-Connection;
format_png
所以对应起来,使用NIO开发的时候,也有一个模式去处理相应的请求与业务逻辑,叫做Reactor模式。至于具体怎么做,也就是前面提到的Reactor模式的核心流程。

Reactor模式的3种版本

开始这个之前我有一个疑问:Thread-Per-Connection与Reactor单线程有什么关系?

Thread-Per-Connection模式

示意图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9tyrEyYU-1580657917209)(https://gitee.com/sasurai/pics/raw/master/mac/20200201181549.png)\]

伪代码:
format_png 1

Reactor单线程模式

format_png 2

从这张图里面看不懂其执行流是什么样的。待后续理解了再补上解读。

Reactor多线程模式

format_png 3

主从Reactor多线程模式

对服务器开发来说,很重要的事情是接收连接,accept事件会被单独注册到另外一个reactor中。

format_png 4

在Netty中如何实现Reactor三种模式

format_png 5

其中单线程和非主从reactor多线程模式的差别只在于new的时候传入的线程数量,不传的话,会默认以CPU的核数为依据来确定最终的线程数。

Netty 如何支持主从 Reactor 模式

以netty项目源代码(分支4.1)中netty-example模块的EchoServer为例。

保存

它是一个主从reactor多线程模式,其中bossGroup负责accept事件,workerGroup负责逻辑处理。

format_png 6

在①中,分别将两个EventLoopGroup传入到ServerBootstrap中,并将这两个EventLoopGroup保存起来。

format_png 7

步骤②执行的保存逻辑如下:

format_png 8

步骤③即已保存完毕。保存起来之后,什么时候使用呢?

将channel注册到parentGroup

先看parentGroup的使用过程,找到使用了group这个变量的地方Ctrl + B

format_png 9

进去之后,是一个类似于普通getter方法

format_png 10

只有一个地方调用,名称也叫group(),所以还可以继续往上看调用者

format_png 11

然后使用Ctrl + Alt + H查看该group()方法的调用者:

format_png 12

initAndRegister()中可以找到将channel(即ServerSocketChannel)注册到该EventLoopGroup的代码,如下:

format_png 13

绑定完毕。

channel注册到childGroup

找到使用了childGroup这个变量的地方Ctrl + B

format_png 14

只有个地方使用到了该childGroup,并改名成了currentChildGroup

format_png 15

①改名,②将childGroup作为一个变量,传入ServerBootstrapAcceptor中。ServerBootstrapAcceptor继承自ChannelInboundHandlerAdapter,其覆盖了父类的channelRead()方法,其中将新进来的channel注册到childGroup中。

format_png 16

也就是说,新进来的连接,即SocketChannel,都会被注册到childGroup中。

新连接建立进行哪些初始化

回到上面的init()方法中提出的,它是何时被调用的这个问题中。

它是AbstractBootstrap抽象类中的一个抽象方法,有两个类继承自AbstractBootstrap,分别是BootstrapServerBootstrap。调用init()方法的地方只有一个,即initAndRegister()中。

format_png 17

其中传入init()channelServerSocketChannel,其大致过程:

当服务端即EchoServer启动的时候,会为ServerSocketChannel的pipeline添加一个ServerBootstrapAcceptor,所以每当有来自客户端的请求时,都会首先经过ServerBootstrapAcceptor,让它先处理,而它的处理内容就是将SocketChannel注册到childGroup中。

为什么说 Netty 的 main reactor 大多并不能用到一个线程组,只能线程组里面的一个?

因为服务端只会启动一次,只有在启动过程中去绑定端口号时才会将ServerSocketChannel绑定到main reactor上。所以这时候要从initAndRegister的调用者逐级往上查看,如下;

  1. public ChannelFuture bind(InetAddress inetHost, int inetPort) {
  2. return bind(new InetSocketAddress(inetHost, inetPort));
  3. }
  4. public ChannelFuture bind(SocketAddress localAddress) {
  5. validate();
  6. return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
  7. }
  8. private ChannelFuture doBind(final SocketAddress localAddress) {
  9. final ChannelFuture regFuture = initAndRegister();
  10. final Channel channel = regFuture.channel();
  11. ...
  12. }
  13. final ChannelFuture initAndRegister() {
  14. Channel channel = null;
  15. try {
  16. channel = channelFactory.newChannel();
  17. // 设置新接入连接的SocketChannel注册到sub reactor
  18. init(channel);
  19. } catch (Throwable t) {
  20. ...
  21. }
  22. // 注册ServerSocketChannel到main reactor
  23. ChannelFuture regFuture = config().group().register(channel);
  24. ...
  25. }

所以一个ServerSocketChannel只会注册到一个group中。但还是个疑问,是与EventLoopGroup相关的,留待后续再来回答。这个问题的意思是说,只能用线程组里面的一个线程,为什么?为什么不能多个线程?下面这个问题可以回答这个疑问!

Netty 给 Channel 分配 NIO event loop 的规则是什么

initAndRegister()中的config().group().register(channel)代码出发,也就是ServerSocketChannel注册到main reactor中的那段代码(参见上面)。

  1. // 从register方法进入
  2. ChannelFuture regFuture = config().group().register(channel);
  3. // 进入EventLoopGroup.java,它是一个接口。
  4. // NIO选MultithreadEventLoopGroup的实现
  5. ChannelFuture register(Channel channel);
  6. @Override
  7. public ChannelFuture register(Channel channel) {
  8. return next().register(channel);
  9. }
  10. @Override
  11. public EventLoop next() {
  12. return (EventLoop) super.next();
  13. }
  14. // 进入父类的next()实现中
  15. @Override
  16. public EventExecutor next() {
  17. return chooser.next();
  18. }
  19. // 进入chooser的next()方法,发现这个chooser是一个接口类型
  20. private final EventExecutorChooserFactory.EventExecutorChooser chooser;
  21. public interface EventExecutorChooserFactory {
  22. EventExecutorChooser newChooser(EventExecutor[] executors);
  23. @UnstableApi interface EventExecutorChooser {
  24. EventExecutor next();
  25. }
  26. }
  27. // chooser的初始化是根据传入的线程数决定的
  28. // 在MultithreadEventExecutorGroup的构造函数中
  29. children = new EventExecutor[nThreads];
  30. // 需要多少个线程,就有多少个EventExecutor,它初步与Thread等价
  31. chooser = chooserFactory.newChooser(children);

所以chooser.next()返回的是一个等价于Thread的对象,也就是说这个ServerSocketChannel只会在这个Thread中进行接收。其中的chooser就是根据线程数的个数,来选取一个线程分配给register进来的ServerSocketChannel。具体分配策略:

  1. // next()是一个抽象方法,它的具体实现有两种
  2. public EventExecutorChooser newChooser(EventExecutor[] executors) {
  3. if (isPowerOfTwo(executors.length)) {
  4. return new PowerOfTwoEventExecutorChooser(executors);
  5. } else {
  6. return new GenericEventExecutorChooser(executors);
  7. }
  8. }
  9. // 判断是否为2的幂的简便方法
  10. private static boolean isPowerOfTwo(int val) {
  11. return (val & -val) == val;
  12. }
  13. private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
  14. private final AtomicInteger idx = new AtomicInteger();
  15. private final EventExecutor[] executors;
  16. PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
  17. this.executors = executors;
  18. }
  19. @Override
  20. public EventExecutor next() {
  21. return executors[idx.getAndIncrement() & executors.length - 1];
  22. }
  23. }
  24. private static final class GenericEventExecutorChooser implements EventExecutorChooser {
  25. private final AtomicInteger idx = new AtomicInteger();
  26. private final EventExecutor[] executors;
  27. GenericEventExecutorChooser(EventExecutor[] executors) {
  28. this.executors = executors;
  29. }
  30. @Override
  31. public EventExecutor next() {
  32. return executors[Math.abs(idx.getAndIncrement() % executors.length)];
  33. }
  34. }

至此,上面的那个疑问算是有了一个答案。

发表评论

表情:
评论列表 (有 0 条评论,113人围观)

还没有评论,来说两句吧...

相关阅读