Netty之内存分配器ByteBufAllocator

柔光的暖阳◎ 2023-06-30 04:26 264阅读 0赞

目录

内存分配器继承体系

ByteBufAllocator

AbstractByteBufAllocator

UnpooledByteBufAllocator

newHeapBuffer

newDirectBuffer

PooledByteBufAllocator

PoolThreadLocalCache

PoolArena

arena分配内存流程


内存分配器继承体系

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTEyMTIzOTQ_size_16_color_FFFFFF_t_70


ByteBufAllocator

ByteBufAllocator是Netty内存分配最顶层的抽象,负责分配所有类型的内存。

  1. public interface ByteBufAllocator {
  2. ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
  3. // 直接分配一块内存,是使用direct还是heap取决于子类实现
  4. ByteBuf buffer();
  5. // 重载方法,增加初始容量
  6. ByteBuf buffer(int initialCapacity);
  7. // 重载方法,增加初始容量与最大容量
  8. ByteBuf buffer(int initialCapacity, int maxCapacity);
  9. // 更倾向于direct方式分配
  10. ByteBuf ioBuffer();
  11. ByteBuf ioBuffer(int initialCapacity);
  12. ByteBuf ioBuffer(int initialCapacity, int maxCapacity);
  13. // heap内存分配
  14. ByteBuf heapBuffer();
  15. ByteBuf heapBuffer(int initialCapacity);
  16. ByteBuf heapBuffer(int initialCapacity, int maxCapacity);
  17. // direct内存分配
  18. ByteBuf directBuffer();
  19. ByteBuf directBuffer(int initialCapacity);
  20. ByteBuf directBuffer(int initialCapacity, int maxCapacity);
  21. ...
  22. }

AbstractByteBufAllocator

AbstractByteBufAllocator是ByteBufAllocator的骨架实现(类似于AbstractByteBuf和ByteBuf的关系),它提供了两个重要的抽象类供子类扩展,以实现堆内和堆外内存的创建。

  1. // 创建堆内内存
  2. protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
  3. // 创建堆外内存
  4. protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);

UnpooledByteBufAllocator

UnpooledByteBufAllocator是非池化分配器,这里着重分析一下newHeapBuffer方法与newDirectBuffer方法的实现逻辑。

  1. @Override
  2. protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
  3. // netty会优先使用unsafe方式
  4. return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
  5. : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
  6. }
  7. @Override
  8. protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  9. // netty会优先使用unsafe方式
  10. ByteBuf buf = PlatformDependent.hasUnsafe() ?
  11. UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
  12. new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  13. return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
  14. }

newHeapBuffer

在newHeapBuffer方法中,会根据hasUnsafe的值来确定ByteBuf的类型是UnpooledUnsafeHeapByteBuf还是UnpooledHeapByteBuf。

