【java】LongAdder源码分析原理分析

x33g5p2x  于2022-02-18 转载在 Java  
字(5.1k)|赞(0)|评价(0)|浏览(651)

1.概述

转载:jdk11源码–LongAdder源码分析原理分析

先参考使用相关的文档:

高并发中计数器的实现方式有哪些?

【java】阿里为什么推荐使用LongAdder,而不是volatile?

针对JDK中的原子类,想必大家都熟悉AtomicInteger,AtomicLong等类。他们都是采用CAS乐观锁方式来实现的。

但是这种方式是否还有继续优化的空间呢?答案是肯定的。

CAS乐观锁对临界区的数据(也就是atomicLong中的volatile long value属性)进行修改,这个属性是热点数据。并发量高的时候,会出现很多线程都轮询修改value属性的情况,CPU消耗比较高

大家在想一下,在秒杀,拍卖,银行转账等业务场景下,可能存在以下情况:大量客户的请求都需要修改某个银行账户的余额。有一种优化策略就是将该银行热点账户拆分为多条记录,将请求hash路由到不同的子账户中进行计算。那么上述Atomic类也可以采用该种策略:热点数据拆分。

这就是阅读源码的作用,可以学习到各种各样的优秀设计,并且可以将其应用到具体的工作之中。

2.LongAdder案例

编写java代码测试LongAdder:

  1. @Test
  2. public void test() throws InterruptedException {
  3. LongAdder longAdder = new LongAdder();
  4. AtomicLong aLong = new AtomicLong();
  5. ExecutorService threadPool = Executors.newFixedThreadPool(100);
  6. for(int i = 0; i < 1000000;i++){
  7. threadPool.execute(() -> {
  8. longAdder.increment();
  9. aLong.incrementAndGet();
  10. });
  11. }
  12. TimeUnit.SECONDS.sleep(2);//等待线城池执行完成
  13. System.out.println(longAdder.longValue());
  14. System.out.println(aLong.get());
  15. }

注意上面线程数设置的多一点,才能造成比较激烈的竞争。

在longValue()方法中打断点看一下实际的结果分布,可以看到base中有值,并且有4个cells,每个cell都有值。将4个cell的值累加再加上base的值正好是100W,详细源码分析请继续阅读:

3.LongAdder源码分析

首先画一个AtomicLong和LongAdder的临界区数据对比图:

从上面的图中大家可以看到临界区数据的分布 ,至此可以完全理解LongAdder的优化思路了。

接下来看源码,主要关注红框内的方法

LongAdder继承自Striped64类。Striped64类中核心参数:

  1. //CPU的数量,用于cell数组容量扩容
  2. static final int NCPU = Runtime.getRuntime().availableProcessors();
  3. /**
  4. * 容量大小是2的幂次方。当竞争大的时候,则会修改cell中的数值
  5. */
  6. transient volatile Cell[] cells;
  7. /**
  8. * base基础数据,CAS竞争不大的话,则直接修改base
  9. */
  10. transient volatile long base;
  11. /**
  12. * 是否竞争激烈
  13. */
  14. transient volatile int cellsBusy;

可以看到都是volatile修饰的,都解决了JMM中的可见性的问题。

首先看一下LongAdder核心的Cell类:

  1. /**
  2. * Padded variant of AtomicLong supporting only raw accesses plus CAS.
  3. *
  4. * JVM intrinsics note: It would be possible to use a release-only
  5. * form of CAS here, if it were provided.
  6. */
  7. @jdk.internal.vm.annotation.Contended static final class Cell {
  8. volatile long value;//Cell中存储数据的属性,volatile 修饰保证可见性
  9. Cell(long x) { value = x; }
  10. final boolean cas(long cmp, long val) {
  11. return VALUE.compareAndSet(this, cmp, val); //CAS修改
  12. }
  13. final void reset() {
  14. VALUE.setVolatile(this, 0L);
  15. }
  16. final void reset(long identity) {
  17. VALUE.setVolatile(this, identity);
  18. }
  19. final long getAndSet(long val) {
  20. return (long)VALUE.getAndSet(this, val);
  21. }
  22. // VarHandle mechanics
  23. private static final VarHandle VALUE;
  24. static {
  25. try {
  26. MethodHandles.Lookup l = MethodHandles.lookup();
  27. VALUE = l.findVarHandle(Cell.class, "value", long.class);
  28. } catch (ReflectiveOperationException e) {
  29. throw new ExceptionInInitializerError(e);
  30. }
  31. }
  32. }

@jdk.internal.vm.annotation.Contended这个注解的意思是JVm内部进行了优化,解决了伪共享问题

再看一下获取数据的方法:

  1. public long longValue() {
  2. return sum();
  3. }
  4. public long sum() {
  5. Cell[] cs = cells;//cell中的数据
  6. long sum = base;//基本数据
  7. if (cs != null) {
  8. for (Cell c : cs)
  9. if (c != null)
  10. sum += c.value;
  11. }
  12. return sum;
  13. }

