Netty组件EventLoopGroup和EventLoop源码分析

x33g5p2x  于2021-12-21 转载在 其他  
字(10.8k)|赞(0)|评价(0)|浏览(425)

一、UnSafe相关介绍

1、JAVA中Unsafe简介

为什么先介绍Unsafe这个东西呢?我们知道JDK中也有UnSafe,Java中的Unsafe类为我们提供了类似C++手动管理内存的能力。封装这一系列的native方法。并且是禁止我们开发者自己使用的。当然你可以通过反射进行获取。

JAVA中的UnSafe提供以下的功能

可以看到,java中的unsafe提供的都是至关重要的一些功能。

2、Netty中Unsafe介绍

Netty中的unsafe同样也是非常重要的。因为在Netty源码中很多地方都是用到了这个相关工具,Unsafe接口中定义了socket相关操作,包括SocketAddress获取、selector注册、网卡端口绑定、socket建连与断连、socket写数据。这些操作都和jdk底层socket相关。他的继承关系如下。

Unsafe是Channel的内部类,一个Channel对应一个Unsafe。

Unsafe用于处理Channel对应网络IO的底层操作。ChannelHandler处理回调事件时产生的相关网络IO操作最终也会委托给Unsafe执行。

NioUnsafe在Unsafe基础上增加了几个操作,包括访问jdk的SelectableChannel、socket读数据等。

NioByteUnsafe实现了与socket连接的字节数据读取相关的操作。

NioMessageUnsafe实现了与新连接建立相关的操作。

二、EventLoopGroup和EventLoop源码分析

我们就从最开始的Demo开始了解Netty的源码吧。大家都知道Netty是一个网络Io框架,他继承NIO,BIO,AIO,并能够按照模板化的方式去实现相关功能。我们以前也单独讲过JAVA原生的NIO实现。那么Netty到底是怎么将他们产生联系的呢?接下来一步一步为大家解读Netty源码。了解Netty的技术内幕。

PS:以下源码使用的是Netty4.1.28版本

1、初始化EventLoopGroup

在服务器启动的常规代码里,首先是实例化NioEventLoopGroup和ServerBootstrap。

执行这行代码时会发生什么?由NioEventLoopGroup开始,一路调用,到达MultithreadEventLoopGroup,如果没有指定创建的线程数量,则默认创建的线程个数为DEFAULT_EVENT_LOOP_THREADS,该数值为:处理器数量x2。

  1. protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
  2. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
  3. }
  1. private static final int DEFAULT_EVENT_LOOP_THREADS;
  2. static {
  3. DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
  4. "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
  5. if (logger.isDebugEnabled()) {
  6. logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
  7. }
  8. }

最终由MultithreadEventExecutorGroup实例化

  1. /**
  2. * Create a new instance.
  3. *
  4. * @param nThreads the number of threads that will be used by this instance.
  5. * @param executor the Executor to use, or {@code null} if the default should be used.
  6. * @param chooserFactory the {@link EventExecutorChooserFactory} to use.
  7. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
  8. */
  9. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  10. EventExecutorChooserFactory chooserFactory, Object... args)

在这个构造方法中,实例化了每个EventLoop所需要的执行器Executor

  1. if (executor == null) {
  2. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  3. }

由此可见,每个NioEventLoop的执行器为ThreadPerTaskExecutor,ThreadPerTaskExecutor实现了Executor接口,并会在execute方法中启动真正的线程,但是要和NioEventLoop的线程挂钩则在SingleThreadEventExecutor的doStartThread方法里。

  1. public final class ThreadPerTaskExecutor implements Executor {
  2. private final ThreadFactory threadFactory;
  3. public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
  4. if (threadFactory == null) {
  5. throw new NullPointerException("threadFactory");
  6. }
  7. this.threadFactory = threadFactory;
  8. }
  9. @Override
  10. public void execute(Runnable command) {
  11. //使用真正的线程执行方法
  12. threadFactory.newThread(command).start();
  13. }
  14. }

接下来,new出EventExecutor(实际是NioEventLoop)的实例数组,并在循环里new每个具体的EventLoop实例

那么在NioEventLoop实例的构造方法里又做了什么事情呢?

  1. @Override
  2. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  3. return new NioEventLoop(this, executor, (SelectorProvider) args[0],
  4. ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
  5. }

