Netty原理:ByteBuf对Nio bytebuffer做了什么导致效率提升?

x33g5p2x  于2021-11-24 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(392)

ByteBuf

NIO中ByteBuffer的缺点:

A 长度固定,无法动态的扩容和缩容,缺乏灵活性
B 使用一个position记录读写的索引位置,在读写模式切换时需手动调用flip方法,增加了使用的复杂度。
C 功能有限,使用过程中往往需要自行封装

1)分类

按照内存的位置,分为堆内存缓冲区 heap buffer、直接内存缓冲区direct buffer、复合内存缓冲区composite buffer。

A heap buffer

将数据存储到JVM的堆空间中,实际使用字节数组byte[]来存放。
优点:数据可以快速的创建和释放,并且能够直接访问内部数组
缺点:在读写数据时,需要将数据复制到直接缓冲区 再进行网络传输。

B direct buffer

不在堆中,而是使用了操作系统的本地内存。
优点:在使用Socket进行数据传输过程中,减少一次拷贝,性能更高。
缺点:释放和分配的空间更昂贵,使用时需要更谨慎。

C composite buffer

将两个或多个不同内存的缓冲区合并
优点:可以统一进行操作

应用场景:在通信线程使用缓冲区时,往往使用direct buffer,而业务消息使用缓冲区时,往往使用heap buffer,在解决http包,请求头+请求体特性不同而选择不同位置存储时,可以将两者拼接使用
通过迭代器遍历就可以看到类型

public static void main(String[] args) { 
// 堆内存 其他的方式
        ByteBuf byteBuf = Unpooled.buffer();
        //直接内存缓冲区
        ByteBuf dBuf = Unpooled.directBuffer();
        //这个类型是复合缓冲区 堆内存 和 直接内存 两个缓冲区可以作为参数复合起来
        CompositeByteBuf csbuf = Unpooled.compositeBuffer();
        csbuf.addComponents(byteBuf, dBuf);
        // 这个缓冲区可以用迭代器遍历
        Iterator<ByteBuf> iterator = csbuf.iterator();
        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }
  }

可以看到 这里遍历出来的就是 我们先后放进去的 堆内存缓冲区和直接内存缓冲区
对于直接内存缓冲区对于空间和释放相对复杂,速度也比堆内存稍微慢一些,如何有跟好的办法去解决这个问题呢?

D 池化的概念

对于内存空间分配和释放的复杂度和效率,netty通过内存池的方式来解决。
内存池,可以循环利用ByteBuf,提高使用率。但是管理和维护较复杂。

Unpooled正是非池化缓冲区的工具类。

主要区别在于,池化的内存由netty管理,非池化的内存由GC回收。

E 回收方式

回收方式为引用计数,具体规则为,通过记录被引用的次数,判断当前对象是否还会被使用。
当对象被调用时,引用计为+1,当对象被释放时,引用计为-1,当引用次数为0时,对象可以回收。

弊端:可能引发内存泄漏。
当对象不可达,JVM会通过GC回收掉,但此时引用计数可能不为0,对象无法归还内存池,会导致内存泄漏。netty只能通过对内存缓冲区进行采样,来检查。
GC回收方式的代码效果

public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(10);
        System.out.println(buf);
        //引用计数的值
        System.out.println(buf.refCnt());
        //保持的意思 将计数 +1
        buf.retain();
        System.out.println(buf.refCnt());
        //释放的意思 计数-1
        buf.release();
        System.out.println(buf.refCnt());

    }

2)工作原理

和ByteBuffer不同在于,增加了一个指针,通过两个指针记录读模式和写模式时的索引位置,读指针叫做readerIndex,写指针叫做writerIndex。

A 读写分离

当执行clear()方法时,索引位置清空回初始位置,但数据保持不变。

mark和reset方法在ByteBuf中同样适用,如markReaderIndex和resetReaderIndex。
用代码来验证效果

先查看一下初始值 以下代码都在main方法中。

public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer();
        //默认的初始化是 256
        System.out.println(buf.capacity());
        //查看读写索引
        System.out.println(buf.readerIndex());
        System.out.println(buf.writerIndex());
        //可写大小
        System.out.println(buf.writableBytes());
    }

不同的参数都有不同的方法来展示
接下来写入数据

buf.writeBytes("hello index".getBytes());
        //默认的初始化是 256
        System.out.println(buf.capacity());
        //查看读写索引
        System.out.println(buf.readerIndex());
        System.out.println(buf.writerIndex());
        //可写大小
        System.out.println(buf.writableBytes());

读取数据

System.out.println("--------只读取5个字节 hello");
        for (int i = 0; i < 5; i++) {
            System.out.print((char) buf.readByte());

        }
        System.out.println();
        //默认的初始化是 256
        System.out.println(buf.capacity());
        //查看读写索引
        System.out.println(buf.readerIndex());
        System.out.println(buf.writerIndex());
        //可写大小
        System.out.println("可写入区域:" + buf.writableBytes());
        System.out.println("可读写的字节:" + buf.readableBytes());

可回收区域代码验证

读取完之后我们读取过的那五个字节就变成了可回收区域,我们可以调用方法来回收

回收之后方法 可写位置会变成 当前大小+回收大小

//回收可废弃空间
        buf.discardReadBytes();
        System.out.println("-------回收废弃空间");
        //默认的初始化是 256
        System.out.println(buf.capacity());
        //查看读写索引
        System.out.println(buf.readerIndex());
        System.out.println(buf.writerIndex());
        //可写大小
        System.out.println("可写入区域:" + buf.writableBytes());
        System.out.println("可读写的字节:" + buf.readableBytes());

可以看到 我们读索引归零 可写入区域变成了 原有加上回收的可废弃区域

标记回滚 同之前的charbuffer是一样的 只不过这里是两个索引

System.out.println("--------读取index 并且回退");
        //这里读写索引都适用
        buf.markReaderIndex();
        //buf.markWriterIndex();
        int end = buf.writerIndex();
        for (int i = buf.readerIndex(); i < end; i++) {
            System.out.print((char) buf.readByte());

        }
        System.out.println();
        //撤回到mark地方
        buf.resetReaderIndex();
        //默认的初始化是 256
        System.out.println(buf.capacity());
        //查看读写索引
        System.out.println(buf.readerIndex());
        System.out.println(buf.writerIndex());
        //可写大小
        System.out.println("可写入区域:" + buf.writableBytes());
        System.out.println("可读写的字节:" + buf.readableBytes());

B 深浅拷贝

浅拷贝,拷贝的是对对象的引用,并没有创建新对象,新对象和原对象之间互相影响。
浅拷贝 代码验证实现

  1. 浅拷贝方法 证明是引用
  2. 切片拷贝 只可读不可写 区间是readerindex - writerindex
public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer();
        buf.writeBytes("hello bytebuf copy".getBytes());
        System.out.println("capacity:" + buf.capacity());
        //查看读写索引
        System.out.println("readerIndex:" + buf.readerIndex());
        System.out.println("writerIndex:" + buf.writerIndex());
        //可写大小
        System.out.println("可写入区域:" + buf.writableBytes());
        System.out.println("可读写的字节:" + buf.readableBytes());

        //复制 浅拷贝
        ByteBuf newbuf = buf.duplicate();
        System.out.println("-------------duplicate newbuf");

        System.out.println("capacity:" + newbuf.capacity());
        //查看读写索引
        System.out.println("readerIndex:" + newbuf.readerIndex());
        System.out.println("writerIndex:" + newbuf.writerIndex());
        //可写大小
        System.out.println("可写入区域:" + newbuf.writableBytes());
        System.out.println("可读写的字节:" + newbuf.readableBytes());
        //写入新的数据 在新的buf中
        System.out.println("-------------duplicate newbuf add data");
        newbuf.writeBytes("from newbuf".getBytes());

        //之后读取两个 buf 查看不同
        //读取大小超过写 索引会报错 我们需要设置一下
        buf.writerIndex(30);
        for (int i = 0; i < 13; i++) {
            System.out.print((char) buf.readByte());
        }
        System.out.println();
        System.out.println("capacity:" + buf.capacity());
        //查看读写索引
        System.out.println("readerIndex:" + buf.readerIndex());
        System.out.println("writerIndex:" + buf.writerIndex());
        //可写大小
        System.out.println("可写入区域:" + buf.writableBytes());
        System.out.println("可读写的字节:" + buf.readableBytes());

        //有时候 我们的只需要拿到最重要的部分 未读取部分 netty同时也给我们准备了方法
        //slice() 部分浅拷贝 拷贝区间 readerindex - writerindex之间的区域
        //这部分 只可读 不可写 切片的容量 就是原可读区域的大小
        ByteBuf sliceBuf = buf.slice();
        //写入数据 会导致 异常
        System.out.println("---------sliceBuf");
        System.out.println("capacity:" + sliceBuf.capacity());
        //查看读写索引
        System.out.println("readerIndex:" + sliceBuf.readerIndex());
        System.out.println("writerIndex:" + sliceBuf.writerIndex());
        //可写大小
        System.out.println("可写入区域:" + sliceBuf.writableBytes());
        System.out.println("可读写的字节:" + sliceBuf.readableBytes());
    }

效果输出 分析

深拷贝,拷贝的是整个对象,和原对象之间完全独立。

// 深复制
ByteBuf copyBuf = buf.copy();
System.out.println("------copyBuf");
System.out.println("capacity:" + copyBuf.capacity());
//查看读写索引
System.out.println("readerIndex:" + copyBuf.readerIndex());
System.out.println("writerIndex:" + copyBuf.writerIndex());
//可写大小
System.out.println("可写入区域:" + copyBuf.writableBytes());
System.out.println("可读写的字节:" + copyBuf.readableBytes());

System.out.println("------- add data copybuf");
copyBuf.writeBytes("from copyBuf".getBytes());

copyBuf.writerIndex(43);
for (int i = copyBuf.readerIndex(); i < 43; i++) {
    System.out.print((char) copyBuf.readByte());
}
System.out.println();
System.out.println("----------原 buf");
buf.writerIndex(43);
for (int i = buf.readerIndex(); i < 43; i++) {
    System.out.print((char) buf.readByte());
}
System.out.println();

分析结果

小结

duplicate和slice方法,达成全部浅拷贝和部分浅拷贝。
copy,部分深拷贝,部分代表的是可读空间。

3)扩容机制

A ByteBuffer的存储

ByteBuffer在put数据时,会校验剩余空间是否不足,如果不足,会抛出异常。

ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.put("yu".getBytes());

----------------------------------------------------

    public final ByteBuffer put(byte[] src) {
        return put(src, 0, src.length);
    }
    
    // 额外接收偏移量(存储数据的起始位置)  和数据长度
    public ByteBuffer put(byte[] src, int offset, int length) {
        // 校验参数的有效性
        checkBounds(offset, length, src.length);
        // 如果要存储数据的长度 > 剩余可用空间  抛出buffer越界的异常
        if (length > remaining())
            throw new BufferOverflowException();
        // 如果剩余空间足够  计算存储的结束位置 = 偏移量 + 数据长度    
        int end = offset + length;
        for (int i = offset; i < end; i++)
            this.put(src[i]);
        return this;
    }

如果要手动对ByteBuffer扩容,可以在put之前,先校验剩余空间是否足够,如果不足够,创建一个新的ByteBuffer,新的容量确保足够,旧的buffer数据拷贝到新的buffer中,然后继续存储数据。

B ByteBuf的存储和扩容

当写数据时,先判断是否需要扩容,如果当前空间较小(<4M),以64作为基数倍增(10 -> 64 -> 128 -> 256), 如果当前空间较大(>4M), 每次扩容都增加4M,这种方式叫做"步进式"。
验证扩容

public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(10);
        System.out.println("capacity:" + buf.capacity());
        for (int i = 0; i < 11; i++) {
            buf.writeByte(i);
        }
        System.out.println("capacity:" + buf.capacity());
        for (int i = 0; i < 65; i++) {
            buf.writeByte(i);
        }
        System.out.println("capacity:" + buf.capacity());
    }

输出结果 可以看到 扩容的范围变化

capacity:10
capacity:64
capacity:128
查看源码,以AbstractByteBuf子类为依据查看,最重要的子类之一,ByteBuf的公共属性和功能都在此中实现。

ByteBuf buf = Unpooled.buffer(10);
System.out.println("capacity: " + buf.capacity());
for (int i = 0; i < 11; i++) {
    buf.writeByte(i);
}
----------------------------------------------------   

[ByteBuf类]
public abstract ByteBuf writeByte(int value);

按住Ctrl+Alt快捷键

[AbstractByteBuf子类]
---------------------------------------------------- 
    @Override
    public ByteBuf writeByte(int value) {
        // 确保可写空间足够
        ensureWritable0(1);
        // 写入数据
        _setByte(writerIndex++, value);
        return this;
    }
    
    // 参数为 最小写入数据的大小
    final void ensureWritable0(int minWritableBytes) {
        final int writerIndex = writerIndex();
        // 目标容量 = 当前写操作索引 + 最小写入数据大小
        final int targetCapacity = writerIndex + minWritableBytes;
        // 容量足够  不需扩容
        if (targetCapacity <= capacity()) {
            ensureAccessible();
            return;
        }
        // 容量不足时 如果目标容量 超出最大容量  抛出异常
        if (checkBounds && targetCapacity > maxCapacity) {
            ensureAccessible();
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }
		
		// 扩容逻辑
        // 获取可写空间大小
        final int fastWritable = maxFastWritableBytes();
        // 如果 可写空间 >= 所需空间   新的容量=写操作索引+可写空间大小 
        // 如果 可写空间 < 所需空间   计算要扩容的新容量大小 calculateNewCapacity方法
        int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
                : alloc().calculateNewCapacity(targetCapacity, maxCapacity);

        // Adjust to the new capacity.
        
        // 计算完成后 生成新的ByteBuffer
        capacity(newCapacity);
    }
    
    // 获取可写空间大小
    public int maxFastWritableBytes() {
        return writableBytes();
    }
    

[AbstractByteBufAllocator子类]
---------------------------------------------------- 
    // 计算要扩容的新容量大小
    @Override
    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        // 校验参数有效性
        checkPositiveOrZero(minNewCapacity, "minNewCapacity");
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        // 扩容方式的分界点 以4M大小为界
        final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.、
        // 如果所需容量大于4M  按照步进的方式扩容 
        //   举例: 比如 minNewCapacity = 5M  
        if (minNewCapacity > threshold) {
            // newCapacity = 5 / 4 * 4 = 4M  确保是4的倍数
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                // newCapacity = 4 + 4 = 8M;
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        // 如果所需容量大于4M  按照64的倍数扩容  找到最接近所需容量的64的倍数
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        
        // 保障在最大可接受容量范围内
        return Math.min(newCapacity, maxCapacity);
    }

4)优势

A 池化的方式提高内存使用率

B 提出了复合型缓冲区的整合方案

C 增加了索引,使读写分离,使用更便捷

D 解决了ByteBuffer长度固定的问题,增加了扩容机制

E 用引用计数的方式进行对象回收

相关文章