Netty源码阅读之ByteBuf简析(二)

Dear 丶 2023-03-13 03:27 143阅读 0赞

前面对于Netty中的ByteBuf缓存做了一下简单的介绍,下面将围绕源码,对几方面展开分析。

1.扩容机制

首先来聊下Bytebuf的扩容机制,这个机制也是Netty比较强悍的地方,而我们知道JDK自带的ByteBuffer是无法扩容的,这是其在设计中一个比较反人类的点,导致很多程序员在使用中无法接受;而在Bytebuf创建的时候,我们通常会设置一个初始容量,在我们写数据的时候,如果写指针超过了设置的初始容量,那么Bytebuf将进行扩容,废话不多说,下面我们直接通过源码来一探究竟吧:

  1. public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
  2. ensureAccessible();
  3. ensureWritable(length);
  4. setBytes(writerIndex, src, srcIndex, length);
  5. writerIndex += length;
  6. return this;
  7. }

在写数据的时候,首先会校验当前的对象是否已经被释放,具体的做法就是查看一下引用计数的标志位是否为0,如果为0,那么直接抛出异常。

  1. protected final void ensureAccessible() {
  2. if (checkAccessible && refCnt() == 0) {
  3. throw new IllegalReferenceCountException(0);
  4. }
  5. }

随后是校验一下是否可写,具体的逻辑如下:

  1. private void ensureWritable0(int minWritableBytes) {
  2. if (minWritableBytes <= writableBytes()) {
  3. return;
  4. }
  5. if (minWritableBytes > maxCapacity - writerIndex) {
  6. throw new IndexOutOfBoundsException(String.format(
  7. "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
  8. writerIndex, minWritableBytes, maxCapacity, this));
  9. }
  10. // Normalize the current capacity to the power of 2.
  11. int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
  12. // Adjust to the new capacity.
  13. capacity(newCapacity);
  14. }

开始主要是做一些边界检查,下面主要看下分配新容量的代码逻辑:

  1. public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
  2. if (minNewCapacity < 0) {
  3. throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
  4. }
  5. if (minNewCapacity > maxCapacity) {
  6. throw new IllegalArgumentException(String.format(
  7. "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
  8. minNewCapacity, maxCapacity));
  9. }
  10. final int threshold = 1048576 * 4; // 4 MiB page
  11. if (minNewCapacity == threshold) {
  12. return threshold;
  13. }
  14. // If over threshold, do not double but just increase by threshold.
  15. if (minNewCapacity > threshold) {
  16. int newCapacity = minNewCapacity / threshold * threshold;
  17. if (newCapacity > maxCapacity - threshold) {
  18. newCapacity = maxCapacity;
  19. } else {
  20. newCapacity += threshold;
  21. }
  22. return newCapacity;
  23. }
  24. // Not over threshold. Double up to 4 MiB, starting from 64.
  25. int newCapacity = 64;
  26. while (newCapacity < minNewCapacity) {
  27. newCapacity <<= 1;
  28. }
  29. return Math.min(newCapacity, maxCapacity);
  30. }

扩容的时候,如果当前的容量小于4MB,那么新的容量从64字节大小开始,一直翻倍,直到超过期望的容量,如果期望的新容量已经超过4MB,那么由期望的容量计算应该扩大为4MB的多少倍,最后返回应该扩容的容量大小。
上边分析了应该扩容的容量大小,下面再来看下具体的扩容函数(这是一个抽象方法,这里以PooledBytebuf这个类中的实现为例):

  1. public final ByteBuf capacity(int newCapacity) {
  2. ensureAccessible();
  3. // If the request capacity does not require reallocation, just update the length of the memory.
  4. if (chunk.unpooled) {
  5. if (newCapacity == length) {
  6. return this;
  7. }
  8. } else {
  9. if (newCapacity > length) {
  10. if (newCapacity <= maxLength) {
  11. length = newCapacity;
  12. return this;
  13. }
  14. } else if (newCapacity < length) {
  15. if (newCapacity > maxLength >>> 1) {
  16. if (maxLength <= 512) {
  17. if (newCapacity > maxLength - 16) {
  18. length = newCapacity;
  19. setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
  20. return this;
  21. }
  22. } else { // > 512 (i.e. >= 1024)
  23. length = newCapacity;
  24. setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
  25. return this;
  26. }
  27. }
  28. } else {
  29. return this;
  30. }
  31. }
  32. // Reallocation required.
  33. chunk.arena.reallocate(this, newCapacity, true);
  34. return this;
  35. }

