LongAdder源码阅读笔记

x33g5p2x  于2022-02-07 转载在 其他  
字(5.1k)|赞(0)|评价(0)|浏览(482)

一、功能描述

LongAdder通过创建多个副本对象,解决了多线程使用CAS更新同一个对象造成的CPU阻塞,加快了对线程处理的速度。当多个线程同一时刻更新一个AtomicLong类型的变量时,只有一个线程能够更新成功,其他线程则更新失败,继续尝试更新。

当使用LongAdder类型的变量时,由于副本数组的存在,线程不一定直接更新变量的本身而是更新副本数组,这样多线程请求的对象变多了,从而减少了更新时间,当需要使用变量值时,返回的值是基础变量的值加上数组内每一个副本的值的和。

二、源码解读

LongAdder继承自Striped64并实现了Serializable接口,而在Striped64类中有一个Cell类

2.1 add方法分析

首先从LongAdder类的add方法入手

  1. public void add(long x) {
  2. Cell[] as; long b, v; int m; Cell a;
  3. if ((as = cells) != null || !casBase(b = base, b + x)) {
  4. boolean uncontended = true;
  5. if (as == null || (m = as.length - 1) < 0 ||
  6. (a = as[getProbe() & m]) == null ||
  7. !(uncontended = a.cas(v = a.value, v + x)))
  8. longAccumulate(x, null, uncontended);
  9. }
  10. }

从上面的代码可以看到LongAdder的实现主要依靠的是cells数组,如果cells数组为空的话,则尝试使用cas更新基础变量base,如果成功了,则add成功,方法结束,如果cas更新base失败了,则证明此时有其他线程参与base变量的更新,此后的处理与cells不为空一致(如果cells不为空,则在此次方法执行前就已经有多线程参与了更新)。
当cells数组不为空或者更新base变量失败后,则转而更新cells数组中的副本,此时先判断cells数组是否为空或长度为0,如果为空或长度为0则说明这是第一次操作cells数组,应先初始化cells数组,因此调用方法longAccumulate(x, null, true);
如果cells数组不为空,则尝试直接访问数组中的副本,getProbe方法代码:

  1. static final int getProbe() {
  2. return UNSAFE.getInt(Thread.currentThread(), PROBE);
  3. }

从上面代码可以看到getProbe方法获取了当前线程内的PROBE变量,而PROBE定义在Striped64类中

  1. private static final long PROBE;
  2. static {
  3. try {
  4. UNSAFE = sun.misc.Unsafe.getUnsafe();
  5. Class<?> sk = Striped64.class;
  6. BASE = UNSAFE.objectFieldOffset
  7. (sk.getDeclaredField("base"));
  8. CELLSBUSY = UNSAFE.objectFieldOffset
  9. (sk.getDeclaredField("cellsBusy"));
  10. Class<?> tk = Thread.class;
  11. PROBE = UNSAFE.objectFieldOffset
  12. (tk.getDeclaredField("threadLocalRandomProbe"));
  13. } catch (Exception e) {
  14. throw new Error(e);
  15. }
  16. }

从上面的代码可以看出PROBE来自于Thread类的threadLocalRandomProbe变量,是一个线程级变量。getProbe() & m则是获取当前要更新的cell,如果cell为空的话则调用longAccumulate(x, null, true);方法设置cell的值,如果cell不为空的话则使用cas直接更新cell的值,并将更新结果保存在uncontended中,如果uncontended的值为false(即cas更新失败了,此时应该有多个线程同时访问了一个cell),那么继续调用longAccumulate(x, null, false);方法。

2.2 longAccumulate方法分析

