Netty源码阅读之编码器简析

悠悠 2023-02-14 03:00 64阅读 0赞
  1. 上回主要聊了一下Netty中的解码器,那么既然有解码,也必须得聊下编码过程了,下面将对Netty中的编码器作一下总结:

1.编码器简介

作为解码的逆过程,编码的目的主要是将消息转换为字节或者消息,Netty中主要使用了MessageToByteEncoder这个抽象类来规定处理编码的一些流程,不妨先来看下该类的UML:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbGlzaGFkYWE_size_16_color_FFFFFF_t_70

可以看出编码器其实也只是一类特殊的ChannelHandler,使用encode()方法来处理编码相关的逻辑,不妨先自定义一个编码器然后写一段demo:

1.1 自定义编码器实现:

  1. public class MyEncoder extends MessageToByteEncoder<Packet> {
  2. @Override
  3. protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
  4. out.writeInt(msg.getLength());
  5. out.writeBytes(new StringBuilder(msg.getIpAddress())
  6. .append(msg.getRoteName()).toString().getBytes());
  7. }
  8. }

1.2 自定义Handler:

  1. public class PacketHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. Packet packet = new Packet("route01","172.159.1.12");
  5. ctx.writeAndFlush(packet);
  6. }
  7. }
  8. public final class Server {
  9. public static void main(String[] args) throws Exception {
  10. EventLoopGroup bossGroup = new NioEventLoopGroup();
  11. EventLoopGroup workerGroup = new NioEventLoopGroup();
  12. try {
  13. ServerBootstrap b = new ServerBootstrap();
  14. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
  15. @Override
  16. public void initChannel(SocketChannel ch) {
  17. ch.pipeline().addLast(new PacketEncoder());
  18. ch.pipeline().addLast(new PacketHandler());
  19. }
  20. });
  21. ChannelFuture f = b.bind(8099).sync();
  22. f.channel().closeFuture().sync();
  23. } finally {
  24. bossGroup.shutdownGracefully();
  25. workerGroup.shutdownGracefully();
  26. }
  27. }
  28. }

1.3 对应的流程图:

20200607152012254.png

2.writeAndFlush()流程解析

  1. 在自定义Handler传输数据的时候,我们通常会调用ctx.writeAndFlush(msg)方法从tail节点开始一直往前传递到对应的解码器中(见上图),查看其源代码:
  2. public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
  3. if (msg == null) {
  4. throw new NullPointerException("msg");
  5. }
  6. if (!validatePromise(promise, true)) {
  7. ReferenceCountUtil.release(msg);
  8. // cancelled
  9. return promise;
  10. }
  11. write(msg, true, promise);
  12. return promise;
  13. }

主要关注一下write()方法:

  1. private void write(Object msg, boolean flush, ChannelPromise promise) {
  2. AbstractChannelHandlerContext next = findContextOutbound();
  3. final Object m = pipeline.touch(msg, next);
  4. EventExecutor executor = next.executor();
  5. if (executor.inEventLoop()) {
  6. if (flush) {
  7. next.invokeWriteAndFlush(m, promise);
  8. } else {
  9. next.invokeWrite(m, promise);
  10. }
  11. } else {
  12. AbstractWriteTask task;
  13. if (flush) {
  14. task = WriteAndFlushTask.newInstance(next, m, promise);
  15. } else {
  16. task = WriteTask.newInstance(next, m, promise);
  17. }
  18. safeExecute(executor, task, promise, m);
  19. }
  20. }

若flush为true,那么调用next.invokeWriteAndFlush(m, promise)方法,反之则调用next.invokeWrite(m, promise)方法,这两个方法又有什么区别呢?我们继续往下查看:

  1. private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
  2. if (invokeHandler()) {
  3. invokeWrite0(msg, promise);
  4. invokeFlush0();
  5. } else {
  6. writeAndFlush(msg, promise);
  7. }
  8. }