由上边的代码可以知道,如果当前的缓存为非池化的缓存,那么直接申请内存,待释放的时候再进行释放,具体做法是直接增加当前内存的容量长度,但假如当前的缓存为池化的缓存,那么我们将直接申请一块大内存,重新分配大内存的逻辑如下:

  1. void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
  2. if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
  3. throw new IllegalArgumentException("newCapacity: " + newCapacity);
  4. }
  5. int oldCapacity = buf.length;
  6. if (oldCapacity == newCapacity) {
  7. return;
  8. }
  9. PoolChunk<T> oldChunk = buf.chunk;
  10. long oldHandle = buf.handle;
  11. T oldMemory = buf.memory;
  12. int oldOffset = buf.offset;
  13. int oldMaxLength = buf.maxLength;
  14. int readerIndex = buf.readerIndex();
  15. int writerIndex = buf.writerIndex();
  16. allocate(parent.threadCache(), buf, newCapacity);
  17. if (newCapacity > oldCapacity) {
  18. memoryCopy(
  19. oldMemory, oldOffset,
  20. buf.memory, buf.offset, oldCapacity);
  21. } else if (newCapacity < oldCapacity) {
  22. if (readerIndex < newCapacity) {
  23. if (writerIndex > newCapacity) {
  24. writerIndex = newCapacity;
  25. }
  26. memoryCopy(
  27. oldMemory, oldOffset + readerIndex,
  28. buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
  29. } else {
  30. readerIndex = writerIndex = newCapacity;
  31. }
  32. }
  33. buf.setIndex(readerIndex, writerIndex);
  34. if (freeOldMemory) {
  35. free(oldChunk, oldHandle, oldMaxLength, buf.cache);
  36. }
  37. }

主要的逻辑是将原来旧内存中的内容写到新的内存中,最后将旧的内存释放掉。

2.ByteBuf中的内存规格简介

首先,我们需要了解在Netty向操作系统申请内存的时候是以“chunk”为单位进行申请的,内存规格主要是分为4个区间,每一个区间,其对应的内存分配方式各异,下面来了解一下内存区间的具体分配:

表1 内存规格表














tiny small normal huge
0~512B 512B~8K 8K~16M 16M~+∞

由表1,16M即上一段提及的”Chunk”,而8K又称为“Page”,至于(0,8K)区间段又称为“SubPage”,为了尽可能的利用内存资源,Netty内部采用了上述的规格来进行内存划分。

3.ByteBuf内存分配流程

上边简单科普了一下内存规格相关的知识,下面我们将对照源码来看下内存分配相关的内容

在开始之前我们有必要认识一下PoolThreadCache这个类:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbGlzaGFkYWE_size_16_color_FFFFFF_t_70

这个类主要分成了cache以及arena两部分,在cache我们可以直接分配内存,而arena主要功能是向操作系统请求内存分配,也是本部分主要讲解的内容,具体PoolThreadCache的结构大致是下面这样的:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbGlzaGFkYWE_size_16_color_FFFFFF_t_70 1

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbGlzaGFkYWE_size_16_color_FFFFFF_t_70 2

根据上图可以画出arena的大致结构:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbGlzaGFkYWE_size_16_color_FFFFFF_t_70 3

而chunk又由多个更小的subpage组成,这些subpage通过chunk属性来标明具体属于哪一个chunk,相互之间通过双向链表进行连接。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbGlzaGFkYWE_size_16_color_FFFFFF_t_70 4

下面对应于具体的源码来探究下具体的内存分配逻辑:

首先是Page级别的内存划分,打开PoolArena的allocate()方法,可以看到:

  1. if (normCapacity <= chunkSize) {
  2. if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
  3. // was able to allocate out of the cache so move on
  4. return;
  5. }
  6. allocateNormal(buf, reqCapacity, normCapacity);
  7. } else {
  8. // Huge allocations are never served via the cache so just call allocateHuge
  9. allocateHuge(buf, reqCapacity);
  10. }

也就是说当小于16M的时候,是通过缓存来进行分配的,每一次分配内存都分配16M的整数倍,具体的代码逻辑如下:

  1. private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
  2. if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
  3. q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
  4. q075.allocate(buf, reqCapacity, normCapacity)) {
  5. ++allocationsNormal;
  6. return;
  7. }
  8. // Add a new chunk.
  9. PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
  10. long handle = c.allocate(normCapacity);
  11. ++allocationsNormal;
  12. assert handle > 0;
  13. c.initBuf(buf, handle, reqCapacity);
  14. qInit.add(c);
  15. }

