Netty源码阅读之NioEventLoop简析

梦里梦外; 2023-02-20 08:19 141阅读 0赞
  1. NettyNioEventLoop以及NioEventLoopGroup是很重要的两个类,而NioEventLoopGroup主要是对NioEventLoop进行管理;首先来看一下这两个类的关系图(错综复杂):

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbGlzaGFkYWE_size_16_color_FFFFFF_t_70

  1. 1

1. NioEventLoopGroup**初始化流程**

通过分析NioEventLoopGroup的构造方法的调用栈我们能够看到在io.netty.channel.MultithreadEventLoopGroup的构造方法中进行了创建:

当未指定具体的线程数目的时候,Netty会提出一个默认的线程数:DEFAULT_EVENT_LOOP_THREADS

  1. protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  2. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  3. }

而该数值在同一类下的静态代码块中进行了设置:

  1. static {
  2. DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
  3. "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
  4. if (logger.isDebugEnabled()) {
  5. logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
  6. }
  7. }

显而易见,默认的线程数量为2*cpu数目。

继续深入,打开io.netty.util.concurrent.MultithreadEventExecutorGroup这个类,查看其构造方法:

  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  2. EventExecutorChooserFactory chooserFactory, Object... args) {
  3. if (nThreads <= 0) {
  4. throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  5. }
  6. if (executor == null) {
  7. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());//创建线程执行器
  8. }
  9. //构造NioEventLoop的过程
  10. children = new EventExecutor[nThreads];
  11. for (int i = 0; i < nThreads; i ++) {
  12. boolean success = false;
  13. try {
  14. children[i] = newChild(executor, args);
  15. success = true;
  16. } catch (Exception e) {
  17. // TODO: Think about if this is a good exception type
  18. throw new IllegalStateException("failed to create a child event loop", e);
  19. } finally {
  20. if (!success) {
  21. for (int j = 0; j < i; j ++) {
  22. children[j].shutdownGracefully();
  23. }
  24. for (int j = 0; j < i; j ++) {
  25. EventExecutor e = children[j];
  26. try {
  27. while (!e.isTerminated()) {
  28. e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
  29. }
  30. } catch (InterruptedException interrupted) {
  31. // Let the caller handle the interruption.
  32. Thread.currentThread().interrupt();
  33. break;
  34. }
  35. }
  36. }
  37. }
  38. }
  39. chooser = chooserFactory.newChooser(children);//生成线程选择器
  40. final FutureListener<Object> terminationListener = new FutureListener<Object>() {
  41. @Override
  42. public void operationComplete(Future<Object> future) throws Exception {
  43. if (terminatedChildren.incrementAndGet() == children.length) {
  44. terminationFuture.setSuccess(null);
  45. }
  46. }
  47. };
  48. for (EventExecutor e: children) {
  49. e.terminationFuture().addListener(terminationListener);
  50. }
  51. Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
  52. Collections.addAll(childrenSet, children);
  53. readonlyChildren = Collections.unmodifiableSet(childrenSet);
  54. }

通过上面的代码流程可知,首先是创建线程执行器,

线程执行器中传入一个默认的线程工厂:newDefaultThreadFactory,在线程工厂中进行nio线程的创建并进行线程的命名:

  1. public static String toPoolName(Class<?> poolType) {
  2. if (poolType == null) {
  3. throw new NullPointerException("poolType");
  4. }
  5. String poolName = StringUtil.simpleClassName(poolType);
  6. switch (poolName.length()) {
  7. case 0:
  8. return "unknown";
  9. case 1:
  10. return poolName.toLowerCase(Locale.US);
  11. default:
  12. if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
  13. return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
  14. } else {
  15. return poolName;
  16. }
  17. }
  18. }

线程名称类似于:nioEventLoop-x-x这种形式;同时,将根据线程数目创建一个同等容量的EventExecutor数组,数组中通过newChild()方法塞入一个EventExecutor对象,当然这只是一个抽象方法,具体的实现根据不同的类来决定,若在这个过程中有一个线程发生了异常,则会从当前的这个线程开始,将前面从第一个线程开始,关闭对应的线程执行器;之后再初始化线程选择器工厂,并通过轮询算法来处理本次的所有EventLoop事件,加入线程工厂的时候,采用了策略模式,会有一个2次幂的判断,如果上述的数组长度为2的幂次方,那么选用PowerOfTowEventExecutorChooser(executors)

,否则将选用GenericEventExecutorChooser(executors)

  1. public EventExecutorChooser newChooser(EventExecutor[] executors) {
  2. if (isPowerOfTwo(executors.length)) {
  3. return new PowerOfTowEventExecutorChooser(executors);
  4. } else {
  5. return new GenericEventExecutorChooser(executors);
  6. }
  7. }

