AbstractQueuedSynchronizer
通常简称AQS
,他是java.util.locks
包下的一个类,Java中的很多类都是通过改类实现的。
在本文中我们通过阅读源码的方式查看下其实现原理,使用的JDK版本为1.8.
1、类结构与数据结构
1.1 类继承关系
AbstractQueuedSynchronizer
继承AbstractOwnableSynchronizer
,AbstractOwnableSynchronizer
的定义是比较简单的,里面只有一个字段exclusiveOwnerThread
,该字段是用来存储当前占有同步器的的线程。
1.2 成员变量
AQS
类中的成员变量如下:
Node head
头节点Node tail
尾节点int state
状态值 通过对该字段的操作实现同步的逻辑
AQS
中有个ConditionObject
内部类,其成员变量如下
Node fitstWaiter
头节点Node lastWaiter
尾节点
1.3 数据结构
通过上面的成员变量,我们很容易想到这是一个链表结构,链表节点的定义如下:
static final class Node {
// 共享锁模式的nextWaiter
static final Node SHARED = new Node();
// 独占锁模式的nextWaiter
static final Node EXCLUSIVE = null;
// 取消
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
复制代码
通过节点的定义我们发现里面双向链表和单向链表同时存在,这两个链表分别有什么用我们会在后面的源码学习中一点点的讲解。其结构如下图所示:
节点的状态有如下几种,我们会在后面的源码查看中了解什么时候会设置成相应的值:
- CANCELLED = 1
- SIGNAL = -1
- CONDITION = -2
- PROPAGATE = -3
- 0
2 获取/释放锁
在上面的部门我们了解了AQS
的数据结构,在这部分中我们看下AQS
是如何处理获取锁和释放锁的逻辑的。
AQS
中存在独占和共享两种模式,对外提供的获取/释放的方法如下:
protected final boolean compareAndSetState(int expect, int update)
通过CAS的方式修改state
属性public final void acquire(int arg)
获取排他锁public final void acquireShared(int arg)
获取共享锁public final boolean tryAcquireNanos(int arg, long nanosTimeout)
带超时时间的获取排他锁public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
带超时时间的获取共享锁public final boolean release(int arg)
释放排他锁public final boolean releaseShared(int arg)
释放共享锁
2.1 compareAndSetState方法
该方法是线程安全的修改state
属性的值,其使用的是Unsafe
类的CAS操作,其源码如下
protected final boolean compareAndSetState(int expect, int update) {
// 使用Unsafe类进行CAS操作,保证线程安全
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码
2.2 acquire方法
该方法是排他锁获取锁的方法,其源码如下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 获取锁失败且加入等待队列失败,执行Thread.interrupt()方法
selfInterrupt();
}
复制代码
在这个方法中会调用tryAcquire
方法、addWaiter
方法和acquireQueued
方法。
tryAcquire
方法是尝试获得锁,阅读tryAcquire
方法的源码我们会发现在AQS
中直接抛出了异常,很明显这里使用了方法模板模式,需要子类来实现该方法,其源码如下:
protected boolean tryAcquire(int arg) {
// 在AQS中直接抛出异常,留给子类实现 模板方法
throw new UnsupportedOperationException();
}
复制代码
如果获取锁失败,AQS
会将该线程添加到等待队列,addWaiter
方法源码如下
private Node addWaiter(Node mode) {
// 创建一个节点对象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 队列非空
if (pred != null) {
// 添加到尾部
node.prev = pred;
// 使用CAS的方法修改尾节点
if (compareAndSetTail(pred, node)) {
// 修改当前节点未之前列表尾节点的下一个节点
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
// 自旋添加
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 头节点为空 初始化链表
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将链表的尾节点设置为新节点的前驱节点
node.prev = t;
// 通过cas的方式设置尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取节点的前置节点
final Node p = node.predecessor();
// 如果前置节点是头节点,尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获得锁成功,将该节点设置为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前置节点的状态
int ws = pred.waitStatus;
// 如果为-1
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 从队列中移除该节点所有状态为CANCELLED的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 修改前置节点状态为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 节点的线程设置为null
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
// 从队列中移除状态为-1的前置节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 节点的状态修改为-1
node.waitStatus = Node.CANCELLED;
// 如果是尾节点,移除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
复制代码
当不存在竞争时,线程过来就能获取到锁,此时的队列为空。
当一个线程过来,发现锁已经被其他线程占有,此时会初始化队列,并将自己插入队列中,队列结构如下
当在有其他线程过来,会在参与一次锁竞争,如果获得失败,会添加到队列的尾部。
2.3 tryAcquireNanos
该方法是指定了超时时间的获取锁方法,该方法和tryAcquire方法没有太大的区别,只是增加了对超时时间的判断,该方法的源码如下:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
// 如果线程被中断过 抛出异常
throw new InterruptedException();
// 尝试获取锁 获取锁失败 将线程添加到阻塞队列
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 插入到阻塞队列并获取插入的节点
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 自旋
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
// 超时返回false
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
2.4 release方法
我们知道锁需要具备获取和释放的功能,上面的方法我们介绍了AQS中实现获取锁的逻辑,release是释放锁的逻辑,其源码如下
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
// 头节点
Node h = head;
// 头节点存在且状态不为0
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
通过源码我们可以发现,tryRelease
方法同样需要子类来实现。
在获取锁那里也有unparkSuccessor
方法,其源码如下:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 修改头节点的status为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 后置节点为空或者状态为1 被取消的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点遍历等待队列,移除被取消的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
至此独占模式的锁获取和释放的逻辑已经介绍完了,共享模式相关的代码在本文中就不介绍了,大家可以自己去源码里查看下,逻辑相差不是很大。
3 ConditionObject
ConditionObject
是AQS
中的一个内部类,我们可以使用该类实现等待/唤醒的功能。类似wait()、notify()的功能,接下来我们看看在AQS
中是如何实现的。
ConditionObject
继承自Condition
,其类图如下:
这个类中提供了好多个等待/唤醒的方法,在本文中我们只介绍await()
和signal()
两个方法,其他的几种方法还是大家自己去看源码理解。
3.1 await
该方法用于使当前线程进入等待状态,其源码如下:
public final void await() throws InterruptedException {
// 判断当前线程是否被终止
if (Thread.interrupted())
throw new InterruptedException();
// 添加到等待队列
Node node = addConditionWaiter();
// 释放持有的锁 从阻塞队列中移除
int savedState = fullyRelease(node);
int interruptMode = 0;
// 不在同步队列
while (!isOnSyncQueue(node)) {
// 暂停线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
将线程加入等待队列中的方法源码如下:
private Node addConditionWaiter() {
// 单向链表的尾节点
Node t = lastWaiter;
// 如果状态不为-2 从等待队列中移除节点
if (t != null && t.waitStatus != Node.CONDITION) {
// 移除不为等待状态节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个节点 节点状态为-2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 链表未初始化时设置为头节点
firstWaiter = node;
else
// 向链表尾部插入
t.nextWaiter = node;
// 设置为尾节点
lastWaiter = node;
return node;
}
private void unlinkCancelledWaiters() {
// 单向链表的头节点
Node t = firstWaiter;
Node trail = null;
// 遍历链表移除状态不为-2的节点
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
复制代码
当线程插入等待队列后会调用fullyRelease
方法从阻塞队列中移除,其源码如下
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取当前状态值
int savedState = getState();
// 释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
复制代码
3.2 signal
该方法用于唤醒一个等待中的线程,其源码如下:
public final void signal() {
// 当前线程是否持有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 替换节点状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 添加到同步队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒线程
LockSupport.unpark(node.thread);
return true;
}
复制代码
4 总结
通过上面的源码阅读,我们会发现,在AQS
中有两个链表,一个是双向链表的同步队列,一个是单向链表的等待队列。
链表节点的状态如下
- 0 默认值
- 1 取消 调用cancell方法时会设置成该值
- -2 等待 调用
Condition
的await
方法时会设置成该值 在等待队列中 - -1 调用signal方法或者acquireQueued方法时会设置成该值
- -3 这个值在我们的源码中没有看到,这个值在释放共享锁的方法中会用到,大家自己去看这个逻辑吧
AQS
的源码就到这里结束了,AQS
是Java中的同步工具的底层实现,在它里面封装了队列的操作,接入方只需要实现相应的方法即可,我们在后面的文章中会介绍其在其他其他同步工具中的使用。