作为IO事件处理的主要组件,内部持有了Selector、SelectionKey的集合,所以构造方法中执行了关键方法openSelector(),最终通过JDK的api拿到selector的实例,作用和我们通过原生JDK的NIO编程中创建选择器是一样的。

另外,我们观察下NioEventLoop的类图如下

发现最终实现Exector,我们可以知道,EventLoop本质上是一个线程池,EventLoop内部维护着一个线程Thread和几个阻塞队列,所以EventLoop可以看成只有一个线程的线程池(SingleThreadPool)

每个EventLoop包含的线程Thread定义在父类SingleThreadEventExecutor中,每个EventLoop包含两个队列,taskQueue来自父类SingleThreadEventExecutor,保存各种任务,比如处理事件等等,tailTask来自父类SingleThreadEventLoop,用于每次事件循环后置任务处理

作为IO事件处理的主要组件,必然离不开对事件的处理机制,在NioEventLoop的run方法,就有selector上进行select和调用processSelectedKeys()处理各种事件集。

2、NioEventLoop 的运行

  1. @Override
  2. protected void run() {
  3. for (;;) {
  4. try {
  5. try {
  6. // 1、通过 hasTasks() 判断当前消息队列中是否还有未处理的消息
  7. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  8. case SelectStrategy.CONTINUE:
  9. continue;
  10. //hasTasks() 没有任务则执行 select() 处理网络IO
  11. case SelectStrategy.SELECT:
  12. //轮询事件,见第三小节
  13. select(wakenUp.getAndSet(false));
  14. if (wakenUp.get()) {
  15. selector.wakeup();
  16. }
  17. // fall through
  18. default:
  19. }
  20. } catch (IOException e) {
  21. // If we receive an IOException here its because the Selector is messed up. Let's rebuild
  22. // the selector and retry. https://github.com/netty/netty/issues/8566
  23. rebuildSelector0();
  24. handleLoopException(e);
  25. continue;
  26. }
  27. cancelledKeys = 0;
  28. needsToSelectAgain = false;
  29. // 处理IO事件所需的时间和花费在处理 task 时间的比例,默认为 50%
  30. final int ioRatio = this.ioRatio;
  31. if (ioRatio == 100) {
  32. try {
  33. // 如果 IO 的比例是100,表示每次都处理完IO事件后,才执行所有的task
  34. processSelectedKeys();
  35. } finally {
  36. // 执行 task 任务
  37. runAllTasks();
  38. }
  39. } else {
  40. // 记录处理 IO 开始的执行时间
  41. final long ioStartTime = System.nanoTime();
  42. try {
  43. //IO任务处理,见第四小节
  44. processSelectedKeys();
  45. } finally {
  46. // 计算处理 IO 所花费的时间
  47. final long ioTime = System.nanoTime() - ioStartTime;
  48. // 执行 task 任务,判断执行 task 任务时间是否超过配置的比例,如果超过则停止执行 task 任务
  49. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  50. }
  51. }
  52. } catch (Throwable t) {
  53. handleLoopException(t);
  54. }
  55. // Always handle shutdown even if the loop processing threw an exception.
  56. try {
  57. if (isShuttingDown()) {
  58. closeAll();
  59. if (confirmShutdown()) {
  60. return;
  61. }
  62. }
  63. } catch (Throwable t) {
  64. handleLoopException(t);
  65. }
  66. }
  67. }

1、调用selectStrategy.calculateStrategy 判断是否有 Task任务,如果没有则调用 selectSupplier.get() 方法,该方法是非阻塞的,判断是否有需要处理的 Channel。如果没有则返回 SelectStrategy.SELECT,然后执行 select(wakenUp.getAndSet(false)) 方法,阻塞等待可处理的 IO 就绪事件。

2、如果有 Task 任务,则判断 ioRatio 的比率值,该值为 EventLoop 处理 IO 和 处理 Task 任务的时间的比率。默认比率为 50%。

  • 如果 ioRatio == 100,则说明优先处理所有的 IO 任务,处理完所有的IO事件后才会处理所有的 Task 任务。
  • 如果 ioRatio <> 100, 则优先处理所有的IO任务,处理完所有的IO事件后,才会处理所有的Task 任务,但处理所有的Task 任务的时候会判断执行 Task 任务的时间比率,如果超过配置的比率则中断处理 Task 队列中的任务。

从中可以发现,什么情况下都会优先处理 IO任务,但处理非 IO 任务时,会判断非 IO 任务执行的时间不能超过 ioRatio 的阈值。