这边同时进行了write以及flash的操作,在看下invokeWrite()方法:

  1. private void invokeWrite(Object msg, ChannelPromise promise) {
  2. if (invokeHandler()) {
  3. invokeWrite0(msg, promise);
  4. } else {
  5. write(msg, promise);
  6. }
  7. }

这里主要缺少了flush的操作。

3.write()方法简要分析

在MessageToByteEncoder`这个类中,主要是需要关注一下write()方法,在编码器中主要是通过这个类将数据存放到对应的ByteBuf中

  1. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  2. ByteBuf buf = null;
  3. try {
  4. if (acceptOutboundMessage(msg)) {
  5. @SuppressWarnings("unchecked")
  6. I cast = (I) msg;
  7. buf = allocateBuffer(ctx, cast, preferDirect);
  8. try {
  9. encode(ctx, cast, buf);
  10. } finally {
  11. ReferenceCountUtil.release(cast);
  12. }
  13. if (buf.isReadable()) {
  14. ctx.write(buf, promise);
  15. } else {
  16. buf.release();
  17. ctx.write(Unpooled.EMPTY_BUFFER, promise);
  18. }
  19. buf = null;
  20. } else {
  21. ctx.write(msg, promise);
  22. }
  23. } catch (EncoderException e) {
  24. throw e;
  25. } catch (Throwable e) {
  26. throw new EncoderException(e);
  27. } finally {
  28. if (buf != null) {
  29. buf.release();
  30. }
  31. }
  32. }

上边的代码主要是以下的几个流程,首先调用acceptOutboundMessage()方法来匹配当前的对象,如果当前的节点能够处理,那么继续往下,反之则返回给其他的节点进行处理;若继续往下执行,则进行内存的分配,随后调用encode()方法进行编码,接下来调用ReferenceCountUtil工具进行对象的释放,再继续往下传播数据,最后进行内存的释放。

4. buffer队列机制简介

经过前面的流程分析,可知如果当前节点无法处理会一直往前传递write事件,但如果一直传递到头结点都处理不了将怎么办呢?

Netty使用了一个buffer队列的机制解决了这个问题:

来看下AbstractChannel中的write()方法:

  1. public final void write(Object msg, ChannelPromise promise) {
  2. assertEventLoop();
  3. ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
  4. if (outboundBuffer == null) {
  5. // If the outboundBuffer is null we know the channel was closed and so
  6. // need to fail the future right away. If it is not null the handling of the rest
  7. // will be done in flush0()
  8. // See https://github.com/netty/netty/issues/2362
  9. safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
  10. // release message now to prevent resource-leak
  11. ReferenceCountUtil.release(msg);
  12. return;
  13. }
  14. int size;
  15. try {
  16. msg = filterOutboundMessage(msg);
  17. size = pipeline.estimatorHandle().size(msg);
  18. if (size < 0) {
  19. size = 0;
  20. }
  21. } catch (Throwable t) {
  22. safeSetFailure(promise, t);
  23. ReferenceCountUtil.release(msg);
  24. return;
  25. }
  26. outboundBuffer.addMessage(msg, size, promise);
  27. }

找到io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage这个方法:

  1. protected final Object filterOutboundMessage(Object msg) {
  2. if (msg instanceof ByteBuf) {
  3. ByteBuf buf = (ByteBuf) msg;
  4. if (buf.isDirect()) {
  5. return msg;
  6. }
  7. return newDirectBuffer(buf);
  8. }
  9. if (msg instanceof FileRegion) {
  10. return msg;
  11. }
  12. throw new UnsupportedOperationException(
  13. "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
  14. }

通过这个方法将ByteBuf对象(即 msg参数)转换为转为堆外内存,随后通过调用outboundBuffer.addMessage()方法将对外内存插入到写缓冲队列中:

  1. public void addMessage(Object msg, int size, ChannelPromise promise) {
  2. Entry entry = Entry.newInstance(msg, size, total(msg), promise);
  3. if (tailEntry == null) {
  4. flushedEntry = null;
  5. tailEntry = entry;
  6. } else {
  7. Entry tail = tailEntry;
  8. tail.next = entry;
  9. tailEntry = entry;
  10. }
  11. if (unflushedEntry == null) {
  12. unflushedEntry = entry;
  13. }
  14. // increment pending bytes after adding message to the unflushed arrays.
  15. // See https://github.com/netty/netty/issues/1619
  16. incrementPendingOutboundBytes(size, false);
  17. }

这里维护了一个单项链表的结构,会不断的新传递过来的ByteBuf插入到链表尾部。

随后将调用incrementPendingOutboundBytes()这个函数设置写状态,其中channel.config().getWriteBufferHighWaterMark()默认被定义为64字节。

  1. private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
  2. if (size == 0) {
  3. return;
  4. }
  5. long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
  6. if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
  7. setUnwritable(invokeLater);
  8. }
  9. }

上边只是将数据更新到了缓冲队列中,但是暂时还没有通过socket传递出去,这部分的流程主要是在缓冲队列的刷新流程中体现io.netty.channel.ChannelOutboundBuffer#addFlush:

  1. private Entry flushedEntry;//标记已经flush的指针
  2. private Entry unflushedEntry;//标记未flush的指针
  3. private Entry tailEntry;//标记尾结点的指针
  4. public void addFlush() {
  5. // There is no need to process all entries if there was already a flush before and no new messages
  6. // where added in the meantime.
  7. //
  8. // See https://github.com/netty/netty/issues/2577
  9. Entry entry = unflushedEntry;
  10. if (entry != null) {
  11. if (flushedEntry == null) {
  12. // there is no flushedEntry yet, so start with the entry
  13. flushedEntry = entry;
  14. }
  15. do {
  16. flushed ++;
  17. if (!entry.promise.setUncancellable()) {
  18. // Was cancelled so make sure we free up memory and notify about the freed bytes
  19. int pending = entry.cancel();
  20. decrementPendingOutboundBytes(pending, false, true);
  21. }
  22. entry = entry.next;
  23. } while (entry != null);
  24. // All flushed so reset unflushedEntry
  25. unflushedEntry = null;
  26. }
  27. }

首先是刷新标志并设置为写状态,之后通过调用io.netty.channel.nio.AbstractNioByteChannel#doWrite()方法遍历缓冲队列,过滤ByteBuf,最后调用jdk自旋锁自旋写数据到socket中,这边的自旋次数设置为16次:

  1. private volatile int writeSpinCount = 16;

5. 总结

至此,关于Netty的编码流程分析完毕,周日快乐~

aHR0cHM6Ly9jLXNzbC5kdWl0YW5nLmNvbS91cGxvYWRzL2l0ZW0vMjAxNzEwLzE1LzIwMTcxMDE1MTM0NTQ5X3JzWXpqLmdpZg

发表评论

表情:
评论列表 (有 0 条评论,64人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty阅读编码器

        上回主要聊了一下Netty中的解码器,那么既然有解码,也必须得聊下编码过程了,下面将对Netty中的编码器作一下总结:   1.编码器简介 作为解码的逆过程,编码

    相关 netty阅读NioEventLoop

    初始阅读源码的时候,晦涩难懂,枯燥无味,一段时间之后就会觉得豁然开朗,被源码的魅力深深折服。 接下去要阅读的是netty的一个重要组件,NioEventLoop。 将会分为

    相关 netty阅读ByteBuf

    今天我们开启新的篇章,netty很重要的内存概念将在这一章介绍。ByteBuf主要介绍以下几点: 1、内存与内存管理器的抽象 2、不同规格大小和不同类别的内存的分配策略

    相关 netty阅读解码

    netty编码我们分以下几点分析: 1、抽象解码器ByteToMessageDecoder 2、基于固定长度解码器分析 3、行解码器分析 4、基于分隔符解码器分析 5