JDK并发工具类源码学习系列——ConcurrentHashMap

x33g5p2x  于2021-12-18 转载在 其他  
字(13.9k)|赞(0)|评价(0)|浏览(470)

欢迎阅读本系列更多文章:JDK并发工具类源码学习系列目录
作为JDK并发工具类源码学习系列的第一个被分析的类,ConcurrentHashMap类在我的开发过程中经常被使用。个人觉得如果在共享一个Map时,如果无法判断是否需要加锁,那么就干脆直接使用ConcurrentHashMap,即能保证并发安全,同时性能也不会有太多下降,因为ConcurrentHashMap可实现无锁读,不过内存会占用的多些,但是并不明显,基本可以忽略。
下面我们就来看看ConcurrentHashMap类的内部构造。

1. 结构预览
1. 类定义
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable

上面是ConcurrentHashMap类的定义,从ConcurrentHashMap的定义可以看出ConcurrentHashMap是实现了ConcurrentMap接口,而非直接实现Map接口。同时ConcurrentMap的子接口还有一个ConcurrentNavigableMap,表示可支持导航的并发Map。可见ConcurrentMap接口定义可支持并发,NavigableMap接口定义可支持导航,SortedMap接口定义可支持排序,NavigableMap继承自SortedMap。从Map的API介绍可以看出Java Collections Framework家族中重要一员——Map的组织结构——通过接口定义Map的行为,或者说Map可支持的功能,多个接口之间可交叉,如ConcurrentNavigableMap即实现ConcurrentMap接口又实现NavigableMap接口。

2. 类结构

从图中可以看出ConcurrentHashMap内部包含了多个内部类,其中最重要的也是我们最需要关心的是:SegmentHashEntry
Segment是ConcurrentHashMap非常重要的一个内部类,是ConcurrentHashMap实现高并发的关键点,Segment在ConcurrentHashMap中承担着所有的操作,即所有对ConcurrentHashMap的操作最终都会对Segment进行操作。因为Segment保存了最终的数据,而ConcurrentHashMap只是保存了一个Segment的数组。ConcurrentHashMap通过N个Segment将数据切分成N块,而每块之间是互不影响的,所以理论上可以同时并行的执行N个需要加锁的操作,这就是ConcurrentHashMap并发的基础。
HashEntry同HashMap中的Entry,每个HashEntry是一个节点,保存key和value,以及下一个节点。HashEntry中的key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型,可见HashEntry类的value是可变的,其他的key和next都是不可变的。
EntryIterator,EntrySet,HashIterator,KeyIterator,KeySet,ValueIterator,Values是辅助ConcurrentHashMap实现遍历的内部类。
下面简单介绍下SegmentHashEntry类。
HashEntry

static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;

        HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
            this.key = key;
            this.hash = hash;
            this.next = next;
            this.value = value;
        }

        @SuppressWarnings("unchecked")
        static final <K,V> HashEntry<K,V>[] newArray(int i) {
            return new HashEntry[i];
        }
    }

HashEntry类的结构很简单,就是四个变量,一个构造函数,一个static方法。由于没有任何getter和setter方法,所以对其操作是直接访问变量。在 ConcurrentHashMap 中,在散列时如果产生“碰撞”,将采用“分离链接法”来处理“碰撞”:把“碰撞”的 HashEntry 对象链接成一个链表。由于 HashEntry 的 next 域为 final 型,所以新节点只能在链表的表头处插入。所以链表中节点的顺序和插入的顺序相反。
Segment

static final class Segment<K,V> extends ReentrantLock implements Serializable

Segment继承自ReentrantLock ,所以它可以作为一个锁使用,其在ConcurrentHashMap也正是作为一个锁来使用的。

transient volatile int count;//Segment中保存的元素数量
transient int modCount;//记录Segment被修改的次数,用于在读取时判断读取期间改Segment是否有过修改,有的话则重试
transient int threshold;//阀值,元素数量达到该值则会进行自动扩展
transient volatile HashEntry<K,V>[] table;//桶,一个HashEntry的数组,按HashCode值散列保存,采用链表解决hash碰撞问题
final float loadFactor;//负载因子