3、Select方法

  1. private void select(boolean oldWakenUp) throws IOException {
  2. Selector selector = this.selector;
  3. try {
  4. int selectCnt = 0;
  5. long currentTimeNanos = System.nanoTime();
  6. // 计算出 NioEventLoop 定时任务最近执行的时间(还有多少 ns 执行),单位 ns
  7. long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
  8. for (;;) {
  9. // 为定时任务中的时间加上0.5毫秒,将时间换算成毫秒
  10. long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
  11. // 对定时任务的超时时间判断,如果到时间或超时,则需要立即执行 selector.selectNow()
  12. if (timeoutMillis <= 0) {
  13. if (selectCnt == 0) {
  14. selector.selectNow();
  15. selectCnt = 1;
  16. }
  17. break;
  18. }
  19. // 轮询过程中发现有任务加入,中断本次轮询
  20. if (hasTasks() && wakenUp.compareAndSet(false, true)) {
  21. selector.selectNow();
  22. selectCnt = 1;
  23. break;
  24. }
  25. // Nio 的 阻塞式 select 操作
  26. int selectedKeys = selector.select(timeoutMillis);
  27. // select 次数 ++ , 通过该次数可以判断是否出发了 JDK Nio中的 Selector 空轮循 bug
  28. selectCnt ++;
  29. // 如果selectedKeys不为空、或者被用户唤醒、或者队列中有待处理任务、或者调度器中有任务,则break
  30. if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
  31. break;
  32. }
  33. //如果线程被中断则重置selectedKeys,同时break出本次循环,所以不会陷入一个繁忙的循环。
  34. if (Thread.interrupted()) {
  35. selectCnt = 1;
  36. break;
  37. }
  38. long time = System.nanoTime();
  39. // 如果超时,把 selectCnt 置为 1,开始下一次的循环
  40. if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
  41. // timeoutMillis elapsed without anything selected.
  42. selectCnt = 1;
  43. }
  44. // 如果 selectCnt++ 超过 默认的 512 次,说明触发了 Nio Selector 的空轮训 bug,则需要重新创建一个新的 Selector,并把注册的 Channel 迁移到新的 Selector 上
  45. else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
  46. selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
  47. // 重新创建一个新的 Selector,并把注册的 Channel 迁移到新的 Selector 上,
  48. //解决NIO Selector空轮询bug,见第五小节
  49. selector = selectRebuildSelector(selectCnt);
  50. selectCnt = 1;
  51. break;
  52. }
  53. currentTimeNanos = time;
  54. }
  55. if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
  56. if (logger.isDebugEnabled()) {
  57. logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
  58. selectCnt - 1, selector);
  59. }
  60. }
  61. } catch (CancelledKeyException e) {
  62. if (logger.isDebugEnabled()) {
  63. logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
  64. selector, e);
  65. }
  66. }
  67. }

1、通过 delayNanos(currentTimeNanos) 计算出 定时任务队列中第一个任务的执行时间。
2、判断是否到期,如果到期则执行 selector.selectNow(),退出循环
3、如果定时任务未到执行时间,则通过 hasTasks() 判断是否有可执行的任务,如果有则中断本次循环。
4、既没有到期的定时任务、也没有可执行的Task,则调用 selector.select(timeoutMillis) 方法阻塞,等待注册到 Selector 上感兴趣的事件。
5、每次 select() 后都会 selectCnt++。通过该次数可以判断是否出发了 JDK Nio中的 Selector 空轮询 bug
6、如果selectedKeys不为空、或者被用户唤醒、或者队列中有待处理任务、或者调度器中有任务,则break。
7、通过 selectCnt 判断是否触发了 JDK Selector 的空轮询 bug,SELECTOR_AUTO_REBUILD_THRESHOLD 默认为 512, 可修改。
8、通过 selectRebuildSelector() 方法解决 Selector 空轮询 bug。