可以看到很简单,将base和cells数组中的数据累加起来即可。

继续,看如何CAS递增数据,这里比较复杂一些。

  1. /**
  2. * Adds the given value.
  3. *
  4. * @param x the value to add
  5. */
  6. public void add(long x) {
  7. Cell[] cs; long b, v; int m; Cell c;
  8. if ((cs = cells) != null || !casBase(b = base, b + x)) {
  9. boolean uncontended = true;
  10. if (cs == null || (m = cs.length - 1) < 0 ||
  11. (c = cs[getProbe() & m]) == null ||
  12. !(uncontended = c.cas(v = c.value, v + x)))
  13. longAccumulate(x, null, uncontended);
  14. }
  15. }
  16. /**
  17. * Equivalent to {@code add(1)}.
  18. */
  19. public void increment() {
  20. add(1L);
  21. }

初始化时,cells肯定为空。我们先看一下casBase(b = base, b + x)的实现

  1. /**
  2. * CASes the base field.
  3. */
  4. final boolean casBase(long cmp, long val) {
  5. return BASE.compareAndSet(this, cmp, val);
  6. }

当CAS修改base的值失败时,则说明并发比较高,则进入到if内部代码,首先设置uncontended =true,表明竞争激烈。

接下来需要判断4个条件:

  1. cs == null
  2. (m = cs.length - 1) < 0
  3. (c = cs[getProbe() & m]) == null
  4. !(uncontended = c.cas(v = c.value, v + x))

前两个条件判断cells数组是否为空,如果是空,则走longAccumulate方法。

第三个条件是根据当前线程与数组进行逻辑与操作,获得的cell位置如果为空则走longAccumulate方法。

第三步的结果c是线程路由的cells数组的位置。

第四个条件是对这个cell中的value进行CAS修改,修改失败则走longAccumulate方法。

通过这几步可以知道,LongAdder是在遇到并发激烈时,将线程路由到cells数组中的某个位置对该位置的Cell的value进行cas修改。而longAccumulate则会对cells数组进行扩容等维护工作。看看longAccumulate源码:

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. int h;
  4. if ((h = getProbe()) == 0) {
  5. ThreadLocalRandom.current(); // 强制初始化
  6. h = getProbe();//返回当前线程的threadLocalRandomProbe值
  7. wasUncontended = true;
  8. }
  9. boolean collide = false; // True if last slot nonempty
  10. done: for (;;) {
  11. Cell[] cs; Cell c; int n; long v;
  12. if ((cs = cells) != null && (n = cs.length) > 0) {
  13. if ((c = cs[(n - 1) & h]) == null) {
  14. if (cellsBusy == 0) { // 尝试添加新的Cell
  15. Cell r = new Cell(x); // Optimistically create乐观创建cell
  16. if (cellsBusy == 0 && casCellsBusy()) {
  17. try { // Recheck under lock
  18. Cell[] rs; int m, j;
  19. if ((rs = cells) != null &&
  20. (m = rs.length) > 0 &&
  21. rs[j = (m - 1) & h] == null) {
  22. rs[j] = r;
  23. break done;
  24. }
  25. } finally {
  26. cellsBusy = 0;
  27. }
  28. continue; // Slot is now non-empty
  29. }
  30. }
  31. collide = false;
  32. }
  33. else if (!wasUncontended) // CAS already known to fail
  34. wasUncontended = true; // Continue after rehash
  35. else if (c.cas(v = c.value,
  36. (fn == null) ? v + x : fn.applyAsLong(v, x)))
  37. //上面这里是对cell中的value进行cas修改
  38. break;
  39. else if (n >= NCPU || cells != cs)
  40. collide = false; // 判断数组大小是否大于核数【cells数组最大不超过CPU可用核数】
  41. else if (!collide)
  42. collide = true;
  43. else if (cellsBusy == 0 && casCellsBusy()) {
  44. try {//对cells数组进行扩容,直接扩容为2倍,下面是采用位移操作 : n << 1
  45. if (cells == cs) // Expand table unless stale
  46. cells = Arrays.copyOf(cs, n << 1);
  47. } finally {
  48. cellsBusy = 0;
  49. }
  50. collide = false;
  51. continue; // Retry with expanded table
  52. }
  53. h = advanceProbe(h);
  54. }
  55. else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
  56. try { // Initialize table
  57. if (cells == cs) {
  58. Cell[] rs = new Cell[2];//初始化cells数组大小是2
  59. rs[h & 1] = new Cell(x);
  60. cells = rs;
  61. break done;
  62. }
  63. } finally {
  64. cellsBusy = 0;
  65. }
  66. }
  67. // cas修改base变量。
  68. else if (casBase(v = base,
  69. (fn == null) ? v + x : fn.applyAsLong(v, x)))
  70. break done;
  71. }
  72. }

该方法大概思路就是无限循环对cells数组进行操作更新。如果对应的cell为空则cas创建cell并插入,如果不为空则cas修改其value值。如果cas修改失败则扩容,但是扩容最大值是CPU核数。

相关文章

最新文章

更多