首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
ConcurrentHashMap
是由Segment
数组结构和HashEntry
数组结构组成。
Segment
是一种可重入锁(ReentrantLock
),在ConcurrentHashMap
里扮演锁的角色;HashEntry
则用于存储键值对数据。
一个ConcurrentHashMap
里包含一个Segment
数组,Segment
的结构和HashMap
类似,是一种数组和链表结构。一个Segment
里包含一个HashEntry
数组,每个HashEntry
是一个链表结构的元素,每个Segment
守护着一个HashEntry
数组里的元素,当对HashEntry
数组的数据进行修改时,必须首先获得与它对应的Segment
锁。
可以说,ConcurrentHashMap
是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表(Segment
)。
优点:多个线程去访问不同的Segment
,没有冲突,可以提高效率
缺点:Segment
数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒加载
concurrencyLevel
的第一个2 n 2^n2n的数。比如,如果concurrencyLevel
为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为16。理想情况下ConcurrentHashMap
的真正的并发访问量能够达到concurrencyLevel
,因为有concurrencyLevel
个Segment,假如有concurrencyLevel
个线程需要访问Map,并且需要访问的数据都恰好分别落在不同的Segment中,则这些线程能够无竞争地自由访问(因为他们不需要竞争同一把锁),达到同时访问的效果。这也是为什么这个参数起名为“并发级别”的原因。如果initialCapacity=64,concurrencyLevel=16,那么每个Segment下面的HashEntry容量就是 64/16 = 4。
默认每个Segment下面的HashEntry容量就是 16/16 = 1。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 验证参数的合法性
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// concurrencyLevel也就是Segment的个数不能超过规定的最大Segment的个数,默认值为static final int MAX_SEGMENTS = 1 << 16;,如果超过这个值,设置为这个值
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
// 使用循环找到大于等于concurrencyLevel的第一个2的n次方的数ssize,这个数就是Segment数组的大小
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 计算每个Segment平均应该放置多少个元素。比如初始容量为15,Segment个数为4,则每个Segment平均需要放置4个元素
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
get()
方法没有加锁,而是使用UNSAFE.getObjectVolatile()
来保证可见性
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
// u为Segment对象的下标
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// s即为Segment对象
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// e即为定位到的HashEntry
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
get()
方法)public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 获取输入key的hash值
int hash = hash(key);
// 计算出Segment的下标
int j = (hash >>> segmentShift) & segmentMask;
// 定位到对应的Segment对象,判断是否为null,如果是则创建Segment
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
// CAS确保segment被出啊昂见且不被重复创建
s = ensureSegment(j);
// 进入Segment的put()流程
return s.put(key, hash, value, false);
}
ConcurrentHashMap在1.8中的实现,相比于1.7的版本基本上全部都变掉了。首先,取消了Segment分段锁的数据结构,取而代之的是数组+链表+红黑树的结构。而对于锁的粒度,调整为对每个数组元素加锁(链表头/树根 Node)。然后是定位节点的hash算法被简化了,这样带来的弊端是hash冲突会加剧。因此在链表节点数量大于8时,会将链表转化为红黑树进行存储。这样一来,查询的时间复杂度就会由原先的O(n)变为O(logN)。下面是其基本结构:
// 默认为0
// 初始化时为-1
// 扩容时为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为下一次扩容的阈值大小
private transient volatile int sizeCtl;
// 整个ConcurrentHashMap就是一个Node类型的数组table
static class Node<K,V> implements Map.Entry<K,V> {}
// 懒加载,负责存储Node结点
transient volatile Node<K,V>[] table;
// 扩容时的新数组
private transient volatile Node<K,V>[] nextTable;
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 初始容量小于并发度时会将初始容量改为并发度
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
// 懒加载,在构造器中仅仅计算了数组的容量大小,只有在后面第一次使用时(put())才会真正创建
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// tableSizeFor()保证数组容量为2^n
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
get()
全程不需要加锁是因为Node
对象的value
是用volatile
修饰的(volatile V val
)
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// spread()确保返回结果是正整数
int h = spread(key.hashCode());
// 定位链表头所在的数组下标
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果头结点就是要查找的key,直接返回value
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 正在扩容或链表树化,去新链表或红黑树中去找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 顺着链表正常找下去
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
public V put(K key, V value) {
return putVal(key, value, false);
}
// onlyIfAbsent:是否选择用新值覆盖掉旧值
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
// f是链表的头结点
// fh是链表头结点的hash
// i是链表在table数组的下标
Node<K,V> f; int n, i, fh;
// 此时table数组还没有被初始化(懒加载)
if (tab == null || (n = tab.length) == 0)
// CAS初始化
tab = initTable();
// 首结点为null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// CAS进行添加
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
// 帮忙扩容
// static final int MOVED = -1;
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 对首结点加锁后再往链表(或树)后面添加
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
JDK1.8的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,JDK1.8的ConcurrentHashMap只是增加了同步的操作来控制并发。
JDK1.7:ReentrantLock + Segment + HashEntry
JDK1.8:synchronized + CAS + HashEntry + Node
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_43613793/article/details/120602707
内容来源于网络,如有侵权,请联系作者删除!