【java】ConcurrentHashMap遍历 --- 弱一致性的迭代器(Iterator)实现原理

x33g5p2x  于2022-03-21 转载在 Java  
字(8.4k)|赞(0)|评价(0)|浏览(474)

1.概述

转载:ConcurrentHashMap遍历 — 弱一致性的迭代器(Iterator)实现原理

在 Java并发编程实战(进阶篇) 中分析了 Vector 在迭代过程中对容器进行修改会抛出 ConcurrentModificationException 异常,但在并发容器中还会出现这种情况吗?

在并发容器中并不会出现这种情况,这是因为,util包中的迭代器实现是fast-failed迭代器(就是一旦由修改就抛异常)而在current包中迭代器是弱一致性迭代器。

那么我们通过 ConcurrentHashMap 源码分析一下这种弱一致性迭代器的工作原理:

测试代码在主线程向容器中预先添加一些元素,然后另外启动一个线程不断向容器中添加元素,同时在主线程使用迭代器遍历容器。此代码运行的结果是,不会抛出 ConcurrentModificationException ,同时添加到hashMap中的部分元素也会被迭代器迭代输出。

1.1 案例1

测试代码:

  1. /**
  2. * 测试点:测试使用 ConcurrentHashMap 的时候,一边添加数据 一边遍历数据
  3. * 添加的数据 能被遍历到吗
  4. *
  5. * 测试结果
  6. *
  7. * 不会抛出 ConcurrentModificationException ,同时添加到hashMap中的部分元素也会被迭代器迭代输出。
  8. * 但是是乱序的
  9. *
  10. *
  11. */
  12. public void concurrentAddAndItera() {
  13. ConcurrentHashMap<String, String> hashMap =
  14. new ConcurrentHashMap<>();
  15. for (int i = 0; i < 100; i++) {
  16. hashMap.put("" + i, "" + i);
  17. }
  18. Thread thread = new Thread(() -> {
  19. for (int i = 200; i < 10000000; i++) {
  20. hashMap.put("" + i, "" + i);
  21. try {
  22. Thread.sleep(50);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. });
  28. thread.start();
  29. Iterator<String> iterator = hashMap.keySet().iterator();
  30. while (iterator.hasNext()) {
  31. String key = (String) iterator.next();
  32. System.out.println(key);
  33. try {
  34. Thread.sleep(50);
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. }

1.2 案例2

可以看到确实不是全部遍历出来了。

  1. /**
  2. * 测试点:测试使用 ConcurrentHashMap 的时候,一边添加数据 一边遍历数据
  3. * 添加的数据 能被遍历到吗
  4. *
  5. * 测试能否遍历出来,所有的新加的数据
  6. *
  7. * 测试结果
  8. *
  9. * 线程退出
  10. * 没有了
  11. * 最终剩余:7740
  12. * 最终剩余:2261
  13. *
  14. * 可以看到有很多数据是没有遍历出来的
  15. */
  16. public void concurrentAddAndItera1() throws Exception,InterruptedException {
  17. // 用于检测的hashMap
  18. ConcurrentHashMap<String, String> checkHashMap = new ConcurrentHashMap<>();
  19. for (int i = 0; i <= 10000; i++) { checkHashMap.put("" + i, "" + i); }
  20. ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<>();
  21. for (int i = 0; i < 100; i++) { hashMap.put("" + i, "" + i); }
  22. Thread thread = new Thread(() -> {
  23. for (int i = 101; i <= 9000; i++) {
  24. hashMap.put("" + i, "" + i);
  25. sleep(10);
  26. }
  27. System.out.println("线程退出");
  28. });
  29. thread.start();
  30. int count = 0;
  31. Iterator<String> iterator = hashMap.keySet().iterator();
  32. while (iterator.hasNext()) {
  33. String key = (String) iterator.next();
  34. // System.out.println(key);
  35. checkHashMap.remove(key);
  36. sleep(50);
  37. count++;
  38. if(!iterator.hasNext()) System.out.println("没有了");
  39. }
  40. System.out.println("最终剩余:"+checkHashMap.size());
  41. System.out.println("最终剩余:"+count);
  42. }
  43. private void sleep(int i) {
  44. try {
  45. Thread.sleep(i);
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. }

2.源码分析

出现这种情况也是我们希望并发容器出现的结果,那么查看源码分析:

2.1 keySet

调用 hashMap.keySet() ,返回了一个KeySetView内部类的实例

  1. public KeySetView<K,V> keySet() {
  2. KeySetView<K,V> ks;
  3. return (ks = keySet) != null ? ks : (keySet = new KeySetView<K,V>(this, null));
  4. }

keySet 是 ConcurrentHashMap 类中的私有变量,存储容器中的键的 Set 集合

2.2 KeySetView

KeySetView 静态内部类,重点分析 iterator 方法,返回 KeyIterator 迭代器实例

其实简单理解,KeySetView 相当于存放 ConcurrentHashMap 键的一个 Set 集合

  1. public static class KeySetView<K,V> extends CollectionView<K,V,K>
  2. implements Set<K>, java.io.Serializable {
  3. private static final long serialVersionUID = 7249069246763182397L;
  4. private final V value;
  5. KeySetView(ConcurrentHashMap<K,V> map, V value) { // non-public
  6. super(map);
  7. this.value = value;
  8. }
  9. public Iterator<K> iterator() {
  10. Node<K,V>[] t;
  11. ConcurrentHashMap<K,V> m = map;
  12. int f = (t = m.table) == null ? 0 : t.length;
  13. return new KeyIterator<K,V>(t, f, 0, f, m);
  14. }
  15. public Spliterator<K> spliterator() {
  16. Node<K,V>[] t;
  17. ConcurrentHashMap<K,V> m = map;
  18. long n = m.sumCount();
  19. int f = (t = m.table) == null ? 0 : t.length;
  20. return new KeySpliterator<K,V>(t, f, 0, f, n < 0L ? 0L : n);
  21. }
  22. }

map 是 CollectionView 类中成员变量,在调用 hashMap.keySet() 方法时,会将当前 ConcurrentHashMap 传入构造方法参数中(代码见上)。

2.3 KeyIterator

KeyIterator 类继承 BaseIterator 类,而 BaseIterator 类继承 Traverser 类

下面贴出 KeyIterator 类、BaseIterator 类、 Traverser 类代码,可以看出重要实现都在 Traverser 类中,后面主要关注 Traverser 类。

KeyIterator 类:

  1. static final class KeyIterator<K,V> extends BaseIterator<K,V>
  2. implements Iterator<K>, Enumeration<K> {
  3. KeyIterator(Node<K,V>[] tab, int index, int size, int limit,
  4. ConcurrentHashMap<K,V> map) {
  5. super(tab, index, size, limit, map);
  6. }
  7. public final K next() {
  8. Node<K,V> p;
  9. if ((p = next) == null)
  10. throw new NoSuchElementException();
  11. K k = p.key;
  12. lastReturned = p;
  13. advance();
  14. return k;
  15. }
  16. public final K nextElement() { return next(); }
  17. }

KeyIterator 构造方法中调用 super(tab, index, size, limit, map) 父类 BaseIterator 构造方法。

2.4 BaseIterator

  1. static class BaseIterator<K,V> extends Traverser<K,V> {
  2. final ConcurrentHashMap<K,V> map;
  3. Node<K,V> lastReturned;
  4. BaseIterator(Node<K,V>[] tab, int size, int index, int limit,
  5. ConcurrentHashMap<K,V> map) {
  6. super(tab, size, index, limit);
  7. this.map = map;
  8. advance();
  9. }
  10. public final boolean hasNext() { return next != null; }
  11. public final boolean hasMoreElements() { return next != null; }
  12. public final void remove() {
  13. Node<K,V> p;
  14. if ((p = lastReturned) == null)
  15. throw new IllegalStateException();
  16. lastReturned = null;
  17. map.replaceNode(p.key, null, null);
  18. }
  19. }

BaseIterator 构造方法还是调用父类 Traverser 的构造方法,并且调用父类 advance 方法,先获取下一个遍历的有效结点。

2.5 Traverser

  1. static class Traverser<K,V> {
  2. Node<K,V>[] tab; // current table; updated if resized
  3. // 下一个要访问的entry
  4. Node<K,V> next; // the next entry to use
  5. // 发现forwardingNode时,stack保存当前栈顶 table 相关信息
  6. // spare 按照 stack 的出栈顺序保存table,栈顶保存最后一个出栈的table
  7. TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
  8. // 下一个要访问的hash桶索引
  9. int index; // index of bin to use next
  10. // 当前正在访问的初始table的hash桶索引
  11. int baseIndex; // current index of initial table
  12. // 初始table的hash桶索引边界
  13. int baseLimit; // index bound for initial table
  14. // 初始table的长度
  15. final int baseSize; // initial table size
  16. /**
  17. * 如果有可能,返回下一个有效节点,否则返回null。
  18. */
  19. final Node<K,V> advance() {
  20. Node<K,V> e;
  21. //获取Node链表的下一个元素e
  22. if ((e = next) != null)
  23. e = e.next;
  24. for (;;) {
  25. Node<K,V>[] t; int i, n; // must use locals in checks
  26. // e不为空,返回e
  27. if (e != null)
  28. return next = e;
  29. // e为空,说明此链表已经遍历完成,准备遍历下一个hash桶
  30. if (baseIndex >= baseLimit || (t = tab) == null ||
  31. (n = t.length) <= (i = index) || i < 0)
  32. // table 遍历完成,到达边界,返回null
  33. return next = null;
  34. // e = tabAt(t, i) 获取下一个hash桶对应的node链表的头节点
  35. if ((e = tabAt(t, i)) != null && e.hash < 0) {
  36. // 扩容节点,说明此hash桶中的节点已经迁移到了nextTable
  37. if (e instanceof ForwardingNode) {
  38. // 当前遍历 table 换做 nextTable
  39. tab = ((ForwardingNode<K,V>)e).nextTable;
  40. e = null;
  41. // 保存当前table的遍历状态
  42. pushState(t, i, n);
  43. continue;
  44. }
  45. //红黑树
  46. else if (e instanceof TreeBin)
  47. e = ((TreeBin<K,V>)e).first;
  48. else
  49. e = null;
  50. }
  51. if (stack != null)
  52. // 此时遍历的是迁移目标nextTable
  53. // 尝试回退到源table,继续遍历源table中的节点
  54. recoverState(n);
  55. else if ((index = i + baseSize) >= n)
  56. // 初始table的hash桶索引+1 ,即遍历下一个hash桶
  57. index = ++baseIndex; // visit upper slots if present
  58. }
  59. }
  60. /**
  61. * 在遇到扩容节点时保存遍历状态。
  62. */
  63. private void pushState(Node<K,V>[] t, int i, int n) {
  64. // s 指向上一个出栈的 table 状态
  65. TableStack<K,V> s = spare; // reuse if possible
  66. if (s != null)
  67. // 如果存在上一个出栈 table 的引用
  68. // 那么spare指向上上一个出栈 table
  69. spare = s.next;
  70. else
  71. s = new TableStack<K,V>();
  72. // 保存当前 table 的遍历状态
  73. s.tab = t;
  74. s.length = n;
  75. s.index = i;
  76. s.next = stack;
  77. stack = s;
  78. }
  79. /**
  80. * 可能会弹出遍历状态。
  81. * @param n length of current table
  82. */
  83. private void recoverState(int n) {
  84. TableStack<K,V> s; int len;
  85. /**
  86. (s = stack) != null :
  87. stack不空,说明此时遍历的是nextTable
  88. (index += (len = s.length)) >= n :
  89. 确保了按照index,index+table.length的顺序遍历nextTable,
  90. 条件成立表示nextTable已经遍历完毕
  91. */
  92. // 如果进入循环,nextTable中的桶遍历完毕
  93. while ((s = stack) != null
  94. && (index += (len = s.length)) >= n) {
  95. // 弹出table,获取table的遍历状态,开始遍历table中的桶
  96. n = len;
  97. index = s.index;
  98. tab = s.tab;
  99. s.tab = null;
  100. // 下面是简单的链栈操作
  101. // 弹出 table,存入到 spare中
  102. TableStack<K,V> next = s.next;
  103. s.next = spare; // save for reuse
  104. stack = next;
  105. spare = s;
  106. }
  107. // 和 advance 方法最后一段代码作用相似
  108. // table的index桶链表遍历完,遍历下一个 hash桶
  109. // index记录下一次访问索引(当前访问索引 baseIndex++)
  110. if (s == null && (index += baseSize) >= n)
  111. index = ++baseIndex;
  112. }
  113. }

在 Traverser 类中,最重要的就是 advance 方法,那么了解一下这个方法的整体思路(其中的 table 表示源table,nextTable 表示扩容之后的 table):

当迭代器创建或者调用 next 方法是,都会调用 Traverser 中的 advance 方法来获取下一个有效结点。这个方法会根据当前 table 中 index 桶的状态来返回有效结点,如果桶是处于正常状态,那么就返回桶中的结点;如果 index 桶处于扩容状态,那么就根据 nextTable 中的 index 和 index+table.length 桶的状态来返回有效结点。在 nextTable 中访问完目标结点之后,返回到 table 再次向 ++index 桶的状态进行判断并返回,直到 table 中的桶被遍历完。

那么知道了大致的实现过程,下面使用图解的方式再次理解,加深印象:

1.当遍历到 pwd 结点的时候,说明正在扩容,此结点的数据已经被迁移到 nextTable

2.将 table1 的遍历状态压栈stack,遍历迁移到 nextTable(table2)的 hash 桶

  • 由于扩容时,会将 table(table1) 中索引为 index 桶,迁移到 nextTable(table2) 中索引为index和索引为 index+table.length 的2个桶中。因此在 nextTable(table2) 中的遍历顺序为:index,index+table.length (table2中的桶2,和桶4 )
  • 如果 桶2 和 桶4 都是正常的节点,遍历完 桶2 和 桶4 后,就会将 stack 中的table(tab1) 弹出,继续遍历 table(tab1) 中的桶
  • 桶2 是 fwd 节点,说明 table2 也被扩容,此节点的 node 被迁移到了 nextTable(table3) 中

3.将 table2 的遍历状态压栈stack,遍历迁移到 nextTable(table3)的 hash 桶

因为在 table2 中 桶2 中只有 A。根据扩容规则:在 table3 中,A 就可能在 桶2 中,也可能在 桶6 (2+table2.length)中

根据扩容迁移规则:先遍历 index = 2 的 hash 桶

接着遍历 index = 6 的 hash 桶(为什么要遍历 桶6 可见源码 recoverState 方法)

4.遍历完 table3 后,table2 出栈,根据遍历状态信息,继续遍历 table2

table2 弹出栈stack,进入了栈 spare(对象重利用,减少内存开销)
table2 记录当前 index = 2,那么接着遍历下一个 index=4(2+table1.length) 的 hash 桶
桶4 是一个 pws 结点,那么 table2 压栈stack(index = 4),直接复用 spare 指向的 table2 引用

5.遍历 nextTable(table3)中的 桶4、桶8(4+table2.length)

6.完成 nextTable(table3)的遍历,table2 出栈并遍历

table2 遍历完毕。

7.table1 出栈并遍历

stack 栈空,而且 spare 栈按照出栈的顺序保存了相关节点,遍历完毕。

至此 table 的图解遍历就完成了。

在 advance 方法中通过不断循环遍历,其中考虑到table的大小发生变化,并且节点的组织方式可能是链表也可能是红黑树,遍历的过程中可能会有部分数据遍历不到,此为弱一致性的表现。

相关文章

最新文章

更多