在分配内存的时候,首先尝试在现有的chunk上进行分配,随后创建一个chunk进行分配,最后初始化PooledByteBuf。

说完page级别的内存划分下面在来探究一下subPage级别的内存划分:

  1. f (isTinyOrSmall(normCapacity)) { // capacity < pageSize
  2. int tableIdx;
  3. PoolSubpage<T>[] table;
  4. boolean tiny = isTiny(normCapacity);
  5. if (tiny) { // < 512
  6. if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
  7. // was able to allocate out of the cache so move on
  8. return;
  9. }
  10. tableIdx = tinyIdx(normCapacity);
  11. table = tinySubpagePools;
  12. } else {
  13. if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
  14. // was able to allocate out of the cache so move on
  15. return;
  16. }
  17. tableIdx = smallIdx(normCapacity);
  18. table = smallSubpagePools;
  19. }
  20. final PoolSubpage<T> head = table[tableIdx];
  21. /**
  22. * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
  23. * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
  24. */
  25. synchronized (head) {
  26. final PoolSubpage<T> s = head.next;
  27. if (s != head) {
  28. assert s.doNotDestroy && s.elemSize == normCapacity;
  29. long handle = s.allocate();
  30. assert handle >= 0;
  31. s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
  32. if (tiny) {
  33. allocationsTiny.increment();
  34. } else {
  35. allocationsSmall.increment();
  36. }
  37. return;
  38. }
  39. }
  40. allocateNormal(buf, reqCapacity, normCapacity);
  41. return;
  42. }

对于不同粒度的内存管理,Netty是通过平衡二叉树结构来进行分配的,具体可以参考(https://segmentfault.com/a/1190000021444859)

4.ByteBuf回收机制

上面一大段,光分析了内存分配了,那么既然有分配内存,相应的也有释放内存,接下来,我们再来通过源代码了解一下Netty中的内存回收机制:

release()方法定义在ReferenceCounted这个接口中,我们找一下这个接口的实现类:AbstractReferenceCountedByteBuf,具体看下release0这个方法:

说到这里,还是必须提下Netty中的引用计数,对的,Netty也是采用了引用计数这种当时来判断当前的内存是否为“可以被回收”,

具体到代码中,是通过refCnt 这个变量来确定引用数的,当refCnt == decrement的时候,我们就可以进行内存的回收了

  1. private boolean release0(int decrement) {
  2. for (;;) {
  3. int refCnt = this.refCnt;
  4. if (refCnt < decrement) {
  5. throw new IllegalReferenceCountException(refCnt, -decrement);
  6. }
  7. if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
  8. if (refCnt == decrement) {
  9. deallocate();
  10. return true;
  11. }
  12. return false;
  13. }
  14. }
  15. }

查看PooledByteBuf中关于deallocate()的实现:

  1. @Override
  2. protected final void deallocate() {
  3. if (handle >= 0) {
  4. final long handle = this.handle;
  5. this.handle = -1;
  6. memory = null;
  7. chunk.arena.free(chunk, handle, maxLength, cache);
  8. recycle();
  9. }
  10. }

具体的步骤就是先将连续的内存段加载到缓存中,并标记连续的内存区段为未使用,随后将ByteBuf添加到对象池中(ByteBuf并不会立即销毁):

  1. void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
  2. if (chunk.unpooled) {
  3. int size = chunk.chunkSize();
  4. destroyChunk(chunk);
  5. activeBytesHuge.add(-size);
  6. deallocationsHuge.increment();
  7. } else {
  8. SizeClass sizeClass = sizeClass(normCapacity);
  9. if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
  10. // cached so not free it.
  11. return;
  12. }
  13. freeChunk(chunk, handle, sizeClass);
  14. }
  15. }

至此,对于ByteBuf的分析完毕。


慈母手中线,游子身上衣;今天是母亲节,祝愿所有伟大的母亲节日快乐!

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhbGlzaGFkYWE_size_16_color_FFFFFF_t_70 5

发表评论

表情:
评论列表 (有 0 条评论,143人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty阅读编码器

        上回主要聊了一下Netty中的解码器,那么既然有解码,也必须得聊下编码过程了,下面将对Netty中的编码器作一下总结:   1.编码器简介 作为解码的逆过程,编码

    相关 netty阅读ByteBuf

    今天我们开启新的篇章,netty很重要的内存概念将在这一章介绍。ByteBuf主要介绍以下几点: 1、内存与内存管理器的抽象 2、不同规格大小和不同类别的内存的分配策略