UnpooledUnsafeHeapByteBuf是UnpooledHeapByteBuf的子类,子类构造函数调用父类构造函数,走了同一套实例化的逻辑,通过new的方式创建了指定容量的byte数组。

  1. final class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {
  2. UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  3. super(alloc, initialCapacity, maxCapacity);
  4. }
  5. ...
  6. }
  7. public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
  8. protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  9. this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
  10. }
  11. ...
  12. private UnpooledHeapByteBuf(
  13. ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
  14. super(maxCapacity);
  15. if (alloc == null) {
  16. throw new NullPointerException("alloc");
  17. }
  18. if (initialArray == null) {
  19. throw new NullPointerException("initialArray");
  20. }
  21. if (initialArray.length > maxCapacity) {
  22. throw new IllegalArgumentException(String.format(
  23. "initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));
  24. }
  25. this.alloc = alloc;
  26. setArray(initialArray);
  27. setIndex(readerIndex, writerIndex);
  28. }
  29. private void setArray(byte[] initialArray) {
  30. array = initialArray;
  31. tmpNioBuf = null;
  32. }
  33. }

既然UnpooledUnsafeHeapByteBuf与UnpooledHeapByteBuf的实例化逻辑完全一样,那它们的区别在哪?

我们知道在父类AbstractByteBuf中有很多下划线开头的抽象方法,这些方法都是需要子类去实现的,两者的区别就是在_getByte方法的实现上。

UnpooledHeapByteBuf的_getByte方法是直接拿到初始化new的byte数组,通过index获取数据。

  1. @Override
  2. protected byte _getByte(int index) {
  3. return HeapByteBufUtil.getByte(array, index);
  4. }
  5. ...
  6. static byte getByte(byte[] memory, int index) {
  7. return memory[index];
  8. }

而UnpooledUnsafeHeapByteBuf的_getByte方法则是调用了unsafe对象的native API,根据数组对象与偏移量来获取数据,效率相对而言较高,这也是Netty优先选择unsafe方式的原因。

  1. @Override
  2. protected byte _getByte(int index) {
  3. return UnsafeByteBufUtil.getByte(array, index);
  4. }
  5. ...
  6. static byte getByte(byte[] array, int index) {
  7. return PlatformDependent.getByte(array, index);
  8. }
  9. ...
  10. public static byte getByte(byte[] data, int index) {
  11. return PlatformDependent0.getByte(data, index);
  12. }
  13. static byte getByte(byte[] data, int index) {
  14. return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
  15. }

newDirectBuffer

在newDirectBuffer方法中,同样会根据hasUnsafe的值来ByteBuf的类型是否是Unsafe。

首先是UnpooledDirectByteBuf,它构造函数的逻辑很简单,通过堆外内存创建ByteBuffer。

  1. protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  2. super(maxCapacity);
  3. if (alloc == null) {
  4. throw new NullPointerException("alloc");
  5. }
  6. if (initialCapacity < 0) {
  7. throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
  8. }
  9. if (maxCapacity < 0) {
  10. throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
  11. }
  12. if (initialCapacity > maxCapacity) {
  13. throw new IllegalArgumentException(String.format(
  14. "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
  15. }
  16. this.alloc = alloc;
  17. // ByteBuffer.allocateDirect(initialCapacity)通过堆外内存创建ByteBuffer
  18. setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
  19. }
  20. private void setByteBuffer(ByteBuffer buffer) {
  21. ByteBuffer oldBuffer = this.buffer;
  22. if (oldBuffer != null) {
  23. if (doNotFree) {
  24. doNotFree = false;
  25. } else {
  26. freeDirect(oldBuffer);
  27. }
  28. }
  29. this.buffer = buffer;
  30. tmpNioBuf = null;
  31. capacity = buffer.remaining();
  32. }

再看一下UnpooledUnsafeDirectByteBuf构造函数的逻辑。

  1. protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  2. super(maxCapacity);
  3. if (alloc == null) {
  4. throw new NullPointerException("alloc");
  5. }
  6. if (initialCapacity < 0) {
  7. throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
  8. }
  9. if (maxCapacity < 0) {
  10. throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
  11. }
  12. if (initialCapacity > maxCapacity) {
  13. throw new IllegalArgumentException(String.format(
  14. "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
  15. }
  16. this.alloc = alloc;
  17. setByteBuffer(allocateDirect(initialCapacity), false);
  18. }
  19. final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
  20. if (tryFree) {
  21. ByteBuffer oldBuffer = this.buffer;
  22. if (oldBuffer != null) {
  23. if (doNotFree) {
  24. doNotFree = false;
  25. } else {
  26. freeDirect(oldBuffer);
  27. }
  28. }
  29. }
  30. this.buffer = buffer;
  31. // 计算ByteBuffer的内存地址
  32. memoryAddress = PlatformDependent.directBufferAddress(buffer);
  33. tmpNioBuf = null;
  34. capacity = buffer.remaining();
  35. }

可以看到,方法整体逻辑与非unsafe方式是一样的,只不过在setByteBuffer方法中额外计算了ByteBuffer的内存地址。

再来看一下两个类对于_getByte方法的实现有什么不同:

UnpooledDirectByteBuf直接通过index在ByteBuffer中获取数据。

  1. @Override
  2. protected byte _getByte(int index) {
  3. return buffer.get(index);
  4. }

而UnpooledUnsafeDirectByteBuf则是通过前面保存的内存地址memoryAddress利用unsafe获取数据。

  1. @Override
  2. protected byte _getByte(int index) {
  3. return UnsafeByteBufUtil.getByte(addr(index));
  4. }
  5. long addr(int index) {
  6. return memoryAddress + index;
  7. }
  8. ...
  9. static byte getByte(long address) {
  10. return UNSAFE.getByte(address);
  11. }

PooledByteBufAllocator

PooledByteBufAllocator是池化分配器,首先看一下它对于newHeapBuffer方法与newDirectBuffer方法的实现逻辑。

  1. private final PoolThreadLocalCache threadCache;
  2. ...
  3. @Override
  4. protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
  5. // 线程局部缓存,在多线程情况下,每个线程有一份独立的缓存
  6. PoolThreadCache cache = threadCache.get();
  7. PoolArena<byte[]> heapArena = cache.heapArena;
  8. ByteBuf buf;
  9. if (heapArena != null) {
  10. buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
  11. } else {
  12. buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
  13. }
  14. return toLeakAwareBuffer(buf);
  15. }
  16. @Override
  17. protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  18. PoolThreadCache cache = threadCache.get();
  19. PoolArena<ByteBuffer> directArena = cache.directArena;
  20. ByteBuf buf;
  21. if (directArena != null) {
  22. buf = directArena.allocate(cache, initialCapacity, maxCapacity);
  23. } else {
  24. if (PlatformDependent.hasUnsafe()) {
  25. buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
  26. } else {
  27. buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  28. }
  29. }
  30. return toLeakAwareBuffer(buf);
  31. }

可以看到,与unpooled模式对比,pooled模式申请内存的方式不是直接去系统申请,而是先从threadCache对象获取一个线程局部缓存对象,再获取其中的PoolArena对象,由PoolArena对象来为ByteBuf分配内存。


PoolThreadLocalCache

threadCache的对象类型是PoolThreadLocalCache,它可以看做是一个增强版的ThreadLocal,这样每个线程都会有一个自己的cache,而这个cache又维护了两类内存,分别是heap(堆内)和direct(堆外)内存。


PoolArena

PoolArena可以理解为内存分配的竞技场。在PooledByteBufAllocator初始化时,会分别创建heapArenas与directArenas两个arena数组,数组大小默认是CPU核数 * 2,与NioEvenLoop的默认数量是一致的,目的是保证每个线程都有一个自己的arena,避免竞争。当有新的线程请求时,PoolThreadCache会拿到对应的PoolArena对象,保存到ThreadLocal成员变量中,这样就完成了线程与arena的绑定。


arena分配内存流程

  1. 从Recycler(对象回收池)获取PooledByteBuf进行复用。
  2. 优先从之前分配过的缓存中分配内存。
  3. 如果没有命中缓存则从内存堆中分配内存。

发表评论

表情:
评论列表 (有 0 条评论,264人围观)

还没有评论,来说两句吧...

相关阅读