从上面的add方法可以看到,getProbe()取得了Thread类中的threadLocalRandomProbe变量,而threadLocalRandomProbe变量的初始值为0,因为getProbe()方法参与了多线程访问哪一个cell的定位,因此getProbe()的值不可能为0,那么threadLocalRandomProbe变量是在哪里赋值的呢?
在add方法中观察到,没当方法进行不下去时(base变量更新失败,cells为空,cell更新失败),都会调用longAccumulate方法,因此longAccumulate一定是涉及了cells数组的初始化和扩容,观察longAccumulate方法代码:

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. int h;
  4. if ((h = getProbe()) == 0) {
  5. ThreadLocalRandom.current(); // force initialization
  6. h = getProbe();
  7. wasUncontended = true;
  8. }
  9. boolean collide = false; // True if last slot nonempty
  10. for (;;) {
  11. Cell[] as; Cell a; int n; long v;
  12. if ((as = cells) != null && (n = as.length) > 0) {
  13. if ((a = as[(n - 1) & h]) == null) {
  14. if (cellsBusy == 0) { // Try to attach new Cell
  15. Cell r = new Cell(x); // Optimistically create
  16. if (cellsBusy == 0 && casCellsBusy()) {
  17. boolean created = false;
  18. try { // Recheck under lock
  19. Cell[] rs; int m, j;
  20. if ((rs = cells) != null &&
  21. (m = rs.length) > 0 &&
  22. rs[j = (m - 1) & h] == null) {
  23. rs[j] = r;
  24. created = true;
  25. }
  26. } finally {
  27. cellsBusy = 0;
  28. }
  29. if (created)
  30. break;
  31. continue; // Slot is now non-empty
  32. }
  33. }
  34. collide = false;
  35. }
  36. else if (!wasUncontended) // CAS already known to fail
  37. wasUncontended = true; // Continue after rehash
  38. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  39. fn.applyAsLong(v, x))))
  40. break;
  41. else if (n >= NCPU || cells != as)
  42. collide = false; // At max size or stale
  43. else if (!collide)
  44. collide = true;
  45. // 此处进行数组的扩容
  46. else if (cellsBusy == 0 && casCellsBusy()) {
  47. try {
  48. if (cells == as) { // Expand table unless stale
  49. Cell[] rs = new Cell[n << 1];
  50. for (int i = 0; i < n; ++i)
  51. rs[i] = as[i];
  52. cells = rs;
  53. }
  54. } finally {
  55. cellsBusy = 0;
  56. }
  57. collide = false;
  58. continue; // Retry with expanded table
  59. }
  60. h = advanceProbe(h);
  61. }
  62. // 此处进行数组的初始化
  63. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  64. boolean init = false;
  65. try { // Initialize table
  66. if (cells == as) {
  67. Cell[] rs = new Cell[2];
  68. rs[h & 1] = new Cell(x);
  69. cells = rs;
  70. init = true;
  71. }
  72. } finally {
  73. cellsBusy = 0;
  74. }
  75. if (init)
  76. break;
  77. }
  78. else if (casBase(v = base, ((fn == null) ? v + x :
  79. fn.applyAsLong(v, x))))
  80. break; // Fall back on using base
  81. }
  82. }

longAccumulate代码的设计非常复杂,刚进入方法的代码进行了threadLocalRandomProbe变量的初始化

  1. int h;
  2. if ((h = getProbe()) == 0) {
  3. ThreadLocalRandom.current(); // force initialization
  4. h = getProbe();
  5. wasUncontended = true;
  6. }

如果getProbe()取得的值为0,说明threadLocalRandomProbe变量并未被初始化过,此时调用ThreadLocalRandom.current();方法进行初始化,并且将参数wasUncontended设置为true。ThreadLocalRandom.current()方法代码:

  1. public static ThreadLocalRandom current() {
  2. if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
  3. localInit();
  4. return instance;
  5. }
  6. static final void localInit() {
  7. int p = probeGenerator.addAndGet(PROBE_INCREMENT);
  8. int probe = (p == 0) ? 1 : p; // skip 0
  9. long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
  10. Thread t = Thread.currentThread();
  11. UNSAFE.putLong(t, SEED, seed);
  12. UNSAFE.putInt(t, PROBE, probe);
  13. }

此时发现,threadLocalRandomProbe变量的初始化实在ThreadLocalRandom类中进行的,使用ThreadLocalRandom类的好处是为每一个线程维护一个随机数种子,不涉及多线程竞争种子的问题。而在longAccumulate方法中初始化threadLocalRandomProbe变量是一种延迟初始化的操作,如果cells为空,即使threadLocalRandomProbe变量有值也是没有意义的。

从上面add方法调用longAccumulate处可以发现,cells数组为null或当前线程要更新的cell为null时wasUncontended的值为true,如果更新cell失败,则cell的值为false,那么wasUncontended的值一定是cells数组进行扩容的依据。

因为是多个线程同时操作cells数组,那么对数组的初始化和扩容一定只能由一个线程来完成,因此定义了cellsBusy变量,当检测到cellsBusy的值为0并使用casCellsBusy()方法成功将其设置为1后才可以进行初始化和扩容操作。数组的初始化将cells长度设置为2,并且初始化将要访问的cell,另一个cell则保持默认值null,完成后将cellsBusy的值重新设置为0,方便其他线程之后进行扩容(此处设置并不是cas操作,因为当前初始化代码只有一个线程能执行到)。

而扩容要检查当前cells数组的长度小于cpu的个数时才可以进行(当数组当都等于cpu个数时效率才最高),扩容操作完成后调用advanceProbe()方法从新计算threadLocalRandomProbe变量的值,以减少访问celll冲突的个数。

另外,在定义Cell类时使用了@sun.misc.Contended注解,这样保证了一个Cell类对象占满一个缓存行,从而避免了伪共享问题,提升了性能。

相关文章