count变量是一个计数器,它表示每个 Segment 对象管理的 table 数组(若干个 HashEntry 组成的链表)包含的 HashEntry 对象的个数。每一个 Segment 对象都有一个 count 对象来表示本 Segment 中包含的 HashEntry 对象的总数。注意,之所以在每个 Segment 对象中包含一个计数器,而不是在 ConcurrentHashMap 中使用全局的计数器,是为了避免出现“热点域”而影响 ConcurrentHashMap 的并发性。

从Segment拥有的方法可以看出,针对ConcurrentHashMap的操作基本上都会调用具体某个Segment的对应方法,如put会调用Segment的put方法。所以Segment是最终的操作类。

下图是依次插入 ABC 三个 HashEntry 节点后,Segment 的结构示意图。

Segment的方法会在介绍ConcurrentHashMap的方法时进行解释,这里先不介绍。

2. 构造器解读
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
    }
public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }
public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

以上的构造器都只是一个个重载函数,最终都会调用下面的构造器。其中使用到了三个常量:

  • DEFAULT_INITIAL_CAPACITY:默认初始容量
  • DEFAULT_LOAD_FACTOR:默认加载因子
  • DEFAULT_CONCURRENCY_LEVEL:默认并发级别,该值决定一个包含多少个Segment,即将ConcurrentHashMap切分成多少块
public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;
        this.segments = Segment.newArray(ssize);

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;

        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }

该构造函数需要制定初始容量、加载因子以及并发级别,对应上面提到的三个常量(默认值)。代码前几句是对参数进行正确性校验。// Find power-of-two sizes best matching arguments这句注释的意思是寻找一个参数的最佳匹配值:最接近指定的参数的2的幂方值。下面我们对照着代码来说明这句话的含义:

// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
}

这里定义了一个ssize变量,该变量就是concurrencyLevel的最佳匹配值,可以看见首先是循环,直到ssize>=concurrencyLevel,所以最佳匹配值是大于等于指定参数的,循环里面每次会将ssize右移一位,即*2,所以最终得到的值就是一个最接近且大于等于concurrencyLevel的2次幂方值。同时定义了一个sshift变量,该变量随着ssize的每次右移而+1,最终得到的即是ssize是2的多少次方,即sszie=2^sshift。继续往下看:

segmentShift = 32 - sshift;//偏移量
segmentMask = ssize - 1;//掩码值
this.segments = Segment.newArray(ssize);//初始化segments数组

segmentShift以及segmentMask在后面将一个hash映射到某一个segments时使用,目的是将hash均匀的分配到每个segments,具体为什么使用这两个来进行均匀分配我们这里不介绍。最后一句是初始化一个segments数组,大小是ssize,而非参数concurrencyLevel值。下面继续看:

if (initialCapacity > MAXIMUM_CAPACITY)
    initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
    ++c;
int cap = 1;
while (cap < c)
    cap <<= 1;

for (int i = 0; i < this.segments.length; ++i)
    this.segments[i] = new Segment<K,V>(cap, loadFactor);

initialCapacity是构造器指定的初始化容量,ssize是segments数组大小,所以c的值就是每个segments的容量。下面定义了一个cap,这里的cap和前面的ssize是一个含义,即选择一个最接近且大于等于c的2的幂方值,然后初始化segments数组,传入的参数有cap(segment容量)和loadFactor(负载因子)。这里选择cap作为segment容量,而非c,是出于方便后期对segment的容量进行扩充考虑,如果容量是2的幂方,那么想要将容量扩充一倍只需右移1位即可,同时保证依旧是2的幂方。
对于segment的初始化很简单,对loadFactor赋值,然后根据指定的初始容量创建一个HashEntry数组,并计算出threshold(阀值,当segment中的元素超过这个阈值则进行容量扩充)。

3. 常用方法解读

ConcurrentHashMap实现了Map接口,那么他的核心方法包括我们常用的put(K, V)、get(Object)、remove(Object)、contains(Object)、size(),同时继承自ConcurrentMap让他包含了putIfAbsent(K, V)、remove(Object, Object)、replace(K, V, V)、replace(K, V)四个并发方法。后面的四个并发方法是ConcurrentMap为我们提供的在并发情景下使用的工具方法,都是基于CAS来实现的。
在看put(K, V)、get(Object)等方法实现之前,先来看下这两个方法:hash(int)和segmentFor(int)。