4、processSelectedKeys IO事件处理

 

  1. private void processSelectedKeys() {
  2. if (selectedKeys != null) {
  3. processSelectedKeysOptimized();
  4. } else {
  5. //默认没有使用优化的 Set,所有调用 processSelectedKeysPlain() 方法进行处理 IO 任务
  6. processSelectedKeysPlain(selector.selectedKeys());
  7. }
  8. }
  1. private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
  2. // check if the set is empty and if so just return to not create garbage by
  3. // creating a new Iterator every time even if there is nothing to process.
  4. // See https://github.com/netty/netty/issues/597
  5. if (selectedKeys.isEmpty()) {
  6. return;
  7. }
  8. Iterator<SelectionKey> i = selectedKeys.iterator();
  9. //循环处理每个 selectionKey,每个selectionKey的处理首先根据attachment的类型来进行分发处理;
  10. for (;;) {
  11. final SelectionKey k = i.next();
  12. final Object a = k.attachment();
  13. i.remove();
  14. if (a instanceof AbstractNioChannel) {
  15. processSelectedKey(k, (AbstractNioChannel) a);
  16. } else {
  17. @SuppressWarnings("unchecked")
  18. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  19. processSelectedKey(k, task);
  20. }
  21. if (!i.hasNext()) {
  22. break;
  23. }
  24. if (needsToSelectAgain) {
  25. selectAgain();
  26. selectedKeys = selector.selectedKeys();
  27. // Create the iterator again to avoid ConcurrentModificationException
  28. if (selectedKeys.isEmpty()) {
  29. break;
  30. } else {
  31. i = selectedKeys.iterator();
  32. }
  33. }
  34. }
  35. }
  1. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  2. //首先获取 Channel 的 NioUnsafe,所有的读写等操作都在 Channel 的 unsafe 类中操作。
  3. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  4. if (!k.isValid()) {
  5. final EventLoop eventLoop;
  6. try {
  7. eventLoop = ch.eventLoop();
  8. } catch (Throwable ignored) {
  9. return;
  10. }
  11. if (eventLoop != this || eventLoop == null) {
  12. return;
  13. }
  14. unsafe.close(unsafe.voidPromise());
  15. return;
  16. }
  17. try {
  18. int readyOps = k.readyOps();
  19. //熟悉的获取 SelectionKey 就绪事件,如果是 OP_CONNECT,则说明已经连接成功,并把注册的 OP_CONNECT 事件取消
  20. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
  21. int ops = k.interestOps();
  22. ops &= ~SelectionKey.OP_CONNECT;
  23. k.interestOps(ops);
  24. unsafe.finishConnect();
  25. }
  26. //如果是 OP_WRITE 事件,说明可以继续向 Channel 中写入数据,当写完数据后用户自己吧 OP_WRITE 事件取消掉。
  27. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
  28. ch.unsafe().forceFlush();
  29. }
  30. //如果是 OP_READ 或 OP_ACCEPT 事件,则调用 unsafe.read() 进行读取数据。unsafe.read() 中会调用到 ChannelPipeline 进行读取数据。
  31. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  32. unsafe.read();
  33. }
  34. } catch (CancelledKeyException ignored) {
  35. unsafe.close(unsafe.voidPromise());
  36. }
  37. }

5、Netty解决JAVA NiO空轮询BUG

第三节select方法介绍中,已经描述了Netty解决JAVA原生NIO空轮询bug的方法,主要思路就是重新创建 Selector,并把原 Selector 上注册的 Channel 迁移到新的 Selector 上

  1. private Selector selectRebuildSelector(int selectCnt) throws IOException {
  2. // 重新创建 Selector,并把原 Selector 上注册的 Channel 迁移到新的 Selector 上
  3. rebuildSelector();
  4. Selector selector = this.selector;
  5. selector.selectNow();
  6. return selector;
  7. }
  1. private void rebuildSelector0() {
  2. final Selector oldSelector = selector;
  3. final SelectorTuple newSelectorTuple;
  4. ......
  5. try {
  6. // 创建新的 Selector
  7. newSelectorTuple = openSelector();
  8. } catch (Exception e) {
  9. logger.warn("Failed to create a new Selector.", e);
  10. return;
  11. }
  12. int nChannels = 0;
  13. // 循环原 Selector 上注册的所有的 SelectionKey
  14. for (SelectionKey key: oldSelector.keys()) {
  15. Object a = key.attachment();
  16. try {
  17. int interestOps = key.interestOps();
  18. key.cancel();
  19. SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
  20. ......
  21. nChannels ++;
  22. } catch (Exception e) {
  23. ......
  24. }
  25. }
  26. // 将新的 Selector 替换 原 Selector
  27. selector = newSelectorTuple.selector;
  28. unwrappedSelector = newSelectorTuple.unwrappedSelector;
  29. ......
  30. }

6、EventLoop构造过程一图总结

相关文章