而这两种方式主要在遍历数组的时候存在区别,当为2的次幂的时候,采用如下方式进行遍历:

  1. public EventExecutor next() {
  2. return executors[idx.getAndIncrement() & executors.length - 1];
  3. }

反之则采用如下的方式进行数组的遍历:

  1. public EventExecutor next() {
  2. return executors[Math.abs(idx.getAndIncrement() % executors.length)];
  3. }

由上面的分析,我们可以得出Netty中的EventLoop处理关系图:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbGlzaGFkYWE_size_16_color_FFFFFF_t_70 1

  1. 2

2. NioEventLoop启动逻辑

启动的入口为:io.netty.bootstrap.AbstractBootstrap#doBind0()方法:

  1. private static void doBind0(
  2. final ChannelFuture regFuture, final Channel channel,
  3. final SocketAddress localAddress, final ChannelPromise promise) {
  4. // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
  5. // the pipeline in its channelRegistered() implementation.
  6. channel.eventLoop().execute(new Runnable() {
  7. @Override
  8. public void run() {
  9. if (regFuture.isSuccess()) {
  10. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  11. } else {
  12. promise.setFailure(regFuture.cause());
  13. }
  14. }
  15. });
  16. }

主要是进行端口的绑定。

接着往下查看execute()方法:

  1. public void execute(Runnable task) {
  2. if (task == null) {
  3. throw new NullPointerException("task");
  4. }
  5. boolean inEventLoop = inEventLoop();
  6. if (inEventLoop) {
  7. addTask(task);
  8. } else {
  9. startThread();
  10. addTask(task);
  11. if (isShutdown() && removeTask(task)) {
  12. reject();
  13. }
  14. }
  15. if (!addTaskWakesUp && wakesUpForTask(task)) {
  16. wakeup(inEventLoop);
  17. }
  18. }

在execute()方法中,首先通过inEventLoop()方法判断当前的线程是否是在eventLoop中,值得注意的是,每一个NioEventLoop都维护着一个taskQueue,读写任务都将被丢进这个队列中进行维护:

  1. @Override
  2. protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
  3. // This event loop never calls takeTask()
  4. return PlatformDependent.newMpscQueue(maxPendingTasks);
  5. }

这是Netty实现异步串行无锁化的关键;回归正题,如果已经在evetLoop中了,那么直接将当前的任务添加到任务队列中,否则将执行doStartThread()方法:

  1. private void doStartThread() {
  2. assert thread == null;
  3. executor.execute(new Runnable() {
  4. @Override
  5. public void run() {
  6. thread = Thread.currentThread();
  7. if (interrupted) {
  8. thread.interrupt();
  9. }
  10. boolean success = false;
  11. updateLastExecutionTime();
  12. try {
  13. SingleThreadEventExecutor.this.run();
  14. success = true;
  15. } catch (Throwable t) {
  16. logger.warn("Unexpected exception from an event executor: ", t);
  17. } finally {
  18. for (;;) {
  19. int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
  20. if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
  21. SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
  22. break;
  23. }
  24. }
  25. // Check if confirmShutdown() was called at the end of the loop.
  26. if (success && gracefulShutdownStartTime == 0) {
  27. logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
  28. SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
  29. "before run() implementation terminates.");
  30. }
  31. try {
  32. // Run all remaining tasks and shutdown hooks.
  33. for (;;) {
  34. if (confirmShutdown()) {
  35. break;
  36. }
  37. }
  38. } finally {
  39. try {
  40. cleanup();
  41. } finally {
  42. STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
  43. threadLock.release();
  44. if (!taskQueue.isEmpty()) {
  45. logger.warn(
  46. "An event executor terminated with " +
  47. "non-empty task queue (" + taskQueue.size() + ')');
  48. }
  49. terminationFuture.setSuccess(null);
  50. }
  51. }
  52. }
  53. }
  54. });
  55. }