/* ---------------- Small Utilities -------------- */

/** * Applies a supplemental hash function to a given hashCode, which * defends against poor quality hash functions. This is critical * because ConcurrentHashMap uses power-of-two length hash tables, * that otherwise encounter collisions for hashCodes that do not * differ in lower or upper bits. */
private static int hash(int h) {
    // Spread bits to regularize both segment and index locations,
    // using variant of single-word Wang/Jenkins hash.
    h += (h <<  15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h <<   3);
    h ^= (h >>>  6);
    h += (h <<   2) + (h << 14);
    return h ^ (h >>> 16);
}

/** * Returns the segment that should be used for key with given hash * @param hash the hash code for the key * @return the segment */
final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

源码中对这两个方法的注释是:Small Utilities,即小工具方法。源码中对于hash方法的注释的意思是:该方法是一个补充hash方法,ConcurrentHashMap的hash表的长度是2的幂方,使用该补充hash函数可降低一些质量差的hash函数发生的碰撞概率。具体如何实现的就不看了,就算看懂了代码也很难理解这样做的原因,所以不浪费时间。segmentFor是为一个hash值找到它应该去的segment,这里使用到了segmentShift以及segmentMask,还记得segmentShift是32-sshift,这里将hash值无符号左移segmentShift位,即取hash值的高sshift位,然后同segmentMask按位与运算。其实就是取hash值的高sshift位将值限制在0~ssize之间,然后与ssize-1取余得到segments数组的下标(取高位是因为更加均匀,低位的重复率比高位高,臆测~!!!)。
了解了上面两个方法,下面我们就来看看put(K, V)、get(Object)、remove(Object)这三个方法的具体实现。

1. put(K, V)
public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key.hashCode());
    return segmentFor(hash).put(key, hash, value, false);
}

ConcurrentHashMap的put方法内部只是根据key的hash值找到对应的Segement,然后调用Segement的put方法,注意Segement的put方法的第四个参数,这里穿的值是false。我们主要分析下Segement的put方法。Segement在这里的作用就是将元素均匀分成N等份,各个Segement之间互不干扰,读写也不会发生冲突,降低并发要求。

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue;
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}

首先第一步就是lock,看来再NB的并发类在写时也需要lock啊。读取count值,从count的注释可以看出该值是记录Segment包含的元素数量,volatile修饰的(这里利用了volatile变量的内存可见性)。然后判断增加之后元素数量是否超过阈值,超过的话提前扩容。接着找到该hash对应的table(桶),简单的取余操作。找到该table的第一个元素——first,因为ConcurrentHashMap使用链表来解决hash冲突问题,所以这里的table是一个链表。

while (e != null && (e.hash != hash || !key.equals(e.key)))
      e = e.next;

通过循环,并通过比较hash值以及equals()校验,寻找与key相同的已存在的元素。

V oldValue;
 if (e != null) {
     oldValue = e.value;
     if (!onlyIfAbsent)
         e.value = value;
 }
 else {
     oldValue = null;
     ++modCount;
     tab[index] = new HashEntry<K,V>(key, hash, first, value);
     count = c; // write-volatile
 }

e!=null说明找到与要插入的元素key相同的元素,那么onlyIfAbsent=false则直接将原元素的value值替换,返回原值,由于HashEntry的value是volatile的,所以修改之后会立即被后续线程可见;onlyIfAbsent=true则不做任何操作。e==null时,modCount自增(modCount记录了对该Segment的进行的结构性修改的次数,modCount值使得在进行批量读取时能够知道在读取期间Segment结构是否被修改来决定是否进行加锁读取)。**tab[index] = new HashEntry(key, hash, first, value)**这句就是将被插入的元素添加到链表中,但是插入的位置是头部,而非尾部。HashEntry的构造器传入一个HashEntry对象,该对象是链表原来的头部,被作为新创建的节点的next指针,所以新的链表的头部元素是新增加的,后面接着是原来的链表。
注意:此处的lock并非对整个Map进行加锁,而只是对该Segment进行加锁,所以如果一个线程进行put操作,其他的另外15个(ssize-1)Segment仍是可访问的。

