为什么有这篇文章
最近在看ConcurrentHashMap,也参考了一些有关的文章,大多浅尝辄止,没有解答在阅读过程中产生的疑惑。
总体来说,许多人对于ConcurrentHashMap的印象,停留在它是以链表或红黑树存储的数据结构,以分段锁减轻免竞争条件的并发实现。前者并非ConcurrentHashMap的核心竞争力,且以后者概括也不足以完全说明其优势。
ConcurrentHashMap的出现,是因为HashMap不是线程安全,HashTable线程安全却并发效率过低。ConcurrentHashMap仔细斟酌了两者的难处,在快速存储与高效并发间找到了微妙的平衡点。
ConcurrentHashMap为实现高并发做了哪些努力?答案的深浅就代表了对ConcurrentHashMap的掌握程度。文章因此而成(源码为JDK8)。
基础
HashMap简要的工作机制如下:
- 在HashMap中,以Node为粒度,存储K/V
- 每一个K,都能通过哈希值,按照规则,table上找到所在的索引位置,table类型为table[]
- 不同的哈希值,可能索引到相同的位置,此情况也称为碰撞
- 当发生碰撞时,在table[index]上,以链表形式,存储所有索引到此的Node
- 当链表过长后,查找变得没有效率,转换成红黑树,加快查找
- 因此在table[index]上是链表还是红黑树,取决于此位置的碰撞程度
- 如果存储的数据超过了table的最大容量,则生成重新的table,并将旧的table的Node重新索引到新的table上
ConcurrentHashMap维持了HashMap的工作特性,并在并发环境下也能如期进行。以HashTable来看,之所以不能胜任高并发环境下的工作,因其对所有方法上锁,导致临界区覆盖过大,竞争条件激烈,那么在并发量大时,每个线程受限于其他线程,与串行无异,违背了并发的初衷。
对于理解ConcurrentHashMap来说,一些属性是关键的:
// 与介绍HashMap相同,每个存储的Node该索引到的位置
transient volatile Node<K,V>[] table;
// 当容量不足时,创建容量更大的nextTable,并将table上所有的Node重新索引到nextTable上,再令table=nextTable
private transient volatile Node<K,V>[] nextTable;
// 加载因子,决定了table应该多满,越大则越容易发生碰撞,越小则越浪费空间,此消彼长。
// 与HashMap不同的是,恒为0.75,此值与ConcurrentHashMap的优化有关
private static final float LOAD_FACTOR = 0.75f;
/**
* 是ConcurrentHashMap非常重要,并难以理解的属性,sizeCtl用于并发控制与状态标记:
* =0时,ConcurrentHashMap为初始状态
* =-1,说明ConcurrentHashMap正在初始化
* >0,表示当前的最大容量
* <-1,说明ConcurrentHashMap正在进行扩容,此状态下 sizeCtl = -(1 + 扩容线程数)
*/
private transient volatile int sizeCtl;
/**
* 因为sizeCtl处理了多种状态,需要其他的属性协助参与
*/
// 扩容时用到,通过sizeCtl生成标记,回归标记状态,说明扩容完毕,状态有效位为16位
private static int RESIZE_STAMP_BITS = 16;
// 扩容线程最大数量,为 2^16,
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 扩容时用到,还原sizeCtl记录的扩容标记
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
复制代码
以上能容不必硬吞下,有个印象即可,后续提到后自然明了。
回到并发编程来看,提高并发效率可从几方面入手:
- 减少临界区区域:这是容易理解的,在并发中,临界区是依次通过的,即临界区是串行的。
- 加速通过临界区:使临界区内的逻辑代码执行得尽可能地快,这样,线程在临界区内停留的时间更短。
- 减少竞争条件:通过去除不必要的竞争条件,或竞争不同的临界区,来减少线程间的相互影响。
ConcurrentHashMap对于并发效率的优化,也将从这几方面入手。
分段扩容
与其他文章不同的是,我想从ConcurrentHashMap的扩容方法,即从transfer()说起。因为transfer()的处理方式,包含了ConcurrentHashMap重要的核心的内容,值得细说,并在理解之后,观看其他方法语义实现,将简单地多。
与HashMap一样,当数据容量超过加载因子,如果不进行扩容,继续允许新的存储。那么,碰撞问题将加剧,碰撞问题严重的区域效率变低。此时,就需要扩容,以维持HashMap保持的效率平衡。
ConcurrentHashMap.transfer()内容很长,将分段进行讲解。
确定分段长度
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 确定每一段的长度,NCPU代表CPU核数
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 每段最小长度,不小于 MIN_TRANSFER_STRIDE ,即16
stride = MIN_TRANSFER_STRIDE;
if (nextTab == null) {
// ①说明还没有创建新的table,创建新的
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 指向新的table
nextTable = nextTab;
// transferIndex是扩容时,旧table还未处理的索引位置
transferIndex = n;
}
// 新table的长度
int nextn = nextTab.length;
// 创建一个特殊的节点,为Node子类的ForwardingNode
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 表示是否要向下一个区域进行扩容
boolean advance = true;
// 表示扩容是否结束了
boolean finishing = false;
......
}
复制代码
比较有意思的变量是 stride,中文解释为“一步的距离”,应对于扩容场景,意思为“每个区域的大小”。从stride的获取方式,考虑了CPU核实,直接说明了是支持并发扩容的。那么,每一个进行并发的参与者,一次负责一个stride长度的区域,stride最低为16。之后将马上见到这部分内容。
在①处,将有个疑问:此处并没上锁,那么怎么保证接下来的操作,使用的nextTab都是同一个?此处实际上时候更上层的调用来进行保证的,如果与transfer()做了约定,传入null时,将只有一个线程通过①处。留下第一个问题后续回答:扩容时,如何保证nextTab是同一个?
确定每个线程负责的扩容区域
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
......
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
// nextIndex:用于记录当前的transferIndex值
// nextBound:当前正在处理的区域的边界
int nextIndex, nextBound;
if (--i >= bound || finishing)
// 更新索引位置
// ①
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
// ②
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 通过CAS竞争扩容区域的负责权
// ③
// 记住负责的扩容区域边界
bound = nextBound;
// 此时i就为当前负责的扩容区域的索引
i = nextIndex - 1;
advance = false;
}
}
......
}
复制代码
依据前面的内容,ConcurrentHashMap是支持并发扩容的,那么,每个线程将有自己负责进行扩的区域:
- 首先,advance为false时,表示当前线程正在处理某个区域的扩容任务;advance为true时,表示当前线程正在寻找下一个要扩容的区域,或在自己处理的区域上往前索引。
- 紧接着,transferIndex表示了还未被非配的区域的索引边界,当线程领取某个区域的扩容任务时,需要更新transferIndex值。因此,将在transferIndex上产生竞争条件,则需要nextIndex保存竞争条件前transferIndex的值,以通过CAS进行竞争。
- 最后,线程通过CAS竞争到了某个区域的扩容任务,bound记录了边界,i记录了当前位置。
那么,
在①处:意味着当前线程还没处理完此区域的扩容任务,因为还未超过边界bound;
在②处:意味着已经没有区域要分配了,要不就是都处理完了,要不就是所有区域都有其他线程在处理了,不再需要当前线程的参与了;
在③处:是否还记得stride?代表每个区域的长度。所有进入的线程,通过CAS拿到自己负责的区域,并更新transferIndex,记住边界bound,以及当前处理的索引位置;
过程如图:
可以简单说明的是,TRANSFERINDEX 为 transferIndex的内存地址,因对transferIndex的更新有竞争条件,,通过CAS操作就可以保证对transferIndex操作的原子性。CAS操作可以简单理解为由底层保证的单一变量的原子操作,接受对象地址,预期值,要更新的值为参数,当预期值符合,并成功把值设置为要更新的值时,返回true。
因此,通过竞争更新transferIndex的值,线程就获得了table上某段区域的扩容任务,nextIndex(预期值,要知道transferIndex可能在调用CAS前改变了)辅助了参与CAS竞争。
扩容前处理
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
......
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 扩容结束了
nextTable = null;
// 指向新的table
table = nextTab;
// 更新当前table的最大容量
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 说明当前线程完成了区域的扩容任务,更新sizeCtl的值成功
if (/*①*/ (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
//说明整个扩容还没结束,但是自己的扩容任务也完成了,直接返回
return;
// 说明整个扩容结束,将再次进到循环,然后返回
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
// f记住了当前位置的 node
// 将其更新为ForwardingNode,以标记此位置已经处理
// 因为没有内容,尝试直接标记为处理了,失败会再进来
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 说明这个位置已经处理过了
advance = true;
......
}
复制代码
在扩容前,通过f记录旧table[i]位置上的所有Node。如果当前位置没有内容,在旧table[i]上更新类型为ForwardingNode的Node,如此标记了此位置已经处理了。
扩容是并发的,在①处,处理了退出的逻辑。当前阶段,可以简要理解为,resizeStamp()得到一个标记值,当 (sc = sizeCtl) – 2 回归到这个标记值时,意味着并发扩容结束。前面有提到,sizeCtl表达了多种状态,这里就留下了第二问题后续回答:sizeCtl如何控制多种并发状态?
并发结束时,以 sizeCtl = (n << 1) – (n >>> 1) 更新了最大容量。左移1位为2倍,右移1位为1/2,结果为n的1.5倍。即 n * 1.5 = 2n * 0.75,最大容量保持了新table长度的加载因子容量。
扩容处理
此时,拿到了table[i]上的Node —— f,无论f处是构成了链表或是红黑树,其表达的并发扩容处理控制是一致的。那么,就以链表为例,抛砖引玉。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
......
// 锁住此位置,也就是分段锁
synchronized (f) {
if (tabAt(tab, i) == f) {
// lh 表示索引位置不变的Node
// hn 表示索引位置要改变的Node
Node<K,V> ln, hn;
if (fh >= 0) {
// 哈希值大于 > 表示,table[i]上的Node构成了链表
// ①
// n 可以理解为掩码,就runBit用来快速确定Node扩容后是否要去到新的位置
int runBit = fh & n;
// 用来加快 ln 与 hn 的处理
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
// ②
// 更新 lastRun 和 runBit
// 在此循环结束后,lastRun之后的所有Node,将去往相同的位置。
runBit = b;
lastRun = p;
}
}
// ③
if (runBit == 0) {
// lastRun之后的所有Node在同样索引的位置
ln = lastRun;
hn = null;
}
else {
// lastRun之后的所有Node在将去往新的索引位置
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// ④
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将ln更新到新table的同样的索引位置
setTabAt(nextTab, i, ln);
// ⑤
// 将ln更新到新table的新的索引位置
setTabAt(nextTab, i + n, hn);
// 标记为处理了
setTabAt(tab, i, fwd);
// 使当前线程能对当前区域的下一个位置进行处理
advance = true;
}
......
}
复制代码
要理解上面的内容,需要了解是怎么在table上确定自己的索引位置的。可以直接确定的是,table的容量,总为2的倍数。
确定位置的方法为,通过Key的hash值处理得到一个值假设以ph表示,然后与table的长度做并操作,注意,table作为数组,索引从0开始。假设table长度为n,那么索引位置 = ph & (n – 1)。 那么:
ph决定数据的索引位置的有效位位数,取决于(n-1)二进制最低位开始的连续为“1”的位数。在理解了这一点后,对table[i]的扩容处理,就变成了如图所示:
- 在table上,i处所有的node,通过其 p & (n – 1) 均能索引到 i,并且都已经处于i位置上。
- 在nextTable上,nextTable长度发生了变化,意味着 (n-1) -> (2n-1)。而确定索引值是需要关心有效位的,此时有效位也放生了变化,因此,table[i]上的Node,将有可能在nextTable上处于不同的位置。
Node最终去向nextTable的索引位置,为nextTable[i] 或 nextTable[i + n],这是如何得来的呢?
假设 n = 16 = 1 0000, n-1 = 15 = 1111。
此时确定索引位置的所有Node的ph有效位为低4位,假设都为 1010。
当 n = 2n,2n-1 = 31 = 1 1111时,原来 table[i]上的Node的ph有效位多了一位共为低5位。
有效位为 0 1010 或 1 0101。
因此,新多出来的有效位为0或1就决定了不同的去向.
为0去向nextTable[i],为1去向nextTable[i + x].
即从前者位置 1010,后者位置 1 1010 = 1010 + n = 1010 + 1 0000.
即去向的不同取决于第五位。
复制代码
那么,
- 在①处: ph & n 就可以确定新的有效位为0 或 1,决定Node的去向,runBit就代表了结果
- 在②处: 通过 runBit,确定从链表某个位置起,ph & n 的值不再改变,即lastRun位置起所有的Node将去往nextTable相同的位置
- 在③处,记录 runBit确认的那一段lastRun链表,runBit为0时,由ln记住;为1时,由hn记住,用于加快处理。
- 在④处,为lastRun之前的Node,按照 ph & n 的结果,都创建新的Node(同样的k,v,hash),分别进入ln或hn
- 在⑤处,将ln放到nextTable[i],hn放到nextTable[i + n]
构造ln和hn的过程如下:
特别提醒的是,③和④表达的目的是,如何快速地整理出ln和hn以去向不同的nextTable索引位置,而不是一些文章所说的ln和hn是逆向的队列.
一个有趣的地方是,④处创建新的Node而不是完全复用旧的Node,是因为这样可以重新分配竞争条件,可能加快并发,后续会继续说明。
而以二叉树为结构的table[i],表达的也是同样语义,有兴趣的可以阅读,无须拘泥。
扩容小结
扩容的过程可以总结为:
- 以stride为长度单位,将table划分各个区域
- 每个参与扩容的线程,通过CAS竞争更新transferIndex,分配到负责的区域
- 每个线程知道自己负责的区域边界,对区域内的table[i]逐个进行扩容处理
- 正在被处理的table[i]的位置,将被标记为ForwardingNode
- 每个table[i]上的Node,将可能去往nextTable的不同索引位置,nextTable[i]或nextTable[i + n],通过新有的有效位加以区分
- 当处理完区域后,向上返回。由更上层控制,要不要继续进入transfer()。
sizeCtr状态控制
接下来,将解答一个问题,sizeCtl如何控制多种并发状态?。
理解ConcurrentHashMap的sizeCtr属性如何工作,是理解ConcurrentHashMap各方法语义实现的关键连结点。
private transient volatile int sizeCtl;
复制代码
sizeCtl是32位的int,不被序列化,所有线程可见,初始值为 0。
初始化
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
// 返回最接近的大于或等于initialCapacity的 2^x
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
复制代码
ConcurrentHashMap允许的table最低长度为16,在初始化时可以传入希望的长度,但是将会找到最接近的大于或等于initialCapacity的 2^x 作为table长度,sizeCtl将暂时记住这个长度。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 说明其他线程正在初始化table,让出时间片
Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 通过CAS锁竞争,更新sizeCtl为-1,由成功的线程负责处理
try {
if ((tab = table) == null || tab.length == 0) {
// 再次确认table没有创建
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 创建了table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 标记当前能容纳的最大数据量,即 table长度 * 加载因子。
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
复制代码
table将以懒加载形式创建,创建过程是可能发生并发的。通过CAS进行锁竞争能确定负责创建的线程,此时将 sizeCtl 置位 – 1,表示有进程正在创建table。那么,其他线程看到此信息,就会让出时间片给创建table的线程。table创建之后,sizeCtl = table长度 * 加载因子,即位操作 n – (n >>> 2)。此后扩容更新,依旧保持按此规则更新。
此阶段得出:
sizeCtl = 0,为初始状态
sizeCtl = -1, 为table创建状态
sizeCtl = 2^x * 0.75,表示当前table允许的最大容量
扩容标记
在扩容阶段,sizeCtl将作为扩容结束标记,与记录在扩容的线程数。
// 返回一个扩容标记
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
复制代码
在需要扩容时,已当前table长度n为因子,通过resizeStamp()获得一个标记,当sizeCtl再次回归到这个标记时,代表扩容结束。
- 首先 Integer.numberOfLeadingZeros(n)语义为:返回一个值,代表n的二进制从最高位起,知道遇到某一位是1,一共有多少连续的0位。
- 1 << (RESIZE_STAMP_BITS – 1)返回一个仅在低16位为1的int。
- 二者或操作,得到一个标记数值。
int为32位,假设 n = 16,则
Integer.numberOfLeadingZeros(n) | 1 << (RESIZE_STAMP_BITS - 1)
=
0000 0000 0000 0000 0000 0000 0001 1011
|
0000 0000 0000 0000 1000 0000 0000 0000
= rc =
0000 0000 0000 0000 1000 0000 0001 1011
复制代码
如上面的例子,16位的 “1000 0000 0001 1011” 将作为sizeCtl的高16位,用于做扩容标记。
为什么使用resizeStamp()进行处理得到标记的数字,而不是以某个特定的数值作为标记呢?
一方面是因为,如果以特定的数值,并发状态下table可能继续被扩容。假设以一个特定的值Z代表继续扩容,在T0时刻,参与扩容的的线程都记录了一个值Z,长度从N扩容完成到了2N。在T1时刻,有一个参与扩容的线程醒来,发现值为Z,认为需要扩容,将长度从2N扩容到了4N。当然,可以通过加锁来保证一致性,但将降低并发效率。因此,通过resizeStamp()获取到一个值X,当table长度改变时,通过resizeStamp()获取到一个值将得到X1,X与X1不同,也就避免了这个问题。仔细一看,像不像CAS的ABA问题?
另一方是因为,sizeCtl还将记录正在扩容的线程数,从ConcurrentHashMap的属性MAX_RESIZERS看,最多支持2^16 个线程协助扩容。使用 sizeCtl 的低16位足以表示。
线程记录
Java中的基础类型,是有符号数,即二进制的最高位是符号位,0为正数,1位负数。通过resizeStamp()得到值,作为sizeCtl的高16位,则sizeCtl<0。
由于有符号数二进制加减的特点,对于负数进行加减,如果只看低16位,就像正数的的加减一样。因此,通过对sizeCtl进行加减就可以更新记录线程数。
特别注意的是,第一个线程进入时,是+2,后续线程进入时+1,退出时-1,然后检查-2后是否符合resizeStamp()获得的值以判断扩容是否结束。
sizeCtr小结
由此,sizeCtr的工作逻辑为:
sizeCtl = 0,为初始状态
sizeCtl = -1, 为table创建状态
sizeCtl = 2^x * 0.75,表示当前table允许的最大容量
扩容时,sizeCtl高16位为扩容结束标记,低16位为并发现线程数
从transfer()里和其他对sizeCtr的操作,使用了很多的位操作协助处理逻辑,加快线程通过临界区。这些位操作的前提示,要使table的长度n恒为2幂才能得以完成,有兴趣可自行证明,不难。
分段锁
分段锁的意义在于:各个区域拥有自己的锁,以使得线程去往不同的区域竞争条件,以此减少线程间的相互影响。
ConcurrentHashMap在table[i]上,存储了以Node为单位的不同数据集合,每个table[i],就是一个锁,由此分散了竞争。前文所说的transfer()方法,也是以table[i]的位置的Node为锁,再进行扩容操作。
现在,再以插入数据的方法putVal()继续观察。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 拿到hash值
int hash = spread(key.hashCode());
// 用于表示tabel[i]上的Node位置
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 懒加载,如果还没有table,加载
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 在table[i]上还没有Node,插入,退出
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 说明有线程正在扩容,帮忙扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
// 锁住table[i]
if (tabAt(tab, i) == f) {
// 再次检查加入到table[i]上
if (fh >= 0) {
// 链表的处理
......
}
else if (f instanceof TreeBin) {
// 红黑树的处理
......
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 如果table[i]上Node的数量到达了阈值,将链表转为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 更新数据存储量
addCount(1L, binCount);
return null;
}
复制代码
与tranfer()一样,插入数据时,也要拿到table[i]上的锁,才继续进行操作。当发现table[i]上被标记为 MOVED(即ForwardingNode)是,说明有线程正在进行扩容,进行帮忙。
如果table[i]上Node的数量大于等于TREEIFY_THRESHOLD,就将链表转为红黑树以加快索引。到达TREEIFY_THRESHOLD的概率为约为 0.00000006
友情提醒:不必拘泥于链表与红黑树具体是如何构造的,应关心的是ConcurrentHashMap是如何运作的。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// rs为扩容标记值
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// sizeCtl<0说明正在扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
/**
* 1.sc >>> RESIZE_STAMP_SHIFT) != rs 确定扩容标记是没有变化
* 2.sc == rs + 1 说明所有线程扩容完毕,不用去帮忙了
* 3.sc == rs + MAX_RESIZERS 说明扩容线程达到了上限
* 4.transferIndex <= 0 说明table所有区域分配完了,不需要帮忙了
*/
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//通过CAS更新sizeCtl成功,帮助扩容
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
复制代码
通过比较resizeStamp()标记是否符合预期,然后通过CAS竞争更新sizeCtl以帮忙扩容。
在成功加入新的数据后,再通过addCount()更新当前数据容量
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
.......
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 当前容量s >= sizeCtl,需要处理扩容
// 扩容标记值
int rs = resizeStamp(n);
if (sc < 0) {
// 继续扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
/**
* 1.sc >>> RESIZE_STAMP_SHIFT) != rs 确定扩容标记是没有变化
* 2.sc == rs + 1 说明所有线程扩容完毕,不用去帮忙了
* 3.sc == rs + MAX_RESIZERS 说明扩容线程达到了上限
* 4.transferIndex <= 0 说明table所有区域分配完了,不需要帮忙了
*/
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 通过CAS更新sizeCtl成功,继续扩容
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 说明将首次进入扩容
transfer(tab, null);
// 计算当前容量
s = sumCount();
}
}
}
复制代码
这里就可以回答之前遗留的问题:扩容时,如何保证nextTab是同一个?
- 在插入数据后,重新计算当前容量,如果大于sizeCtl,则进行扩容。
- 首次进入transfer(),将对 sizeCtl = sizeCtl + 2 进行更新,并传入null,
- 后续再进入transfer(),则将 sizeCtl = sizeCtl + 1 进行更新,并传入代表nextTable的nt,在退出时 将 sizeCtl = sizeCtl – 1 进行更新
- 当检查到 sizeCtl – 2 = resizeStamp() 时,说明回归到了扩容标记值,扩容结束
- 因此,只有首次进入transfer()的线程,才会创建nextTable,从而保证其他线程扩容时访问的是同一个。
分段锁小结
与插入方法对应的删除方法replaceNode(),也是按照大体一直的逻辑进行处理。首先,发现正在扩容,则帮助扩容。然后,在获取table[i]上的锁,进行语义逻辑。由此table[i]上的数据能保证并发安全。ConcurrentHashMap()的竞争条件也因分段锁被分散了。
充分理解了 分段扩容、sizeCtl控制和分段锁之后,其他的方法语义实现,就变得简单了,因此不做展开。
Synchronized替换ReentrantLock
除了以上提及的ConcurrentHashMap对并发效率的优化处理外,将加锁实现方式从ReentrantLock变为JDK8后的Synchronized,也是一项大优化。
在之前的版本,ConcurrentHashMap的实现,是每个table[i]位置是一个Segment,Segment中装载了所有索引在此位置的数据。并且,每个Segment是一个ReentrantLock(独占锁),操作table[i]时,就要显示通过加解锁方法。ReentrantLock基于AQS实现,那么在并发时,获取不到锁的线程将被挂起,直到其他线程释放锁才被唤醒,进而继续尝试获取锁。那么,挂起与唤醒所引起的用户态切和内核态切换的消耗,上下文切换所带来的性能消耗是不可避免的。
而在JDK8中,将上锁方式从ReentrantLock换成了Synchronized,是基于Synchronized的性能得到了升级,支持了锁升级的膨胀过程。
简单来说,Synchronized将经历偏向锁/轻量级锁/重量级锁的过程。
- 首先,Java的每个对象都可以做为一把锁。
- 一开始,只有一个线程使用这把锁,此锁将被标记为偏向锁。此时,运行过程就如没有锁一样。
- 紧接着,更多的线程有了竞争条件,竞争使用这把锁,此锁将被标记为轻量级锁。此时,没有获得锁的线程将通过CAS自旋,浪费CPU时间,获取锁,一些时候,将很快获取得到锁。轻量级锁意味着,虽然发生了竞争条件,但大家都很快通过临界区,与其切换内核态,挂起/唤起线程,不如浪费CPU时间而消耗的性能更少。此过程,不需要到切换到内核态。
- 后来,竞争条件激烈了,每个线程通过轻量级锁浪费CPU时间的方式,迟迟得不到锁,性能消耗得太多了,不如把他们挂起。此时,此锁将被标记为重量级锁,每个获取不到锁的线程,也将经历挂起和恢复的过程,付出用户态切和内核态切换,以及上下文切换所带来的性能消耗的代价。
因此,当table[i]上为偏向锁和轻量级锁时,表现好于ReentrantLock;当table[i]上为重量级锁时,表现与ReentrantLock相差不多。总体来看,效率将好于ReentrantLock。
是否记得在分段扩容一节,table[i]上的Node去往nextTable时,创建了新的Node,而不是复用Node,将可能提高并发效率。
虽然Synchronized不支持锁降级,但我们可以使用新的锁,来重新观察竞争条件激烈程度,重新经历锁的膨胀过程。使用新的Node,就是使用了新的锁。原因在于,对table[i]的竞争激烈程度,不代表在nextTable[i]和nextTable[i + n]上也是相同的,很可能仅在位置之一,或者两位置的竞争都没那么激烈了。最坏情况,也因分段锁得到缓解。
并且Synchronized作为管程,有更多的想象空间。
对于Synchronized和ReentrantLock的更多了解,可参考:Java锁
当前容量计算
ConcurrentHashMap将可能在各个位置上并发,因此当前容量,更多是一个估值,毕竟,不能 stop the world。ConcurrentHashMap就以内部类CounterCell对象,对应记录每个线程对数据的增删次数,然后综合得出结果。相关方法见fullAddCount()、sumCount(),不做展开。
与ConcurrentHashMap相关的其他方法语义,也在了解上述内容后,变得简单,亦不展开。
总结
现在,来回答一开始提出的问题:ConcurrentHashMap为实现高并发做了哪些努力?
- 首先,遵循HashMap的特性,数据分布在不同的区域。那么,以分段锁进行并发,减少区域上的竞争条件。
- 然后,维持加载因子下的容量,使碰撞尽可能地少,避免竞争条件的线程挤到一块。而当容量不足时,并发地进行扩,方式为:transferIndex记录未被分配进行扩容的区域,通过CAS竞争更新transferIndex,各线程被分配并处理某个区域,完成table到nextTable的转移。如此,加快通过临界区。
- 为了观测某个区域的并发激烈程度,将锁的方式从ReentrantLock变为Synchronized,以Synchronized的锁膨胀特性,使得根据并发激烈程度的不同而动态的使用不同的锁方案。并在扩容时,对各区域重新观测。
- 为了更快地处理逻辑,table区域长度恒为2的幂,如此,可通过位操作更快地处理本身逻辑。
- 最后,以int sizeCtr 维护ConcurrentHashMap状态:初始状态、初始化状态、扩容状态、扩容状态。sizeCtr的运作方式,是理解ConcurrentHashMap的突破口。
- 而至于某个区域是链表还是红黑树,也是为了加快索引而做的转换,最终目的也是为了更快地通过临界区。
以上,错误之处,不吝赐教。