Netty Boss线程是为什么都是写1

末蓝、 2023-06-29 06:57 83阅读 0赞

文章目录

  • Netty Boss线程是为什么都是写1
    • 中间件使用
      • Dubbo中的使用方法:NettyServer#doOpen
      • RxServer 的使用方法
    • 为什么呢?
    • 总结

Netty Boss线程是为什么都是写1

先来看看常见的中间件、框架的使用方式。

中间件使用

Dubbo中的使用方法:NettyServer#doOpen

  1. bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));

RxServer 的使用方法

  1. serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerParentEventLoop(true),
  2. RxNetty.getRxEventLoopProvider().globalServerEventLoop(true));
  3. public static RxEventLoopProvider getRxEventLoopProvider() {
  4. return rxEventLoopProvider;
  5. }
  6. private static volatile RxEventLoopProvider rxEventLoopProvider =
  7. new SingleNioLoopProvider(1, Runtime.getRuntime().availableProcessors());

看一下对应的实现 SingleNioLoopProvider

  1. public SingleNioLoopProvider() {
  2. this(Runtime.getRuntime().availableProcessors());
  3. }
  4. public SingleNioLoopProvider(int threadCount) {
  5. eventLoop = new SharedNioEventLoopGroup(threadCount);
  6. parentEventLoop = eventLoop;
  7. parentEventLoopCount = childEventLoopCount = threadCount;
  8. nativeEventLoop = new AtomicReference<EpollEventLoopGroup>();
  9. nativeParentEventLoop = nativeEventLoop;
  10. }
  11. public SingleNioLoopProvider(int parentEventLoopCount, int childEventLoopCount) {
  12. this.parentEventLoopCount = parentEventLoopCount;
  13. this.childEventLoopCount = childEventLoopCount;
  14. parentEventLoop = new SharedNioEventLoopGroup(parentEventLoopCount);
  15. eventLoop = new SharedNioEventLoopGroup(childEventLoopCount);
  16. nativeParentEventLoop = new AtomicReference<EpollEventLoopGroup>();
  17. nativeEventLoop = new AtomicReference<EpollEventLoopGroup>();
  18. }

为什么呢?

可以看到这里的boss线程池的线程数都是一个,权且思考一下:

从上层看的话,每个Server启动的时候需要绑定一个端口,那么无论是NIO还是BIO,我们都需要在服务器进行接收,而一个端口对应的是一个Selector或者ServerSocket对象,那么如果是多个线程的话,会不会存在竞争关系?并且因为本来这个Boss线程池(Acceptor)也只是接收一下客户端TCP连接,并创建一个Channel对象发送给work线程池去进行真正的读写,难道是怕线程竞争?

  1. 看一下对应的代码

    1. /** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */
    2. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    3. super.group(parentGroup);
    4. if (childGroup == null) {
    5. throw new NullPointerException("childGroup");
    6. }
    7. if (this.childGroup != null) {
    8. throw new IllegalStateException("childGroup set already");
    9. }
    10. this.childGroup = childGroup;
    11. return this;
    12. }
  2. Server初始化代码&处理

    ServerBootStrap#init

    1. void init(Channel channel) throws Exception {
    2. // 省略...
    3. p.addLast(new ChannelInitializer<Channel>() {
    4. @Override
    5. public void initChannel(final Channel ch) throws Exception {
    6. final ChannelPipeline pipeline = ch.pipeline();
    7. ChannelHandler handler = config.handler();
    8. if (handler != null) {
    9. pipeline.addLast(handler);
    10. }
    11. ch.eventLoop().execute(new Runnable() {
    12. @Override
    13. public void run() {
    14. // 初始化Acceptor
    15. pipeline.addLast(new ServerBootstrapAcceptor(
    16. ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    17. }
    18. });
    19. }
    20. });
    21. }

    可以看到这里ServerBootstrapAcceptor是pipeline的处理器。下面看看他的实现逻辑:

    1. 开启Selector

      1. private SelectorTuple openSelector() {
      2. final Selector unwrappedSelector;
      3. try {
      4. unwrappedSelector = provider.openSelector();
      5. } catch (IOException e) {
      6. throw new ChannelException("failed to open a new selector", e);
      7. }
    2. NioEventLoop接收到请求,派发请求到Worker处理

      NioEventLoop#processSelectedKey

      1. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
      2. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
      3. // 省略
      4. // 如果当前通道可读
      5. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
      6. unsafe.read();
      7. }
      8. } catch (CancelledKeyException ignored) {
      9. unsafe.close(unsafe.voidPromise());
      10. }
      11. }
    3. 读取配置并封装成Channel

    NioMessageUnsafe#doReadMessages

    1. protected int doReadMessages(List<Object> buf) throws Exception {
    2. SocketChannel ch = SocketUtils.accept(javaChannel());
    3. try {
    4. if (ch != null) {
    5. // 将NioSocketChannel作为上游参数传到下游处理器
    6. buf.add(new NioSocketChannel(this, ch));
    7. return 1;
    8. }
    9. } catch (Throwable t) {
    10. logger.warn("Failed to create a new channel from an accepted socket.", t);
    11. try {
    12. ch.close();
    13. } catch (Throwable t2) {
    14. logger.warn("Failed to close a socket.", t2);
    15. }
    16. }
    17. return 0;
    18. }
    1. 将Channel派发的Worker线程池
      ServerBootstrapAcceptor#channelRead

      public void channelRead(ChannelHandlerContext ctx, Object msg) {

      1. // Nio处理完之后是一个channel
      2. final Channel child = (Channel) msg;
      3. // 为当前channel添加处理器
      4. child.pipeline().addLast(childHandler);
      5. setChannelOptions(child, childOptions, logger);
      6. for (Entry<AttributeKey<?>, Object> e: childAttrs) {
      7. child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
      8. }
      9. try {
      10. // 注册到channe到WorkerNioEventLoop让Worker进行通道的读写。
      11. childGroup.register(child).addListener(new ChannelFutureListener() {
      12. @Override
      13. public void operationComplete(ChannelFuture future) throws Exception {
      14. if (!future.isSuccess()) {
      15. forceClose(child, future.cause());
      16. }
      17. }
      18. });
      19. } catch (Throwable t) {
      20. forceClose(child, t);
      21. }
      22. }

总结

看到了现在,仿佛1的这个魔法值还仅仅是因为是一个端口的原因。而线程池的存在是为了在启动的时候可以绑定多个端口,方便扩展。
但是根本上还是一个端口对应一个boss线程池的一条处理线程,即一条线程持有一个端口对应的selector。当然如果我们启动不仅仅是一个端口的话,仍然需要写对应的端口个数。以上是Netty实现的原理。
继续复盘我们刚才说的线程竞争会不会导致selector的变慢之类的,其实是因为selector的特性导致的,我们只能有一个线程持久selector,即便是我们在多个线程进行持有selector,如果想要获取通道上面的事件就需要调用Selector#select()方法。看一下他对应的介绍。

This method performs a blocking selection operation. It returns only after at least one channel is selected, this selector’s wakeup method is invoked, or the current thread is interrupted, whichever comes first

这个操作是阻塞的,同一时间只能有一个线程进行处理,所以天生不支持多线程

​ – 以上是Netty4的源码

发表评论

表情:
评论列表 (有 0 条评论,83人围观)

还没有评论,来说两句吧...

相关阅读

    相关 什么线

    线程(Thread)是计算机科学中的一个概念,是进程中的一个基本执行单元。在一个进程内,可以包含多个线程,每个线程都可以独立地执行指令序列。 每个线程拥有自己的程序计数器、寄

    相关 什么线

    什么是线程? > 说到线程, 离不开的概念就是进程 . 也离不开计算机或操作系统的发展过程. 简单的说一下. 进程的引入 > 在计算机高度发达的今天, 我们很难想象以

    相关 什么线

    1、线程是轻量级的进程,是程序执行的最小单元 2、相对于多进程应用,多线程在数据共享方面效率要高很多;多线程可以互不干扰的不要并发执行(实际不是并发),并共享进程的全局变量和

    相关 Netty什么

    [《跟闪电侠学Netty》开篇:Netty是什么?][Netty_Netty] [Netty笔记之三:Netty实现Socket编程][Netty_Netty_Socket]