Netty原理:ByteBuf对Nio bytebuffer做了什么导致效率提升?

x33g5p2x  于2021-11-24 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(437)

ByteBuf

NIO中ByteBuffer的缺点:

A 长度固定,无法动态的扩容和缩容,缺乏灵活性
B 使用一个position记录读写的索引位置,在读写模式切换时需手动调用flip方法,增加了使用的复杂度。
C 功能有限,使用过程中往往需要自行封装

1)分类

按照内存的位置,分为堆内存缓冲区 heap buffer、直接内存缓冲区direct buffer、复合内存缓冲区composite buffer。

A heap buffer

将数据存储到JVM的堆空间中,实际使用字节数组byte[]来存放。
优点:数据可以快速的创建和释放,并且能够直接访问内部数组
缺点:在读写数据时,需要将数据复制到直接缓冲区 再进行网络传输。

B direct buffer

不在堆中,而是使用了操作系统的本地内存。
优点:在使用Socket进行数据传输过程中,减少一次拷贝,性能更高。
缺点:释放和分配的空间更昂贵,使用时需要更谨慎。

C composite buffer

将两个或多个不同内存的缓冲区合并
优点:可以统一进行操作

应用场景:在通信线程使用缓冲区时,往往使用direct buffer,而业务消息使用缓冲区时,往往使用heap buffer,在解决http包,请求头+请求体特性不同而选择不同位置存储时,可以将两者拼接使用
通过迭代器遍历就可以看到类型

  1. public static void main(String[] args) {
  2. // 堆内存 其他的方式
  3. ByteBuf byteBuf = Unpooled.buffer();
  4. //直接内存缓冲区
  5. ByteBuf dBuf = Unpooled.directBuffer();
  6. //这个类型是复合缓冲区 堆内存 和 直接内存 两个缓冲区可以作为参数复合起来
  7. CompositeByteBuf csbuf = Unpooled.compositeBuffer();
  8. csbuf.addComponents(byteBuf, dBuf);
  9. // 这个缓冲区可以用迭代器遍历
  10. Iterator<ByteBuf> iterator = csbuf.iterator();
  11. while (iterator.hasNext()) {
  12. System.out.println(iterator.next());
  13. }
  14. }

可以看到 这里遍历出来的就是 我们先后放进去的 堆内存缓冲区和直接内存缓冲区
对于直接内存缓冲区对于空间和释放相对复杂,速度也比堆内存稍微慢一些,如何有跟好的办法去解决这个问题呢?

D 池化的概念

对于内存空间分配和释放的复杂度和效率,netty通过内存池的方式来解决。
内存池,可以循环利用ByteBuf,提高使用率。但是管理和维护较复杂。

Unpooled正是非池化缓冲区的工具类。

主要区别在于,池化的内存由netty管理,非池化的内存由GC回收。

E 回收方式

回收方式为引用计数,具体规则为,通过记录被引用的次数,判断当前对象是否还会被使用。
当对象被调用时,引用计为+1,当对象被释放时,引用计为-1,当引用次数为0时,对象可以回收。

弊端:可能引发内存泄漏。
当对象不可达,JVM会通过GC回收掉,但此时引用计数可能不为0,对象无法归还内存池,会导致内存泄漏。netty只能通过对内存缓冲区进行采样,来检查。
GC回收方式的代码效果

  1. public static void main(String[] args) {
  2. ByteBuf buf = Unpooled.buffer(10);
  3. System.out.println(buf);
  4. //引用计数的值
  5. System.out.println(buf.refCnt());
  6. //保持的意思 将计数 +1
  7. buf.retain();
  8. System.out.println(buf.refCnt());
  9. //释放的意思 计数-1
  10. buf.release();
  11. System.out.println(buf.refCnt());
  12. }

2)工作原理

和ByteBuffer不同在于,增加了一个指针,通过两个指针记录读模式和写模式时的索引位置,读指针叫做readerIndex,写指针叫做writerIndex。

A 读写分离

当执行clear()方法时,索引位置清空回初始位置,但数据保持不变。

mark和reset方法在ByteBuf中同样适用,如markReaderIndex和resetReaderIndex。
用代码来验证效果

先查看一下初始值 以下代码都在main方法中。

  1. public static void main(String[] args) {
  2. ByteBuf buf = Unpooled.buffer();
  3. //默认的初始化是 256
  4. System.out.println(buf.capacity());
  5. //查看读写索引
  6. System.out.println(buf.readerIndex());
  7. System.out.println(buf.writerIndex());
  8. //可写大小
  9. System.out.println(buf.writableBytes());
  10. }