通过SingleThreadEventExecutor.this.run()方法,Netty中的channel将不断轮询处理channel事件:

  1. protected void run() {
  2. for (;;) {
  3. try {
  4. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  5. case SelectStrategy.CONTINUE:
  6. continue;
  7. case SelectStrategy.SELECT:
  8. select(wakenUp.getAndSet(false));
  9. if (wakenUp.get()) {
  10. selector.wakeup();
  11. }
  12. default:
  13. // fallthrough
  14. }
  15. cancelledKeys = 0;
  16. needsToSelectAgain = false;
  17. final int ioRatio = this.ioRatio;
  18. if (ioRatio == 100) {
  19. try {
  20. processSelectedKeys();
  21. } finally {
  22. // Ensure we always run tasks.
  23. runAllTasks();
  24. }
  25. } else {
  26. final long ioStartTime = System.nanoTime();
  27. try {
  28. processSelectedKeys();
  29. } finally {
  30. // Ensure we always run tasks.
  31. final long ioTime = System.nanoTime() - ioStartTime;
  32. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  33. }
  34. }
  35. } catch (Throwable t) {
  36. handleLoopException(t);
  37. }
  38. // Always handle shutdown even if the loop processing threw an exception.
  39. try {
  40. if (isShuttingDown()) {
  41. closeAll();
  42. if (confirmShutdown()) {
  43. return;
  44. }
  45. }
  46. } catch (Throwable t) {
  47. handleLoopException(t);
  48. }
  49. }
  50. }

在事件循环中不仅需要处理IO事件也需要处理非IO事件,IO事件处理通过processSelectedKeys方法来进行,而非IO事件通过runAllTasks()方法进行处理,IO事件以及非IO事件的默认占比各为50%,值得注意的是:SelectStrategy.SELECT这种情况:

  1. private void select(boolean oldWakenUp) throws IOException {
  2. Selector selector = this.selector;
  3. try {
  4. int selectCnt = 0;
  5. long currentTimeNanos = System.nanoTime();
  6. long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
  7. for (;;) {
  8. long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
  9. if (timeoutMillis <= 0) {
  10. if (selectCnt == 0) {
  11. selector.selectNow();
  12. selectCnt = 1;
  13. }
  14. break;
  15. }
  16. // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
  17. // Selector#wakeup. So we need to check task queue again before executing select operation.
  18. // If we don't, the task might be pended until select operation was timed out.
  19. // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
  20. if (hasTasks() && wakenUp.compareAndSet(false, true)) {
  21. selector.selectNow();
  22. selectCnt = 1;
  23. break;
  24. }
  25. int selectedKeys = selector.select(timeoutMillis);
  26. selectCnt ++;
  27. if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
  28. // - Selected something,
  29. // - waken up by user, or
  30. // - the task queue has a pending task.
  31. // - a scheduled task is ready for processing
  32. break;
  33. }
  34. if (Thread.interrupted()) {
  35. // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
  36. // As this is most likely a bug in the handler of the user or it's client library we will
  37. // also log it.
  38. //
  39. // See https://github.com/netty/netty/issues/2426
  40. if (logger.isDebugEnabled()) {
  41. logger.debug("Selector.select() returned prematurely because " +
  42. "Thread.currentThread().interrupt() was called. Use " +
  43. "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
  44. }
  45. selectCnt = 1;
  46. break;
  47. }
  48. long time = System.nanoTime();
  49. if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
  50. // timeoutMillis elapsed without anything selected.
  51. selectCnt = 1;
  52. } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
  53. selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
  54. // The selector returned prematurely many times in a row.
  55. // Rebuild the selector to work around the problem.
  56. logger.warn(
  57. "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
  58. selectCnt, selector);
  59. rebuildSelector();
  60. selector = this.selector;
  61. // Select again to populate selectedKeys.
  62. selector.selectNow();
  63. selectCnt = 1;
  64. break;
  65. }
  66. currentTimeNanos = time;
  67. }
  68. if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
  69. if (logger.isDebugEnabled()) {
  70. logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
  71. selectCnt - 1, selector);
  72. }
  73. }
  74. } catch (CancelledKeyException e) {
  75. if (logger.isDebugEnabled()) {
  76. logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
  77. selector, e);
  78. }
  79. // Harmless exception - log anyway
  80. }
  81. }

在这种情况下,每次对selectCnt这个标志位进行自增的操作,后续通过计算:

  1. time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos

若满足,则seletCnt重新置为1,最后若一旦超过SELECTOR_AUTO_REBUILD_THRESHOLD(512),那么需要重建selector,Netty正是通过这种方式规避了空轮询的bug。

发表评论

表情:
评论列表 (有 0 条评论,141人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty阅读编码器

        上回主要聊了一下Netty中的解码器,那么既然有解码,也必须得聊下编码过程了,下面将对Netty中的编码器作一下总结:   1.编码器简介 作为解码的逆过程,编码

    相关 netty阅读NioEventLoop

    初始阅读源码的时候,晦涩难懂,枯燥无味,一段时间之后就会觉得豁然开朗,被源码的魅力深深折服。 接下去要阅读的是netty的一个重要组件,NioEventLoop。 将会分为