2. remove(Object)
public V remove(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).remove(key, hash, null);
}

/** * Remove; match on key only if value null, else match both. */
V remove(Object key, int hash, Object value) {
    //由于remove是结构性修改,所以第一步便是lock
    lock();
    try {
        //读取count值,此处是利用volatile变量的内存可见性来保证读线程能够及时的读取到最新值(后面会单独介绍)
        int c = count - 1;
        //是根据key的hashCode找到该节点对应的桶
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        //循环找到该节点
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue = null;
        if (e != null) {
        //找到待删除节点
            V v = e.value;
            //如果value==null,则无需关心节点的值是否与指定值相同,否则只有在两者相同情况才可删除
            if (value == null || value.equals(v)) {
                oldValue = v;
                // All entries following removed node can stay
                // in list, but all preceding ones need to be
                // cloned.
                ++modCount;
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}

依旧调用的是对应的Segment的remove()方法。由于remove是结构性修改,所以需要进行加锁操作。在删除一个节点时,为了不影响正在遍历链表的线程,这里采用了复制方式,而非直接移除待删除节点。具体工作方式:将待删除节点之后的节点不动,而待删除节点之后的节点复制到另外一个链表,看代码:HashEntry<K,V> newFirst = e.next;这句将待删除节点的next节点赋值给newFirst for (HashEntry<K,V> p = first; p != e; p = p.next)此处的for循环从链表的头部开始一直循环到待删除节点为止,newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value);for循环内部根据当前循环的节点新建了一个key和value、hash都相同的节点,不同的是next指向了前一个新建的节点(第一个newFirst是待删除节点的下一个节点),即构成了一个以待删除节点的前一个节点为头结点的新的链表,然后tab[index] = newFirst;将该链表赋到对应的桶上,便完成了整个删除操作,最终新的链表以待删除节点的前一个节点为头结点。
下面通过图例来说明 remove 操作。假设写线程执行 remove 操作,要删除链表的 C 节点,另一个读线程同时正在遍历这个链表。
执行删除之前的原链表:

执行删除之后的新链表:

从图中可以看出被删除节点之后的节点原封不动保留在链表中,而之前的链表从后往前依次被复制到新的链表中,但是原链表在我们进行remove操作过程中始终是会发生任何变化的,所以写线程对某个链表进行remove操作不会影响其他的并发读线程对这个链表的遍历访问。

3. get(Object)
public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

ConcurrentHashMap的get()方法同put()一样,也是依赖于Segment的get()方法。下面看看Segment的get()方法

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}

 /** * Returns properly casted first entry of bin for given hash. */
 HashEntry<K,V> getFirst(int hash) {
     HashEntry<K,V>[] tab = table;
     return tab[hash & (tab.length - 1)];
 }

/** * Reads value field of an entry under lock. Called if value * field ever appears to be null. This is possible only if a * compiler happens to reorder a HashEntry initialization with * its table assignment, which is legal under memory model * but is not known to ever occur. */
V readValueUnderLock(HashEntry<K,V> e) {
    lock();
    try {
        return e.value;
    } finally {
        unlock();
    }
}

从代码可以看到get()方法在读取时无需进行加锁操作,除非读取到的值为NULL。为什么读取一个节点的值为NULL的时候需要加锁呢?因为ConcurrentHashMap是不允许NULL作为key或者value的,所以是不应该出现读取一个节点的值为NULL的情况,如果出现这种情况,说明出现了并发问题,所以加上锁再次读取!(什么情况下会出现这种情况并不清楚)。

4. 总结

ConcurrentHashMap在进行结构性修改,如put/remove/replace时都需要进行加锁,但是读取并未加锁,并发情况下,由于内存不同步问题,会导致一个线程的写操作并不会立即对另一个线程可见。这里ConcurrentHashMap通过volatile变量的内存可见性特性来保证一个线程的写操作立即被其他线程可见,每个方法在一开始都会读取count这个变量,该变量就是一个volatile变量,多个线程之间通过读写这个变量来保证内存可见性,具体可参考下方的关于JVM内存可见性的说明。
上面三个方法基本包含了整个ConcurrentHashMap的读写操作(replace(K, V)方法只是简单的更新节点的value值,由于value是volatile的,所以也不会影响读线程),从三个方法的分析来看ConcurrentHashMap首先通过Segment对整个数据集进行切分,并通过对各个部分的数据集进行加锁来提高整个数据集的并发性;通过读写分离的方式实现无锁读,加锁写,进一步提高ConcurrentHashMap的读写效率;并通过volatile变量的特性实现读写的可见性保证。

4. 使用场景

ConcurrentHashMap由于其即使在同步的情况下依旧保证高效的读写性能,所以在很多需要使用HashMap的情况都适用,当然单线程情况并不需要使用同步的ConcurrentHashMap。如果无法保证你的HashMap只是在单线程情况下使用那么就使用ConcurrentHashMap,因为其在单线程情况下的效率也并不低。
下面是针对单线程环境下ConcurrentHashMap和HashMap的put性能的对比:
硬件PC:普通PC机,i5
JVM:内存1G
测试数据:执行10次,计算均值
结果:表格

MapPUT1W次PUT10W次PUT100W次
ConcurrentHashMap2175317280681931355076232
HashMap120113128068193407341713
Java 内存模型

由于 ConcurrentHashMap 是建立在 Java 内存模型基础上的,为了更好的理解 ConcurrentHashMap,让我们首先来了解一下 Java 的内存模型。
Java 语言的内存模型由一些规则组成,这些规则确定线程对内存的访问如何排序以及何时可以确保它们对线程是可见的。下面我们将分别介绍 Java 内存模型的重排序,内存可见性和 happens-before 关系。

重排序

内存模型描述了程序的可能行为。具体的编译器实现可以产生任意它喜欢的代码 – 只要所有执行这些代码产生的结果,能够和内存模型预测的结果保持一致。这为编译器实现者提供了很大的自由,包括操作的重排序。
编译器生成指令的次序,可以不同于源代码所暗示的“显然”版本。重排序后的指令,对于优化执行以及成熟的全局寄存器分配算法的使用,都是大有脾益的,它使得程序在计算性能上有了很大的提升。
重排序类型包括:

  • 编译器生成指令的次序,可以不同于源代码所暗示的“显然”版本。
  • 处理器可以乱序或者并行的执行指令。
  • 缓存会改变写入提交到主内存的变量的次序。
内存可见性

由于现代可共享内存的多处理器架构可能导致一个线程无法马上(甚至永远)看到另一个线程操作产生的结果。所以 Java 内存模型规定了 JVM 的一种最小保证:什么时候写入一个变量对其他线程可见。
在现代可共享内存的多处理器体系结构中每个处理器都有自己的缓存,并周期性的与主内存协调一致。假设线程 A 写入一个变量值 V,随后另一个线程 B 读取变量 V 的值,在下列情况下,线程 B 读取的值可能不是线程 A 写入的最新值:

  • 执行线程 A 的处理器把变量 V 缓存到寄存器中。
  • 执行线程 A 的处理器把变量 V 缓存到自己的缓存中,但还没有同步刷新到主内存中去。
  • 执行线程 B 的处理器的缓存中有变量 V 的旧值。
Happens-before 关系

happens-before 关系保证:如果线程 A 与线程 B 满足 happens-before 关系,则线程 A 执行动作的结果对于线程 B 是可见的。如果两个操作未按 happens-before 排序,JVM 将可以对他们任意重排序。
下面介绍几个与理解 ConcurrentHashMap 有关的 happens-before 关系法则:

  1. 程序次序法则:如果在程序中,所有动作 A 出现在动作 B 之前,则线程中的每动作 A 都 happens-before 于该线程中的每一个动作 B。
  2. 监视器锁法则:对一个监视器的解锁 happens-before 于每个后续对同一监视器的加锁。
  3. Volatile 变量法则:对 Volatile 域的写入操作 happens-before 于每个后续对同一 Volatile 的读操作。
  4. 传递性:如果 A happens-before 于 B,且 B happens-before C,则 A happens-before C。

以上摘自探索 ConcurrentHashMap 高并发性的实现机制

参考文章

1.探索 ConcurrentHashMap 高并发性的实现机制
2.聊聊并发(四)——深入分析ConcurrentHashMap

欢迎访问我的个人博客~~~

相关文章