不同的参数都有不同的方法来展示
接下来写入数据

  1. buf.writeBytes("hello index".getBytes());
  2. //默认的初始化是 256
  3. System.out.println(buf.capacity());
  4. //查看读写索引
  5. System.out.println(buf.readerIndex());
  6. System.out.println(buf.writerIndex());
  7. //可写大小
  8. System.out.println(buf.writableBytes());

读取数据

  1. System.out.println("--------只读取5个字节 hello");
  2. for (int i = 0; i < 5; i++) {
  3. System.out.print((char) buf.readByte());
  4. }
  5. System.out.println();
  6. //默认的初始化是 256
  7. System.out.println(buf.capacity());
  8. //查看读写索引
  9. System.out.println(buf.readerIndex());
  10. System.out.println(buf.writerIndex());
  11. //可写大小
  12. System.out.println("可写入区域:" + buf.writableBytes());
  13. System.out.println("可读写的字节:" + buf.readableBytes());

可回收区域代码验证

读取完之后我们读取过的那五个字节就变成了可回收区域,我们可以调用方法来回收

回收之后方法 可写位置会变成 当前大小+回收大小

  1. //回收可废弃空间
  2. buf.discardReadBytes();
  3. System.out.println("-------回收废弃空间");
  4. //默认的初始化是 256
  5. System.out.println(buf.capacity());
  6. //查看读写索引
  7. System.out.println(buf.readerIndex());
  8. System.out.println(buf.writerIndex());
  9. //可写大小
  10. System.out.println("可写入区域:" + buf.writableBytes());
  11. System.out.println("可读写的字节:" + buf.readableBytes());

可以看到 我们读索引归零 可写入区域变成了 原有加上回收的可废弃区域

标记回滚 同之前的charbuffer是一样的 只不过这里是两个索引

  1. System.out.println("--------读取index 并且回退");
  2. //这里读写索引都适用
  3. buf.markReaderIndex();
  4. //buf.markWriterIndex();
  5. int end = buf.writerIndex();
  6. for (int i = buf.readerIndex(); i < end; i++) {
  7. System.out.print((char) buf.readByte());
  8. }
  9. System.out.println();
  10. //撤回到mark地方
  11. buf.resetReaderIndex();
  12. //默认的初始化是 256
  13. System.out.println(buf.capacity());
  14. //查看读写索引
  15. System.out.println(buf.readerIndex());
  16. System.out.println(buf.writerIndex());
  17. //可写大小
  18. System.out.println("可写入区域:" + buf.writableBytes());
  19. System.out.println("可读写的字节:" + buf.readableBytes());

B 深浅拷贝

浅拷贝,拷贝的是对对象的引用,并没有创建新对象,新对象和原对象之间互相影响。
浅拷贝 代码验证实现

  1. 浅拷贝方法 证明是引用
  2. 切片拷贝 只可读不可写 区间是readerindex - writerindex
  1. public static void main(String[] args) {
  2. ByteBuf buf = Unpooled.buffer();
  3. buf.writeBytes("hello bytebuf copy".getBytes());
  4. System.out.println("capacity:" + buf.capacity());
  5. //查看读写索引
  6. System.out.println("readerIndex:" + buf.readerIndex());
  7. System.out.println("writerIndex:" + buf.writerIndex());
  8. //可写大小
  9. System.out.println("可写入区域:" + buf.writableBytes());
  10. System.out.println("可读写的字节:" + buf.readableBytes());
  11. //复制 浅拷贝
  12. ByteBuf newbuf = buf.duplicate();
  13. System.out.println("-------------duplicate newbuf");
  14. System.out.println("capacity:" + newbuf.capacity());
  15. //查看读写索引
  16. System.out.println("readerIndex:" + newbuf.readerIndex());
  17. System.out.println("writerIndex:" + newbuf.writerIndex());
  18. //可写大小
  19. System.out.println("可写入区域:" + newbuf.writableBytes());
  20. System.out.println("可读写的字节:" + newbuf.readableBytes());
  21. //写入新的数据 在新的buf中
  22. System.out.println("-------------duplicate newbuf add data");
  23. newbuf.writeBytes("from newbuf".getBytes());
  24. //之后读取两个 buf 查看不同
  25. //读取大小超过写 索引会报错 我们需要设置一下
  26. buf.writerIndex(30);
  27. for (int i = 0; i < 13; i++) {
  28. System.out.print((char) buf.readByte());
  29. }
  30. System.out.println();
  31. System.out.println("capacity:" + buf.capacity());
  32. //查看读写索引
  33. System.out.println("readerIndex:" + buf.readerIndex());
  34. System.out.println("writerIndex:" + buf.writerIndex());
  35. //可写大小
  36. System.out.println("可写入区域:" + buf.writableBytes());
  37. System.out.println("可读写的字节:" + buf.readableBytes());
  38. //有时候 我们的只需要拿到最重要的部分 未读取部分 netty同时也给我们准备了方法
  39. //slice() 部分浅拷贝 拷贝区间 readerindex - writerindex之间的区域
  40. //这部分 只可读 不可写 切片的容量 就是原可读区域的大小
  41. ByteBuf sliceBuf = buf.slice();
  42. //写入数据 会导致 异常
  43. System.out.println("---------sliceBuf");
  44. System.out.println("capacity:" + sliceBuf.capacity());
  45. //查看读写索引
  46. System.out.println("readerIndex:" + sliceBuf.readerIndex());
  47. System.out.println("writerIndex:" + sliceBuf.writerIndex());
  48. //可写大小
  49. System.out.println("可写入区域:" + sliceBuf.writableBytes());
  50. System.out.println("可读写的字节:" + sliceBuf.readableBytes());
  51. }

效果输出 分析

深拷贝,拷贝的是整个对象,和原对象之间完全独立。

  1. // 深复制
  2. ByteBuf copyBuf = buf.copy();
  3. System.out.println("------copyBuf");
  4. System.out.println("capacity:" + copyBuf.capacity());
  5. //查看读写索引
  6. System.out.println("readerIndex:" + copyBuf.readerIndex());
  7. System.out.println("writerIndex:" + copyBuf.writerIndex());
  8. //可写大小
  9. System.out.println("可写入区域:" + copyBuf.writableBytes());
  10. System.out.println("可读写的字节:" + copyBuf.readableBytes());
  11. System.out.println("------- add data copybuf");
  12. copyBuf.writeBytes("from copyBuf".getBytes());
  13. copyBuf.writerIndex(43);
  14. for (int i = copyBuf.readerIndex(); i < 43; i++) {
  15. System.out.print((char) copyBuf.readByte());
  16. }
  17. System.out.println();
  18. System.out.println("----------原 buf");
  19. buf.writerIndex(43);
  20. for (int i = buf.readerIndex(); i < 43; i++) {
  21. System.out.print((char) buf.readByte());
  22. }
  23. System.out.println();

分析结果

小结

duplicate和slice方法,达成全部浅拷贝和部分浅拷贝。
copy,部分深拷贝,部分代表的是可读空间。

3)扩容机制

A ByteBuffer的存储

ByteBuffer在put数据时,会校验剩余空间是否不足,如果不足,会抛出异常。

  1. ByteBuffer buffer = ByteBuffer.allocate(8);
  2. buffer.put("yu".getBytes());
  3. ----------------------------------------------------
  4. public final ByteBuffer put(byte[] src) {
  5. return put(src, 0, src.length);
  6. }
  7. // 额外接收偏移量(存储数据的起始位置) 和数据长度
  8. public ByteBuffer put(byte[] src, int offset, int length) {
  9. // 校验参数的有效性
  10. checkBounds(offset, length, src.length);
  11. // 如果要存储数据的长度 > 剩余可用空间 抛出buffer越界的异常
  12. if (length > remaining())
  13. throw new BufferOverflowException();
  14. // 如果剩余空间足够 计算存储的结束位置 = 偏移量 + 数据长度
  15. int end = offset + length;
  16. for (int i = offset; i < end; i++)
  17. this.put(src[i]);
  18. return this;
  19. }

如果要手动对ByteBuffer扩容,可以在put之前,先校验剩余空间是否足够,如果不足够,创建一个新的ByteBuffer,新的容量确保足够,旧的buffer数据拷贝到新的buffer中,然后继续存储数据。

B ByteBuf的存储和扩容

当写数据时,先判断是否需要扩容,如果当前空间较小(<4M),以64作为基数倍增(10 -> 64 -> 128 -> 256), 如果当前空间较大(>4M), 每次扩容都增加4M,这种方式叫做"步进式"。
验证扩容

  1. public static void main(String[] args) {
  2. ByteBuf buf = Unpooled.buffer(10);
  3. System.out.println("capacity:" + buf.capacity());
  4. for (int i = 0; i < 11; i++) {
  5. buf.writeByte(i);
  6. }
  7. System.out.println("capacity:" + buf.capacity());
  8. for (int i = 0; i < 65; i++) {
  9. buf.writeByte(i);
  10. }
  11. System.out.println("capacity:" + buf.capacity());
  12. }

输出结果 可以看到 扩容的范围变化

  1. capacity:10
  2. capacity:64
  3. capacity:128
  1. 查看源码,以AbstractByteBuf子类为依据查看,最重要的子类之一,ByteBuf的公共属性和功能都在此中实现。
  2. ByteBuf buf = Unpooled.buffer(10);
  3. System.out.println("capacity: " + buf.capacity());
  4. for (int i = 0; i < 11; i++) {
  5. buf.writeByte(i);
  6. }
  7. ----------------------------------------------------
  8. [ByteBuf类]
  9. public abstract ByteBuf writeByte(int value);
  10. 按住Ctrl+Alt快捷键
  11. [AbstractByteBuf子类]
  12. ----------------------------------------------------
  13. @Override
  14. public ByteBuf writeByte(int value) {
  15. // 确保可写空间足够
  16. ensureWritable0(1);
  17. // 写入数据
  18. _setByte(writerIndex++, value);
  19. return this;
  20. }
  21. // 参数为 最小写入数据的大小
  22. final void ensureWritable0(int minWritableBytes) {
  23. final int writerIndex = writerIndex();
  24. // 目标容量 = 当前写操作索引 + 最小写入数据大小
  25. final int targetCapacity = writerIndex + minWritableBytes;
  26. // 容量足够 不需扩容
  27. if (targetCapacity <= capacity()) {
  28. ensureAccessible();
  29. return;
  30. }
  31. // 容量不足时 如果目标容量 超出最大容量 抛出异常
  32. if (checkBounds && targetCapacity > maxCapacity) {
  33. ensureAccessible();
  34. throw new IndexOutOfBoundsException(String.format(
  35. "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
  36. writerIndex, minWritableBytes, maxCapacity, this));
  37. }
  38. // 扩容逻辑
  39. // 获取可写空间大小
  40. final int fastWritable = maxFastWritableBytes();
  41. // 如果 可写空间 >= 所需空间 新的容量=写操作索引+可写空间大小
  42. // 如果 可写空间 < 所需空间 计算要扩容的新容量大小 calculateNewCapacity方法
  43. int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
  44. : alloc().calculateNewCapacity(targetCapacity, maxCapacity);
  45. // Adjust to the new capacity.
  46. // 计算完成后 生成新的ByteBuffer
  47. capacity(newCapacity);
  48. }
  49. // 获取可写空间大小
  50. public int maxFastWritableBytes() {
  51. return writableBytes();
  52. }
  53. [AbstractByteBufAllocator子类]
  54. ----------------------------------------------------
  55. // 计算要扩容的新容量大小
  56. @Override
  57. public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
  58. // 校验参数有效性
  59. checkPositiveOrZero(minNewCapacity, "minNewCapacity");
  60. if (minNewCapacity > maxCapacity) {
  61. throw new IllegalArgumentException(String.format(
  62. "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
  63. minNewCapacity, maxCapacity));
  64. }
  65. // 扩容方式的分界点 以4M大小为界
  66. final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
  67. if (minNewCapacity == threshold) {
  68. return threshold;
  69. }
  70. // If over threshold, do not double but just increase by threshold.、
  71. // 如果所需容量大于4M 按照步进的方式扩容
  72. // 举例: 比如 minNewCapacity = 5M
  73. if (minNewCapacity > threshold) {
  74. // newCapacity = 5 / 4 * 4 = 4M 确保是4的倍数
  75. int newCapacity = minNewCapacity / threshold * threshold;
  76. if (newCapacity > maxCapacity - threshold) {
  77. newCapacity = maxCapacity;
  78. } else {
  79. // newCapacity = 4 + 4 = 8M;
  80. newCapacity += threshold;
  81. }
  82. return newCapacity;
  83. }
  84. // Not over threshold. Double up to 4 MiB, starting from 64.
  85. // 如果所需容量大于4M 按照64的倍数扩容 找到最接近所需容量的64的倍数
  86. int newCapacity = 64;
  87. while (newCapacity < minNewCapacity) {
  88. newCapacity <<= 1;
  89. }
  90. // 保障在最大可接受容量范围内
  91. return Math.min(newCapacity, maxCapacity);
  92. }

4)优势

A 池化的方式提高内存使用率

B 提出了复合型缓冲区的整合方案

C 增加了索引,使读写分离,使用更便捷

D 解决了ByteBuffer长度固定的问题,增加了扩容机制

E 用引用计数的方式进行对象回收

相关文章