最近在看LinkedBlockingQueue
源码的时候发现内部是采用ReentrantLock
进行线程同步的,众所周知 Java 中线程同步的方法除了最常用的synchronized
关键字和Object#wait
等方法,还有另一种方式那就是使用ReentrantLock
类。
ReentrantLock
本身只是实现了Lock
接口,而真正实现线程同步的关键是另外一个类AbstractQueuedSynchronizer
(AQS),AQS
是 Java 中一个线程同步框架,基于AQS
重写tryAcquire
和tryRelease
等方法就可以实现线程同步,因此本文就从ReetrantLock
为例子,解析AQS
的工作流程。
Tips
- 代码基于 Java 8
- 全文会用
AQS
指代AbstractQueuedSynchronizer
- 为了提升阅读流畅性,文中会在需要的地方将代码二次贴出,并加上
repeat
标识
源码梳理
加锁
ReentrantLock#lock
首先,从ReentrantLock#lock
方法入手。
public void lock() {
sync.lock();
}
复制代码
调用的是ReentrantLock$Sync#lock
方法,根据创建ReentrantLock
时的参数可为NonfairSync#lock
或FairSync#lock
,分别代表非公平锁和公平锁实现,它们之间的具体类图关系如下:
为了更好地理解,本文以ReentrantLock
默认构造函数所用的NonfairSync
为例,至于FairSync
公平锁在实现逻辑上基本相同。
NonfairSync#lock
final void lock() {
if (compareAndSetState(0, 1)) //CAS 尝试设置 state 为 1,0 表示没有线程持有锁
setExclusiveOwnerThread(Thread.currentThread()); //成功,设置当前线程为占有线程,lock 流程结束
else
acquire(1); //失败,执行 AQS#acquire 方法
}
复制代码
AQS#acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && //实际相当于调用 Sync#nonfairTryAcquire
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
首先调用tryAcquire
,注意此方法在AQS
中为空实现,具体实现在NonfairSync
中,实际内部调用Sync#nonfairTryAcquire
Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //获取 AQS 的 state
if (c == 0) { //等于0,表示没有线程持有锁
if (compareAndSetState(0, acquires)) { //CAS,成功表示获取锁,acquires 通常为 1
setExclusiveOwnerThread(current); //设置占有线程,lock 流程结束
return true;
}
}
// Mark A
else if (current == getExclusiveOwnerThread()) { //如果当前线程就是占有锁的线程
int nextc = c + acquires; //state += acquires
if (nextc < 0) //超过 Integer.MAX_VALUE,溢出为负数
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; //acquire 失败
}
复制代码
从上面的Mark A
代码块可以看出ReentrantLock
的可重入性,每调用一次lock
意味着state += 1
,调用unlock
则state -= 1
,因此完全释放锁需要调用相同次数的unlock()
,当 state 为 0 时表明锁被释放。
如果tryAcquire
失败的话,紧接着调用addWaiter
方法,addWaiter
会生成一个AQS$Node
,这个Node
实际是对当前线程的一个封装。
AQS#addWaiter
//repeat start
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //acquire 失败,先调用 addWaiter
selfInterrupt();
}
//repeat end
//AQS#addWaiter
private Node addWaiter(Node mode) { //mode = Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode); //将线程封装成 Node
Node pred = tail; //获取尾节点
if (pred != null) { //尾节点不为空,表明队列不为空
node.prev = pred;
// Mark A
if (compareAndSetTail(pred, node)) { //CAS 尝试将当前 Node 替换成尾节点
pred.next = node; //成功替换为尾节点,将上一个尾节点的 next 指向当前 Node
return node; //返回 Node
}
}
enq(node); //失败,说明有其他线程替换了尾节点,调用 enq
return node;
}
//AQS#enq
private Node enq(final Node node) {
for (;;) { //开始循环
Node t = tail;
if (t == null) { //尾节点为空 = 队列为空,需要初始化一个节点,该节点不代表任何线程,是一个 dummy 节点
if (compareAndSetHead(new Node()))
tail = head;
} else { //同上 Mark A 处代码相同,将当前 Node 设置到尾节点,失败则进入下一个循环,直至成功
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
通过上面的代码可以了解到,AbstractQueuedSynchronizer
的Queued
指的是在该类中维护了一个Node
的双向链表结构的 FIFO 队列,这个队列中如果有 n 个线程,那么就会有 n + 1 个 Node,因为头节点是个虚Node
。
addWaiter
之后会将当前Node
返回给acquireQueued
,这同时意味着我们的线程已经在队列尾部了。
AQS#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; //中断标记
for (;;) { //开始循环
final Node p = node.predecessor(); //获取前一个 Node
//判断前一个节点是不是 head,是的话 CAS 尝试获取锁
//head 是 dummy Node,等于说我们是队列里第一个等待执行的线程
if (p == head && tryAcquire(arg)) {
//成功获取锁,将当前 Node 设置为 head
//另外 setHead 中 Node.thread 会被设置为 null,Node 变成新的 dummy head
setHead(node);
p.next = null; //释放前节点的引用,帮助 GC
failed = false;
return interrupted; // == false
}
if (shouldParkAfterFailedAcquire(p, node) && //tryAcquire失败,检查是否需要 park 线程(Go A ↓↓↓)
parkAndCheckInterrupt()) //执行 park,并检查中断 (Go B ↓↓↓)
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// A
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前一个节点已经 SIGNAL 了,意味着当前节点可以直接 park
// SIGNAL = -1, CONDITION = -2, PROPAGATE = -3, CANCELLED = 1
return true;
if (ws > 0) { //前一个节点已经取消
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); //循环向前寻找节点直到不为 CANCELLED 的节点
pred.next = node; //弃置中间的 CANCELLED 节点
} else {
//前一个节点要么为 0 要么为 PROPAGATE,CAS 尝试将其设置为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// B
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //执行到这里表明线程阻塞,停止自旋获取锁,等待 unpark
return Thread.interrupted();//unpark 恢复执行后检查是否有中断
}
复制代码
到LockSupport#park
为止,整个ReentrantLock#lock
方法的流程梳理完毕。
小结
Reentrant#lock
实际就是执行NonfairSync#lock
,在这一步线程会直接尝试 CAS 争抢锁,失败的话才进行第二步。- 这一步执行
AQS#acquire
实际执行Sync#nonfairTryAcquire
,和第一步一样先尝试 CAS 争抢锁,同时这一步最关键的是处理重入锁,等于说所有重入的线程在第一步都会失败,然后在这里成功。 AQS#addWaiter
的主要工作就是把线程封装成Node
节点,并插入到队列的尾端。- 在
Node
入列后,这一步的工作就是等待出列,先循环判断Node
是不是队列里第一个等待执行的线程,是的话尝试 CAS 争抢锁,这个行为称为锁的自旋;锁自旋一定次数后,判断条件为Node
的前一个节点的状态如果变成SIGNAL
,那么停止自旋,线程park
阻塞,等待唤醒。
从总结中可以看出,每个线程在入列之前,都会在 1、2 步先尝试 CAS 争抢锁,这里体现了非公平锁的特点,不重点关注加锁的先后顺序,提升了吞吐量。
一点引申
- 关于
acquireQueued
的final
方法块什么时候在会执行以及CANCELLED
节点,因为这篇文章主要分析的是ReentrantLock#lock
方法,在这个前提下经过测试后,我认为在单纯调用ReentrantLock#lock
的时候,应该只有在发生未知异常的时候会触发会执行cancelAcquire
,因为在线程park
以后,调用Thread#interrupt
方法也只是让线程恢复执行,并不会直接抛出异常,Node
最终还是会在队列中继续去获取锁,只是返回interrupted
为true
。所以我觉得在使用ReentrantLock#lock
的时候,队列中的Node
的waitStatus
应该不会出现CANCELLED
,但是如果调用的是ReentrantLock#lockInterruptibly
,在线程interrupt
后是会直接抛出异常的,,final
中就会生成CANCELLED
节点,感兴趣可以看AQS#doAcquireInterruptibly
方法,如果我的理解有误的话欢迎指出。
释放锁
ReentrantLock#unlock
public void unlock() {
sync.release(1); //调用 AQS#release
}
//AQS#release
public final boolean release(int arg) {
//调用 Sync#tryRelease,NonfairSync 和 FairSync都没有重写此方法
//true 表明锁被释放 (Go A ↓↓↓)
if (tryRelease(arg)) {
Node h = head; //头节点,一开始初始化时为 dummy 节点
//1. 如果 head 为空说明队列没有初始化过,队列中没有等待的 Node
//2. 如果 head 的 state 为 0,说明还没有被后驱的节点设置成 SIGNAL
//那么后面的节点线程还处于自旋状态,尚未 park,不需要 unpark
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//A
//Sync#tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // state - 1
if (Thread.currentThread() != getExclusiveOwnerThread()) //如果调用的线程并没有持有锁,抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //如果 state 为 0,表明锁被释放,不为 0 只是减少了一次重入
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
复制代码
unlock
中所做的工作就是减少AQS
的state
,state
为 0 则说明锁已释放,如果此时队列中没有线程或第一个线程没有park
的话,那么unlock
的工作也就结束了,但是如果有park
的线程,那么还需要负责唤醒被park
的线程。
AQS#unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) // ws 不为 CANCELLED
compareAndSetWaitStatus(node, ws, 0); //将 waitStatus 设为 0
Node s = node.next; //如果 node 是 head 的话,s 也是第一个真实节点
if (s == null || s.waitStatus > 0) {
s = null;
//s 如果为 null 或已经 CANCELLED,从队列尾部开始遍历
//尝试寻找 s 节点之后第一个不为 CANCELLED 的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //唤醒该线程
}
复制代码
为什么这里要从尾部开始寻找Node
?
从 addWaiter Mark A
处可以看到做的事情是:
- 为新
Node
设置前驱节点 - 将新
Node
CAS 设置为tail
- 将原来的尾
Node#next
指向新Node
第二、第三步并不是原子操作,如果在这两步之间执行了unparkSuccessor
,从头开始遍历的话,不能遍历到最新入列的Node
,而从尾部遍历就没有这个问题。另外在产生CANCELLED
节点的时候,先断开的是next
指针,prev
指针并未断开,从尾部开始能保证遍历全所有Node
。
小结
释放锁的的内容比较简单,就是调用AQS#release
,Sync
类都没有重写这个方法,每次调用AQS
的state
都会值减 1 ,当state
变为 0 时锁也就释放了,然后判断一下需不需要唤醒队列里的第一个节点。
其他
在一点引申这里提过关于CANCELLED
节点,对这个我们做进一步的分析。
AQS#cancelAcquire
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;//移除 node 的线程
Node pred = node.prev; // pred = 前驱节点
//如果 pred.ws == CANCELLED,循环向前遍历,直到找到不为 CANCELLED 的 Node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next; //pred 原来的 next Node (可能 == node)
node.waitStatus = Node.CANCELLED;//将 node.ws 设置为 CANCELLED
//如果当前 node 是尾节点则 CAS 尝试将 pred 设置为 tail
if (node == tail && compareAndSetTail(node, pred)) {
//CAS 尝试将 pred.next 设置为 null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//多重判断,每一项需都满足
//1. pred 不是头节点
//2. (pred.ws == SIGNAL) 或 (ws 不为 CANCELLED 且可以成功被设置成 SINGNAL)
//3. pred.thread 不能为空
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
//如果 node.next 不为空且不为 CANCELLED,将 node.next 与 pred 连接
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//唤醒 node 之后第一个不为 CANCELLED 的节点
unparkSuccessor(node);
}
//将 node.next 指向自身
node.next = node;
}
}
复制代码
这段代码的主要目的就是,在Node
变为CANCCELLED
的时候,处理其前面的节点的引用,注意只是对Node.next
进行处理,Node.prev
没有进行操作,简单的说就是在从当前节点开始,向前判断各个节点,直到遇到不为CANCELLED
的节点,将该节点的next
指向当前节点的next
。用图表示如下:
当然这是最常见的一种情况,代码里还有对各种特殊情况的判断,比如找到的前驱节点是不是头节点、当前节点是不是尾节点等,但是都是为了处理next
引用。此时这些CANCELLED
节点还在队列里,因为他们的引用没有完全断开,还有prev
引用连接着。
那为什么不在这里处理prev
引用?
因为节点在自旋的过程中AQS#shouldParkAfterFailedAcquire
会对自身的prev
进行更新,移除掉前面的无效节点,但这个操作不是原子性的,如果在这里修改prev
节点,那么指向的节点有可能因为之后也CANCELLED
了并且在shouldParkAfterFailedAcquire
被移除,那就会变成指向到一个被移除的节点。
//AQS#shouldParkAfterFailedAcquire
//repeat start
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
//repeat end
复制代码
总结
至此,从ReentrantLock
切入进行分析AQS
的加锁和释放锁部分也就分析完了,总体框架就是将线程封装成节点,然后利用 FIFO 队列实现同步,再加上一些性能如优化减少线程过多自旋等。剩下的AQS$ConditionObject
会在后续的文章里进行分析。