1. 基本概念
Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronized
(AQS)实现的。AQS使用了一个int类型的成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
2. 实现原理
2.1 数据结构分析
AQS的核心思想:如果被请求的资源空闲,则将当前线程设置为有效的工作线程,将共享资源的状态设置为锁定状态;如果共享资源被占用,就需要一定的等待唤醒机制来保证分配。这个机制主要使用CLH队列的变体来实现,将暂时用不到的线程加入到队列中。
2.1.1 同步队列
ASQ中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
同步队列的基本结构如下:
- 同步队列的入队操作
同步队列的入队操作必须是原子操作,因此使用了一个基于CAS的方法:compareAndSetTail()方法。
- 同步队列的出队操作
设置首节点是通过获取同步状态的成功的线程来实现的,而只有一个线程能够获取到同步状态,因此设置头结点的方法不需要使用CAS来保证。
2.1.2 Node结构
AQS中的每个节点都是一个Node,Node是CLH队列上的节点,包含了当前线程和等待状态等信息。
- Node
方法和属性值 | 描述 |
---|---|
waitStatus | 等待状态 |
thread | 获取同步状态的线程 |
pre | 前驱节点 |
predecessor | 返回前驱节点,没有的话跑出npe |
nextWaiter | 指向下一个处于CONDITION状态的节点 |
next | 后继节点 |
- waitStatus
名称 | 含义 |
---|---|
0 | 初始状态 |
CANCELLED | 1, 当期线程获取锁的请求已经取消了 |
CONDITION | -2, 表示节点在在等待队列中,节点线程请求被唤醒 |
PROGATE | -3, 当前线程处在SHARED情况下,该字段才会使用 |
SIGNAL | -1, 表示当前线程已经准备好了,就等资源释放 |
- 线程两种锁的模式
模式 | 含义 |
---|---|
SHARED | 表示当前线程以共享的模式等待锁 |
EXCLUSIVE | 表示当前线程正在以独占的方式等待锁 |
2.2 同步状态分析
在获取锁的时候分为独占式和共享式,独占式是只要一个线程能够获取到锁,如ReentrantLock,而共享式是可以多个线程线程同时获取到该锁,如ReadWriteLock。
2.2.1 独占模式
2.2.1.1 加锁
- aquire()方法
aquire()方法以独占式获取资源,如果获取到资源,直接返回,如果没有获取到资源,则进入等待队列,开始自旋操作,直到获取到锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
- addWaiter()方法
获取锁失败后会执行addWaiter()方法,将node加入CLH队列中(CAS)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
复制代码
- acuqireQueued()方法
进入到LCH队列中的线程以自旋的方式获取资源,获取成功后将进行出队操作。
final boolean acquireQueued(final Node node, int arg) {
//用来标记是否拿到资源
boolean failed = true;
try {
//标记等待过程中是否被中断
boolean interrupted = false;
//开始获取锁,要么获取成功,要么中断
for (;;) {
//获取当前节点的前驱节点
final Node p = node.predecessor();
//如果前驱节点是head,当前线程尝试获取锁
if (p == head && tryAcquire(arg)) {
//将当前节点设置为虚节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//p为头节点没有获取到锁或者p不是头节点,判断是否需要阻塞当前node(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //调用LockSupport.park()
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
- setHead() && shouldParkAfterFailedAcuqire()
setHead() 将当前节点设置为虚节点。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 通过前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取头节点的状态
int ws = pred.waitStatus;
//头节点处于唤醒状态
if (ws == Node.SIGNAL)
return true;
//头节点处于取消状态
if (ws > 0) {
do {
//循环向前查找节点,如果节点被取消则出队列
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前驱节点等待状态为signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
·
复制代码
上面是独占锁获取资源的方式,接下来说明独占模式释放资源的方式
2.2.1.2 解锁
解锁操作实际上就是将当前线程的状态置0,唤醒下一个线程
- release()
public final boolean release(int arg) {
//自定义的tryRease()如果返回true,说明该锁没有被任何线程持有
if (tryRelease(arg)) {
//获取头节点
Node h = head;
//头节点为空且头节点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
- unparkSuccessor()
如果node的后继节点不为空且不是取消状态,则唤醒这个后继节点;否则从末尾开始寻找合适的节点,如果找到就唤醒。
private void unparkSuccessor(Node node) {
//获取头节点waitStuatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点的下一个节点
Node s = node.next;
//如果如果下一个节点为空,或者下一个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
//从队列尾部开始查找,找到第一个waitStuatus<0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果当前节点的下一个节点不为空,而且状态<=0,就把当前节点unpark()
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
从前往后找的原因:
- 断开节点是先断开的next指针
- 节点入队操作是纷纷为两步node.prev = pred; compareAndSetTail(pred, node),并不是原子操作,如果执行了node.prev = pred后执行了unparkSuccessor方法,就没法从前往后找了。
2.2.2 共享模式
共享模式,允许多个线程同时获取到同步状态,整个过程也是不响应中断的。
- acuqiredShared()方法
共享模式下获取锁,获取成功就直接返回,否则加入等待队列中。
- 首先通过自定义的tryAcquireShared()方法尝试获取资源
- 获取失败执行doAcquireShared()方法,
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
复制代码
- doAcquireShared()
同独占模式一样,将当前线程放入队列尾部,自旋直到获取到锁
private void doAcquireShared(int arg) {
//以共享模式加入到队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
//前驱节点为空,当前线程尝试获取资源
if (p == head) {
int r = tryAcquireShared(arg);
//获取成功
if (r >= 0) {
//将head指向自己,还有有剩余资源可以唤醒自己
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt(); //被中断过,此时响应中断(也就是说并不是立即响应中断操作)
failed = false;
return;
}
}
//获取失败,将当前线程加入等待队列中,进入waiting状态等待被唤醒或打断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
- releaseShared()
共享模式下释放指定数量的资源,并唤醒后其他节点。
在独占模式下state=0时,才会唤醒其他线程,而共享模式下会直接唤醒其他线程。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
复制代码
- doRealseShared()
用于唤醒其他节点,当state>0,获取剩余公共享资源;当state=0,获取共享资源
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
参考文章: