Netty源码分析-Channel对Buffer的堆内存和直接内存的使用

x33g5p2x  于2021-12-21 转载在 其他  
字(5.8k)|赞(0)|评价(0)|浏览(533)

Channel的Unsafe对Buffer的使用

  • 在Channel的接口体系设计中,Channel只是与ChannelHandler,ChannelHandlerContext,ChannelPipeline完成对数据的处理,而不直接参与和底层socket进行数据交互,是通过一个内部Unsafe的接口设计来完成这项工作。
  • 如对于数据读取,在Channel的实现中拿到的是Unsafe已经读取回来的数据,Unsafe将socket的字节数据读取后转为了对象Object msg的数据,或者将ServerSocket接收到的客户端socket连接,将socket对象交给Channel。具体为交给Channel所绑定的ChannelPipeline,从而开始Channel体系结构的数据流处理。
  • 故Buffer包中相关缓存实现类主要是在Channel的Unsafe接口的实现类中引用,读取底层Java NIO的socket的数据,如SocketChannel的bytes:
    如下为AbstractNioByteChannel的NioByteUnsafe的read方法:
    AbstractNioByteChannel是NioSocketChannel的基类,主要是读取客户端发送过来的字节数据
  1. @Override
  2. public final void read() {
  3. final ChannelConfig config = config();
  4. if (shouldBreakReadReady(config)) {
  5. clearReadPending();
  6. return;
  7. }
  8. final ChannelPipeline pipeline = pipeline();
  9. // 获取allocator,allocator是创建ByteBuf对象
  10. final ByteBufAllocator allocator = config.getAllocator();
  11. final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
  12. allocHandle.reset(config);
  13. ByteBuf byteBuf = null;
  14. boolean close = false;
  15. try {
  16. do {
  17. // 从allocator获取一个ByteBuf对象实例
  18. byteBuf = allocHandle.allocate(allocator);
  19. // doReadBytes从底层socket读取数据到byteBuf
  20. allocHandle.lastBytesRead(doReadBytes(byteBuf));
  21. if (allocHandle.lastBytesRead() <= 0) {
  22. // nothing was read. release the buffer.
  23. byteBuf.release();
  24. byteBuf = null;
  25. close = allocHandle.lastBytesRead() < 0;
  26. if (close) {
  27. // There is nothing left to read as we received an EOF.
  28. readPending = false;
  29. }
  30. break;
  31. }
  32. allocHandle.incMessagesRead(1);
  33. readPending = false;
  34. // 将byteBuf传给pipeline,从而开始数据处理
  35. pipeline.fireChannelRead(byteBuf);
  36. byteBuf = null;
  37. } while (allocHandle.continueReading());
  38. allocHandle.readComplete();
  39. pipeline.fireChannelReadComplete();
  40. if (close) {
  41. closeOnRead(pipeline);
  42. }
  43. } catch (Throwable t) {
  44. handleReadException(pipeline, byteBuf, t, close, allocHandle);
  45. } finally {
  46. // Check if there is a readPending which was not processed yet.
  47. // This could be for two reasons:
  48. // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
  49. // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
  50. //
  51. // See https://github.com/netty/netty/issues/2254
  52. if (!readPending && !config.isAutoRead()) {
  53. removeReadOp();
  54. }
  55. }
  56. }
  • 从以上代码可以看出是ByteBuf的allocator是从ChannelConfig中获取:
  1. final ByteBufAllocator allocator = config.getAllocator();

Channel使用堆内存还是直接内存

  1. allocator的实际类型代表了Channel具体是使用堆内存还是直接内存。这个可以通过用户代码通过childOption设置或者使用默认的:

  2. 用户代码传入:主要是通过childOption传入,则ServerBootstrap在接收到客户端连接并创建SocketChannel时,会根据初始化ServerBootstrap时的childOption设置,对该SocketChannel的config进行配置。

  1. // 处理客户端请求的配置
  2. serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
  3. serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
  4. // 客户端SocketChannel所使用的allocator的实现类
  5. serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
  6. serverBootstrap.childHandler(new NettyServerInitializer(webSocketService));
  1. 默认实现,一般不需要通过用户代码传入,使用默认的即可,默认实现为:如果当前的运行平台是android,则使用unpooled,即不使用池化机制,因为android移动端,内存资源有限,不适合做缓存;其他平台则默认使用池化机制,提高性能,以空间换时间。其次是使用堆内存还是直接内存的问题,主要是根据:

(1)当前运行平台是否支持使用Java的unsafe来进行本地方法调用;

(2)程序的系统参数是否设置了io.netty.noPreferDirect=true。如果当前平台支持unsafe且io.netty.noPreferDirect=false或者没有设置,默认为false,则使用直接内存;否则使用堆内存。
这两个参数的默认值:

  1. 1. 是否运行通过底层api直接访问直接内存,默认:允许
  2. -Dio.netty.noPreferDirect
  3. 2. 是否允许使用sun.misc.Unsafe,默认:允许;注意:使用sun的私有类库存在平台移植问题,另外sun.misc.Unsafe类是不安全的,如果操作失败,不是抛出异常,而是虚拟机core dump,不建议使用Unsafe
  4. -Dio.netty.noUnsafe

源码分析如下:

  1. 1.
  2. public class DefaultChannelConfig implements ChannelConfig {
  3. ...
  4. private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
  5. ...
  6. }
  7. 2.
  8. public interface ByteBufAllocator {
  9. ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
  10. }
  11. 3. DEFAULT_ALLOCATOR
  12. public final class ByteBufUtil {
  13. ...
  14. static final ByteBufAllocator DEFAULT_ALLOCATOR;
  15. static {
  16. // android则是unpooled,其他为pooled
  17. String allocType = SystemPropertyUtil.get(
  18. "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
  19. allocType = allocType.toLowerCase(Locale.US).trim();
  20. ByteBufAllocator alloc;
  21. if ("unpooled".equals(allocType)) {
  22. alloc = UnpooledByteBufAllocator.DEFAULT;
  23. logger.debug("-Dio.netty.allocator.type: {}", allocType);
  24. } else if ("pooled".equals(allocType)) {
  25. alloc = PooledByteBufAllocator.DEFAULT;
  26. logger.debug("-Dio.netty.allocator.type: {}", allocType);
  27. } else {
  28. alloc = PooledByteBufAllocator.DEFAULT;
  29. logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
  30. }
  31. DEFAULT_ALLOCATOR = alloc;
  32. ...
  33. }
  34. ...
  35. }
  36. 4. UnpooledByteBufAllocator.DEFAULT:非池化机制默认alloctor
  37. /** * Default instance which uses leak-detection for direct buffers. */
  38. public static final UnpooledByteBufAllocator DEFAULT =
  39. new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
  40. 5. PooledByteBufAllocator.DEFAULT:池化机制默认allocator
  41. public static final PooledByteBufAllocator DEFAULT =
  42. new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
  43. 6. 45中,都调用了PlatformDependent.directBufferPreferred(),如果返回true,则使用直接内存,否则使用堆内存。PlatformDependent.directBufferPreferred()的底层实现如下:
  44. private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE = unsafeUnavailabilityCause0();
  45. private static final boolean DIRECT_BUFFER_PREFERRED =
  46. UNSAFE_UNAVAILABILITY_CAUSE == null && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
  47. unsafeUnavailabilityCause0的实现:判断当前平台是否支持使用Javaunsafe
  48. private static Throwable unsafeUnavailabilityCause0() {
  49. if (isAndroid()) {
  50. logger.debug("sun.misc.Unsafe: unavailable (Android)");
  51. return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (Android)");
  52. }
  53. Throwable cause = PlatformDependent0.getUnsafeUnavailabilityCause();
  54. if (cause != null) {
  55. return cause;
  56. }
  57. try {
  58. boolean hasUnsafe = PlatformDependent0.hasUnsafe();
  59. logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
  60. return hasUnsafe ? null : PlatformDependent0.getUnsafeUnavailabilityCause();
  61. } catch (Throwable t) {
  62. logger.trace("Could not determine if Unsafe is available", t);
  63. // Probably failed to initialize PlatformDependent0.
  64. return new UnsupportedOperationException("Could not determine if Unsafe is available", t);
  65. }
  66. }

相关文章