Channel、EventLoop、ChannelFuture 阳光穿透心脏的1/2处 2022-04-25 00:26 247阅读 0赞 # **Channel、EventLoop(Group) 和ChannelFuture** # 1. Channel—Socket; 2. ChannelFuture—异步通知。 3. EventLoop—控制、多线程处理、并发; **关系图:** ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70][] **关系说明** > * 一个EventLoopGroup 包含一个或者多个EventLoop; > * 一个EventLoop 在它的生命周期内只和一个Thread 绑定; > * 所有由EventLoop 处理的I/O 事件都将在它专有的Thread 上被处理; > * 一个Channel 在它的生命周期内只注册于一个EventLoop; > * 一个EventLoop 可能会被分配给一个或多个Channel。 ### **1、Channel 接口** ### 基本的I/O 操作(bind()、connect()、read()和write())依赖于底层网络传输所提供的原语。在基于Java 的网络编程中,其基本的构造是class Socket。Netty 的Channel 接口所提供的API,被用于所有的I/O 操作。大大地降低了直接使用Socket 类的复杂性。此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。 每个Channel 都将会被分配一个ChannelPipeline 和ChannelConfig。ChannelConfig 包含了该Channel 的所有配置设置,并且支持热更新。由于Channel 是独一无二的,所以为了保证顺序将Channel 声明为java.lang.Comparable 的一个子接口。因此,如果两个不同的Channel 实例都返回了相同的散列码,那么AbstractChannel 中的compareTo()方法的实现将会抛出一个Error。 **最重要****Channel** **的方法** * eventLoop:返回分配给Channel 的EventLoop * pipeline:返回分配给Channel 的ChannelPipeline * isActive :如果Channel 是活动的,则返回true。活动的意义可能依赖于底层的传输。例如,一个Socket 传输一旦连接到了远程节点便是活动的,而一个Datagram 传输一旦被打开便是活动的。 * localAddress: 返回本地的SokcetAddress * remoteAddress: 返回远程的SocketAddress * write: 将数据写到远程节点。这个数据将被传递给ChannelPipeline,并且排队直到它被冲刷 * flush: 将之前已写的数据冲刷到底层传输,如一个Socket * writeAndFlush: 一个简便的方法,等同于调用write()并接着调用flush() ### **2、EventLoop和EventLoopGroup** ### EventLoop 定义了Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。io.netty.util.concurrent 包构建在JDK 的java.util.concurrent 包上。一个EventLoop 将由一个永远都不会改变的Thread 驱动,同时任务(Runnable 或者Callable)可以直接提交给EventLoop 实现,以立即执行或者调度执行,可以看作是一个线程。 根据配置和可用核心的不同,可能会创建多个EventLoop 实例用以优化资源的使用,并且单个EventLoop 可能会被指派用于服务多个Channel。Netty的EventLoop在继承了ScheduledExecutorService的同时,只定义了一个方法,parent()。在Netty 4 中,所有的I/O操作和事件都由已经被分配给了EventLoop的那个Thread来处理。 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 1][] **任务调度** 偶尔,你将需要调度一个任务以便稍后(延迟)执行或者周期性地执行。例如,你可能想要注册一个在客户端已经连接了5 分钟之后触发的任务。一个常见的用例是,发送心跳消息到远程节点,以检查连接是否仍然还活着。如果没有响应,你便知道可以关闭该Channel 了。 在内部,当提交任务时,**(**当前)调用线程正是支撑EventLoop 的线程,那么所提交的代码块将会被(直接)执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 2][] **异步传输** 异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread),而且在当前的线程模型中,它们可能会被多个Channel 所共享。这使得可以通过尽可能少量的Thread 来支撑大量的Channel,而不是每个Channel 分配一个Thread。EventLoopGroup 负责为每个新创建的Channel 分配一个EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的EventLoop可能会被分配给多个Channel。 一旦一个Channel 被分配给一个EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的Thread)。请牢记这一点,因为它可以使你从担忧你的Channel-Handler 实现中的线程安全和同步问题中解脱出来。 另外,需要注意的是,EventLoop 的分配方式对ThreadLocal 的使用的影响。因为一个EventLoop 通常会被用于支撑多个Channel,所以对于所有相关联的Channel 来说,ThreadLocal 都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而,在一些无状态的上下文中,它仍然可以被用于在多个Channel 之间共享一些重度的或者代价昂贵的对象,甚至是事件。 ### **3、ChannelFuture 接口** ### Netty 中所有的I/O 操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了ChannelFuture 接口,其addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。可以将ChannelFuture 看作是将来要执行的操作的结果的占位符。它究竟什么时候被执行则可能取决于若干的因素,因此不可能准确地预测,但是可以肯定的是它将会被执行。 # **ChannelHandler、ChannelPipeline和ChannelHandlerContext** # ### **1、ChannelHandler** ### 应用程序开发人员的角度来看,Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的地方。Netty 以适配器类的形式提供了大量默认的ChannelHandler 实现,帮我们简化应用程序处理逻辑的开发过程。 ChannelHandler 的方法是由网络事件触发的。事实上,ChannelHandler 可专门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,例如各种编解码,或者处理转换过程中所抛出的异常。 Netty 定义了下面两个重要的ChannelHandler 子接口: * ChannelInboundHandler——处理入站数据以及各种状态变化; * ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作。 ### **2、ChannelInboundHandler 接口** ### 下面列出了interface ChannelInboundHandler 的生命周期方法。这些方法将会在数据被接收时或者与其对应的Channel 状态发生改变时被调用。正如我们前面所提到的,这些方法和Channel 的生命周期密切相关。 * **channelRegistered** 当Channel 已经注册到它的EventLoop 并且能够处理I/O 时被调用 * **channelUnregistered** 当Channel 从它的EventLoop 注销并且无法处理任何I/O 时被调用 * **channelActive** 当Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪 * **channelInactive** 当Channel 离开活动状态并且不再连接它的远程节点时被调用 * **channelReadComplete** 当Channel上的一个读操作完成时被调用 * **channelRead** 当从Channel 读取数据时被调用 * **ChannelWritability-Changed** 当Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生OutOfMemoryError)或者可以在Channel 变为再次可写时恢复写入。可以通过调用Channel 的isWritable()方法来检测Channel 的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWater-Mark()方法来设置 * **userEventTriggered** 当ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用。 当某个ChannelInboundHandler 的实现重写channelRead()方法时,它要负责显式地释放与池化的ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法ReferenceCount-Util.release()。Netty 将使用WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用Simple-ChannelInboundHandler,SimpleChannelInboundHandler 会自动释放资源。 ### **3、ChannelOutboundHandler 接口** ### 出站操作和数据将由ChannelOutboundHandler 处理。它的方法将被Channel、Channel- Pipeline 以及ChannelHandlerContext 调用。 所有由ChannelOutboundHandler 本身所定义的方法: * **bind(ChannelHandlerContext,SocketAddress,ChannelPromise)** 当请求将Channel 绑定到本地地址时被调用 * **connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)** 当请求将Channel 连接到远程节点时被调用 * **disconnect(ChannelHandlerContext,ChannelPromise)** 当请求将Channel 从远程节点断开时被调用 * **close(ChannelHandlerContext,ChannelPromise)** 当请求关闭Channel 时被调用 * **deregister(ChannelHandlerContext,ChannelPromise)** 当请求将Channel 从它的EventLoop 注销时被调用 * **read(ChannelHandlerContext)** 当请求从Channel 读取更多的数据时被调用 * **flush(ChannelHandlerContext)** 当请求通过Channel 将入队数据冲刷到远程节点时被调用 * **write(ChannelHandlerContext,Object,ChannelPromise)**** **当请求通过Channel 将数据写到远程节点时被调用 ### **4、ChannelHandler的适配器** ### 有一些适配器类可以将编写自定义的ChannelHandler 所需要的努力降到最低限度,因为它们提供了定义在对应接口中的所有方法的默认实现。因为你有时会忽略那些不感兴趣的事件,所以Netty提供了抽象基类ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter。 你可以使用ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter类作为自己的ChannelHandler 的起始点。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler 的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler 的方法。 ChannelHandlerAdapter 还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline。 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 3][] ### **5、ChannelPipeline 接口** ### 当Channel 被创建时,它会被自动地分配到它专属的ChannelPipeline。每一个新创建的Channel 都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个ChannelPipeline,也不能分离其当前的。在Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。 使得事件流经ChannelPipeline 是ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给链中的下一个ChannelHandler。它们的执行顺序是由它们被添加的顺序所决定的。 入站和出站ChannelHandler 可以被安装到同一个ChannelPipeline中。如果一个消息或者任何其他的入站事件被读取,那么它会从ChannelPipeline 的头部开始流动,最终,数据将会到达ChannelPipeline 的尾端,届时,所有处理就都结束了。 数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从ChannelOutboundHandler 链的尾端开始流动,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层,这里显示为Socket。通常情况下,这将触发一个写操作。 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 4][] 如果将两个类别的ChannelHandler都混合添加到同一个ChannelPipeline 中会发生什么。虽然ChannelInboundHandle 和ChannelOutboundHandle 都扩展自ChannelHandler,但是Netty 能区分ChannelInboundHandler实现和ChannelOutboundHandler 实现,并确保数据只会在具有相同定向类型的两个ChannelHandler 之间传递。 **ChannelPipeline上的方法** * **a****ddFirst****、****addBefore****、****addAfter****、****addLast**:将一个ChannelHandler 添加到ChannelPipeline 中 * **remove** 将一个ChannelHandler 从ChannelPipeline 中移除 * **replace** 将ChannelPipeline 中的一个ChannelHandler 替换为另一个ChannelHandler * **get** 通过类型或者名称返回ChannelHandler * **context** 返回和ChannelHandler 绑定的ChannelHandlerContext * **names** 返回ChannelPipeline 中所有ChannelHandler 的名称 ChannelPipeline 的API 公开了用于调用入站和出站操作的附加方法。 ### **6、ChannelHandlerContext** ### 通过使用作为参数传递到每个方法的**ChannelHandlerContext**,事件可以被传递给当前ChannelHandler 链中的下一个ChannelHandler。虽然这个对象可以被用于获取底层的Channel,但是它主要还是被用于写出站数据。 ChannelHandlerContext 代表了ChannelHandler 和ChannelPipeline 之间的关联,每当有ChannelHandler 添加到ChannelPipeline 中时,都会创建ChannelHandlerContext。ChannelHandlerContext 的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler 之间的交互。 ChannelHandlerContext 有很多的方法,其中一些方法也存在于Channel和Channel-Pipeline本身上,**但是有一点重要的不同。**如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个ChannelPipeline 进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个(入站下一个,出站上一个)能够处理该事件的ChannelHandler。 **ChannelHandlerContext** **的****API** * **alloc** 返回和这个实例相关联的Channel 所配置的ByteBufAllocator * **bind** 绑定到给定的SocketAddress,并返回ChannelFuture * **channel** 返回绑定到这个实例的Channel * **close** 关闭Channel,并返回ChannelFuture * **connect** 连接给定的SocketAddress,并返回ChannelFuture * **deregister** 从之前分配的EventExecutor 注销,并返回ChannelFuture * **disconnect** 从远程节点断开,并返回ChannelFuture * **executor** 返回调度事件的EventExecutor * **fireChannelActive** 触发对下一个ChannelInboundHandler 上的channelActive()方法(已连接)的调用 * **fireChannelInactive** 触发对下一个ChannelInboundHandler 上的channelInactive()方法(已关闭)的调用 * **fireChannelRead** 触发对下一个ChannelInboundHandler 上的channelRead()方法(已接收的消息)的调用 * **fireChannelReadComplete** 触发对下一个ChannelInboundHandler 上的channelReadComplete()方法的调用 * **fireChannelRegistered** 触发对下一个ChannelInboundHandler 上的fireChannelRegistered()方法的调用 * **fireChannelUnregistered** 触发对下一个ChannelInboundHandler 上的fireChannelUnregistered()方法的调用 * **fireChannelWritabilityChanged** 触发对下一个ChannelInboundHandler 上的fireChannelWritabilityChanged()方法的调用 * **fireExceptionCaught** 触发对下一个ChannelInboundHandler 上的fireExceptionCaught(Throwable)方法的调用 * **fireUserEventTriggered** 触发对下一个ChannelInboundHandler 上的fireUserEventTriggered(Object evt)方法的调用 * **handler** 返回绑定到这个实例的ChannelHandler * **isRemoved** 如果所关联的ChannelHandler 已经被从ChannelPipeline中移除则返回true * **name** 返回这个实例的唯一名称 * **pipeline** 返回这个实例所关联的ChannelPipeline * **read** 将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发一个channelRead事件,并(在最后一个消息被读取完成后)通知ChannelInboundHandler 的channelReadComplete(ChannelHandlerContext)方法 **当使用ChannelHandlerContext 的API 的时候,有以下两点:** 1. ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的; 2. 如同我们在本节开头所解释的一样,相对于其他类的同名方法,ChannelHandler Context的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70]: /images/20220210/f30f2ede58994d2ebcbc91757a516f00.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 1]: /images/20220210/7b81f5132b654503a49b4876beec97ad.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 2]: /images/20220210/b77007ab0f8342559ca225b4d299efa3.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 3]: /images/20220210/af13c37cc79c4ce1a3fd067775eb71d6.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 4]: /images/20220210/2a5cdf31d4aa4ace989ed9f73c8dded3.png
还没有评论,来说两句吧...