AbstractQueuedSynchronizer(AQS)源码
上一篇我们对ReentrantLock到AQS源码有了一个大致的了解!并且讲解了公平锁与非公平锁的区别以及重入锁的实现原理,这一篇主要就是针对AQS的源码包括独占锁,共享锁,释放锁等的源码以及实现原理就行总结;
AQS的整体结构
作为JUC包下的基石,AQS的重要性不言而喻,上一篇文章中也说明了AQS的重要性,这里我们就长话短说;短话不说,下面正式开始我们源码之旅。
在上一篇文章中我们说到公平锁在获取锁的时候有一个方法hasQueuedPredecessors()
这个方法的目的是判断当前线程是否需要排队;那涉及到排队肯定离开队列;那这个队列长什么样子?那这个队列是单向队列还是双向队列?我们抱着这些疑问向下看:
AQS中的成员变量:
/**
* 头节点
*/
private transient volatile Node head;
/**
* 尾节点
*/
private transient volatile Node tail;
/**
* 同步状态 0:自由状态,大于0:持有锁
*/
private volatile int state;
//获取state状态
protected final int getState() {
return state;
}
//设置state状态
protected final void setState(int newState) {
state = newState;
}
复制代码
AQS中的成员变量只有3个,同步队列的头节点,同步队列的尾节点以及同步状态,注意这3个成员变量都是由volatile
关键字修饰的!保证了多个线程对它的修改都是线程可见的!
我们大致的想象一些同步队列的实现,每一个Node都可以看做一个线程,如下:
下面是队列中每个节点的结构:
static final class Node {
/** 共享节点的标识(共享锁的标识) */
static final Node SHARED = new Node();
/** 独占节点的标识(独占锁的标识) */
static final Node EXCLUSIVE = null;
/** waitStatus的值,指示线程已取消 */
static final int CANCELLED = 1;
/** waitStatus的值:-1 表示后继节点处于等待的状态,-1这个值是后继节点帮当前节点设置的,意思是当前节点如果释放了锁之后同时要把它的后继节点叫醒;对于后继节点来说只有它上一个节点的状态是-1时我才能安心的睡眠等待,当它前面一个节点释放锁的时候要把我叫醒,然后让我去尝试获取锁*/
static final int SIGNAL = -1;
/** waitStatus的值: 表示当前结点在条件队列中排队,在等待某个条件 */
static final int CONDITION = -2;
/**
* 在共享模式下,无条件传播releaseShared状态,早期的JDK版本中并没有这个概念,引入这个状态是为了解决共享锁并发释放引起线程挂起的bug6801020(都是在不断的完善,在完善的过程中代码也越来越难懂)
*/
static final int PROPAGATE = -3;
//取值为上面的1、-1、-2、-3 或者 0
volatile int waitStatus;
//上一个节点
volatile Node prev;
//下一个节点
volatile Node next;
//当前线程
volatile Thread thread;
//连接到等待条件的下一个节点
Node nextWaiter;
复制代码
Node属于AbstractQueuedSynchronizer(AQS)的内部类。Node有很多属性例如前继节点,后继节点,当前线程,以及每个线程(节点)的状态信息。(看过数据结构中队列的实现方式应该对这个不陌生,这个结构其实就是数据结构中的链表,只是多了几个变量)。
看到这里我们就可以对上面的图进行完善一下:
上面我们用图大致的完善了一下同步队列中细节;AQS中的成员变量head指向同步队列中的头节点(哨兵节点),成员变量tail指向同步队列的尾节点;每一个Node中维护了当前线程Thread
,pre
指向上一个节点Node,next
指向下一个Node节点,waitStatus
表示当前节点的状态信息,重点关注一下-1
这个状态值;
上面我们说了Node是AbstractQueuedSynchronizer(AQS)中的一个内部类,下面我们再用类图来表示一下AbstractQueuedSynchronizer(AQS)和Node之间的关系:
我们可以简单的画成这个样子,AQS里面其实还维护了一个ConditionObject对象;这个是条件变量等待队列,这篇文章我们不涉及就没有画出来!
独占模式锁的获取
到这里AQS的大致结构就介绍到这里!下面我们重点讲解一下独占锁的获取共享锁的获取以及锁的释放。
我们一般使用锁的时候代码如下:
private Lock lock = new ReentrantLock();
public void method() {
try {
lock.lock(); //获得锁
System.out.println("执行一些操作。。。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//释放锁
}
}
复制代码
在lock方法中会调用AQS中的acquire()
方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
tryAcquire(arg)
方法主要就是去尝试获取锁,这个方法的细节都是由子类去实现;这里可以结合我的上一篇ReentrantLock到AQS源码文章中会讲解ReentrantLock的具体实现细节!这里我们先知道这个方法如果返回true就是表示获取锁成功,返回false表示获取锁失败!下面分为两种情况:
tryAcquire(arg)
方法返回true,而!tryAcquire(arg)
就是false,这个方法就会直接返回表示获取表成功!因为第一个条件是false,所以acquireQueued()
方法addWaiter()
方法就不会执行!- 当
tryAcquire(arg)
方法返回false,也就是获取锁失败了,相反!tryAcquire(arg)
方法就是true,代码继续向下执行,首先执行addWaiter(Node.EXCLUSIVE), arg)
方法,等待addWaiter(Node.EXCLUSIVE), arg)
方法返回结果继续执行acquireQueued()
方法!
下面我们我们假设获取锁失败之后执addWaiter(Node.EXCLUSIVE), arg)
中做了什么事?addWaiter(Node.EXCLUSIVE), arg)
方法中的参数Node.EXCLUSIVE
表示独占锁的意思;
private Node addWaiter(Node mode) {
//1 当前线程封装成node节点
Node node = new Node(Thread.currentThread(), mode);
//2 将尾节点赋值到变量perd
Node pred = tail;
//3 判断tail是否为空,其实这块代码用文字不是很好讲解,下面我们用图形的方式
if (pred != null) {
//4 将pred也就是当前队列最后一个节点赋值给node节点的上一个节点
node.prev = pred;
//5 将node节点设置为tail(最后一个节点)
if (compareAndSetTail(pred, node)) {
//6 并把上一个节点的next指针指向自己(node)
pred.next = node;
//7 将node返回出去
return node;
}
}
//8 我们大致可以猜到这的方法里有两个功能点:
// 8.1:当队列为空时,初始化队列
// 8.2: if (compareAndSetTail(pred, node))为false时将node节点设置为尾节点
enq(node);
return node;
}
复制代码
这里我们着重说一下3处的代码,判断tail是否为空,其实就是判断队尾是否有节点,如果tail等于null说明队列还没有进行初始化,就会执行enq()
方法;执行enq()
方法进行初始化队列我们后面会说到,这里我们只考虑队列中有元素的时候:
那当执行完4,5,6这三行代码的时候,队列就变成了下面这个样子:
到这里相信你以及对1,2,3,4,5,6,7,这几行代码没有疑问了吧,如果你还有疑问说明我这篇文章写的太失败了;
当tail真的为空的时候说明队列还没有初始化,那么就会执行enq()
方法进行初始化队列!初始化队列只是enq()
方法承担的任务之一;enq()
方法还有一个任务就是当if (compareAndSetTail(pred, node))
为false时将node节点设置为尾节点;也就是上面代码5处CAS失败了之后继续将node节点设置为尾节点;我们了解到的CAS失败之后会进行自旋;但是目前我们没有看到自旋,只是执行了enq()
方法,那难道自旋在enq()
方法中;那我们进去看看和我们猜的对不对;
private Node enq(final Node node) {
for (;;) {
//1.获取同步队列的尾节点
Node t = tail;
//2.如果尾节点为空说明队列还没有初始化
if (t == null) {
//3.进行初始化, 构造头结点
if (compareAndSetHead(new Node()))
//4.将尾节点tail也指向刚构造好的head
tail = head;
} else {
// 5 将node节点的prev指针指向队列的尾节点
node.prev = t;
// 6 将node节点设置为尾节点
if (compareAndSetTail(t, node)) {
// 7 将之前的尾节点的next指针指向node节点(也就是现在的尾节点)
t.next = node;
return t;
}
}
}
}
复制代码
果然 不出我们所料在enq()
方法中首先就进行了for(;;)
死循环;首先获取同步队列中的尾节点,如果尾节点为空说明队列还没有进行初始化,那么就会执行3处的代码进行初始化操作;当执行完1,2,3,4处的代码之后节点的状态是这样的:
这里我们注意到compareAndSetHead(new Node())
进行CAS设置head节点的时候没有用到参数中传入的node节点而是新new Node();
当作头节点!这也是队列设计的巧妙之处,借助哨兵节点,借助哨兵节点和不借助哨兵节点相比,借助哨兵节点会在入队和出队的操作中获得更大的便捷性,这样我们就不会关心它是不是头节点,不会单独为头节点做一下额外的判断操作,因此同步队列利用哨兵节点的作为链式存储结构!
注意到这里执行完1,2,3,4步的代码之后并没有退出循环,而是继续循环,这时候头节点已经构造完成,继续循环的目的就是在头节点后面构造node节点;那执行完5,6,7步的代码之后节点变成下面这个样子:
到这里我们假设的是队列没有初始化的场景,我们上面还说到一种场景就是当if (compareAndSetTail(pred, node))
为false时,这个时候会直接执行5,6,7处的代码逻辑!这3段代码的逻辑和上面addWaiter()
方法中4,5,6处的逻辑是一样的,这里就不做过多的阐述!我们总结一下enq()
方法的作用:
- 当
if (compareAndSetTail(pred, node))
为false时,CAS(自旋)不断尝试将node节点插入到尾节点直至成功为止; - 调用compareAndSetHead(new Node())方法,完成链式同步队列头结点的初始化
到这里我们还没有涉及到waitStatus
这个状态值,可以说到这里node节点只是入队目前还没有真正的将自己挂起;它还有一次尝试获取锁的机会!下面会说到;
我们看到enq()
方法是有返回值的但是在这里并没有用到这个返回值!addWaiter()
方法是将当前node节点返回了出去;之后就紧接着执行了acquireQueued()
方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1 获取给定节点的前继节点
final Node p = node.predecessor();
//2 注释太多,放下面了,见下面 对2处的代码进行解释
if (p == head && tryAcquire(arg)) {
//3
//3.1获取锁成功,将node(当前节点)设置为头节点;head = node;
//3.2将当前线程置为空; node.thread = null;
//3.3将当前节点的前继节点设置为空; node.prev = null;
setHead(node);
//4,为了帮助垃圾收集, 将上一个head结点的后继清空
p.next = null;
//设置获取成功状态
failed = false;
//返回中断的状态, 整个循环执行到这里才是出口
return interrupted;
}
//5 判断是否真的需要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
对2处的代码进行解释:
if (p == head && tryAcquire(arg))
判断前继节点是不是头节点,如果前继节点是头节点,那么当前节点就是队列中第一个节点(不算头节点,为啥不算头结点呢?头结点目前是持有锁的节点,正在执行业务逻辑,但是不知道什么时候结束;例如你去排队买火车票,你前面就有一个人,而这个人正在和业务员小姐姐交谈买票中ing..,那你是不是第一个排队买票的人;这个时候你可能在低头玩手机,那你是不是要时常抬抬头看看你前面那个人买好了没);所以说如果当前结点是同步队列的第一个结点, 就尝试去获取锁;
在刚进入这个方法的时候节点的状态可能是这样的:
T1节点的上一个节点是head节点,T1节点就是第一排队的节点;然后T1节点就执行tryAcquire(arg)
方法去尝试获取锁;假如获取锁成功;执行完2,3,4步代码之后:
这种状态算是理想状态,当前节点的前继节点是头节点并且尝试获取锁成功;但是如果当前节点的前继节点不是头节点又或者尝试获取锁失败;这个时候就会执行5处的代码,首先看一下:shouldParkAfterFailedAcquire()
方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//1 获取前继节点的waitStatus状态值
int ws = pred.waitStatus;
//2 判断waitStatus状态值是不是-1,如果前继节点的状态值是SIGNAL(-1)表明前继节点会唤醒当前节点,当前节点就可以安心的挂起了
if (ws == Node.SIGNAL)
//3 如果前驱节点的状态已经是SIGNAL,就直接返回true,接下来就会直接去执行parkAndCheckInterrupt()将线程挂起,因为当前节点的前继节点还没有释放锁
return true;
//4 如果waitStatus>0就有1这个值了,1:取消获取锁
if (ws > 0) {
//5 将队列中的waitStatus状态值>0(已取消状态)的前继节点清除掉
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//6,到这里 waitStataus的状态值有 0,-2,-3这些选择,上面我们说到 -2是条件队列中的状态值这里不涉及,-3是共享状态下的状态值这里是独占模式所以也不涉及;所以只有0这个状态了;也就是无状态(同样0也是默认值);然后将前继节点设置为-1,这样当前节点才会安心挂起;
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
在这里就涉及到waitStatus
这个状态值,这里首先拿到当前节点的前继节点的waitStatus
的状态值,然后判断是不是-1
,在上面我们对waitStatus
这个值进行了讲解,我们这里再着重回顾一下SIGNAL
这个值,SIGNAL
这个状态不是表示当前节点的状态而是表示当前节点前继节点的状态,当一个节点的状态被设置为SIGNAL
的时候,就说明它的下一个节点已经被挂起了(或者马上就要挂起了);如果某个节点释放了锁或者放弃获取锁的时候,如果它的状态是SIGNAL
,那它还要完成一个额外的操作—唤醒它的后继节点;
6处的代码是将前继节点的waitStataus
的状态值设计为-1
,如果CAS失败将返回false,然后到acquireQueued()
方法中继续死循环的设置直至成功;
当shouldParkAfterFailedAcquire
方法返回true时就紧接着执行parkAndCheckInterrupt()
方法:
private final boolean parkAndCheckInterrupt() {
//挂起当前线程
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
LockSupport
方法是一个工具类,真正提供park()
方法的是sun.misc.Unsafe
方法;
这里我们总结一下acquireQueued()
方法做的事情:
- 如果当前节点的前驱节点是头节点,并且尝试获取锁成功,该方法执行结束退出;意思就是在给当前线程一次获取锁的机会
- 如果上面的条件不能满足并且尝试获取锁失败就是执行挂起操作;首先先将前驱节点的waitStatus的状态设置为-1,然后执行park()方法将当前线程挂起
独占模式锁释放锁
一般我们使用ReentrantLock
的时候释放锁调用lock.unlock
方法,跟踪这个方法最后调用的是release()
方法:
public final boolean release(int arg) {
//释放锁,具体由子类实现
if (tryRelease(arg)) {
//获取head节点
Node h = head;
//如果头节点不等于空并且waitStatus不等于0;一般情况都是-1
if (h != null && h.waitStatus != 0)
//这里就是上面说的要去唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
private void unparkSuccessor(Node node) {
//获取头节点的waitStatus的状态值
int ws = node.waitStatus;
//如果小于0将头结点的状态更新为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取头结点下一个节点
Node s = node.next;
//如果后继结点为空或者等待状态为取消状态
if (s == null || s.waitStatus > 0) {
s = null;
//从后向前遍历队列找到第一个不是有效的结点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒给定结点后面首个不是取消状态的结点
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
释放锁的代码比较简单!到这里独占锁的获取以及释放锁就到这里结束了!
小结
获取锁
释放锁
到这里比较难啃的骨头终于啃完了,如果你看到了这里,其实也没有觉得特别难是吧,如果你这么觉得,说明我这篇文章写的还不错是吧,那就点个赞呗哈哈。。。今天又熬夜了。。。写文章不易;对于源码很多不是看不懂更多的是畏惧;只要多下点功夫利用好碎片化的时间;只要努力就能做到的事情就是最简单的事情!AbstractQueuedSynchronizer(AQS)整个类你看着很长,去掉作者的注释,然后再去掉重复的代码其实剩下的真的不多了!相比Spring的源码JUC包下的源码会容易很多;只是Spring的源码关联性很强;大家也可以去看看我的Spring源码系列;后面还会继续更新Spring源码相关内容;日程在并发编程之后;
共享模式锁的获取
共享模式其实和独占模式大同小异,同独占锁的重复代码很多,所以没有打算另写一篇文章去讲,下面我们就简单看一下共享模式;当我们使用ReentrantReadWriteLock
读写分离锁的时候我们调用ReadLock()
或者调用WirteLock()
方法的时候,它的内部都是在调用:
public final void acquireShared(int arg) {
//1 尝试获取锁,但是同独占锁返回值不同;
if (tryAcquireShared(arg) < 0)
//2 如果获取锁失败就执行这个方法
doAcquireShared(arg);
}
复制代码
我们看到if
语句中tryAcquireShared(arg)
的返回值同独占锁不同,这里AQS规定:
- 负值:表示获取锁失败
- 零值:表示当前节点获取成功,但是后继节点不能再获取了
- 正值:表示当前节点获取成功,并且后继节点同样可以获取
子类在实现tryAcquireShared
方法获取锁的逻辑时,返回值需要遵守这个约定;
当获取锁失败就执行doAcquireShared()
方法:
private void doAcquireShared(int arg) {
//1,这里的addWaiter方法和独占锁的方法一样的;这里就不多说了
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1 得到前驱节点
final Node p = node.predecessor();
//2 判断是不是头节点
if (p == head) {
//3 如果是头结点再次尝试去获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//4 表示获取锁成功,将锁的状态信息传播给后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
//5 如果在 线程阻塞期间收到中断请求, 就在这一步响应该请求
//关于中断会在后面文章中单独说到;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//6 获取锁失败判断自己是否需要挂起,如果需要挂起就会在parkAndCheckInterrupt方法中挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
进入doAcquireShared方法首先是调用addWaiter方法将当前线程包装成结点放到同步队列尾部。这个添加结点的过程我们在讲独占模式时讲过,这里就不再讲了。
结点进入同步队列后,如果它发现在它前面的结点就是head结点,因为head结点的线程已经获取锁正在处理过程中,那么下一个获取锁的结点就轮到自己了,所以当前结点先不会将自己挂起,而是再一次去尝试获取锁,如果前面那人刚好释放了锁,那么当前结点就能成功获得锁,如果前面那人还没有释放锁,那么就会调用shouldParkAfterFailedAcquire方法,在这个方法里面会将head结点的状态改为SIGNAL,只有保证前面结点的状态为SIGNAL,当前结点才能放心的将自己挂起,所有线程都会在parkAndCheckInterrupt方法里面被挂起。这些也是和独占锁是一样的逻辑!
如果当前节点成功获取了锁,那么就会调用setHeadAndPropagate();
方法:
private void setHeadAndPropagate(Node node, int propagate) {
//记录一下旧的头节点
Node h = head;
//将当前节点设置为头节点
setHead(node);
//propagate大于0表示锁是获取成功的,然后再检查一下旧头的状态信息
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//获取给定节点的后继节点
Node s = node.next;
//如果当前节点的状态为空或者是共享状态
if (s == null || s.isShared())
//唤醒后继节点
doReleaseShared();
}
}
复制代码
这个方法的主要目的是为了唤醒后继节点,这个方法是在独占锁模式下是没有的;在独占模式这里只是将自己设置为头节点等操作;
private void doReleaseShared() {
for (;;) {
//获取头结点
Node h = head;
if (h != null && h != tail) {
//获取head结点的等待状态
int ws = h.waitStatus;
//如果head结点的状态为SIGNAL, 表明后面有人在排队
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
//直到CAS成功
continue;
//再去唤醒后继结点;在独占锁的时候有说明,这里就不多说了;
unparkSuccessor(h);
}
//如果head结点的状态为0, 表明此时后面没人在排队, 就只是将head状态修改为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//直到CAS成功
continue;
}
if (h == head)
break;
}
}
复制代码
这个时候头结点已经在setHeadAndPropagate()
方法改变了,改变成了当前节点!在调用unparkSuccessor(h)
方法必须要满足的条件是:头结点不能为空并且头结点不能等于尾节点;也就类似下图这个状态,并且头节点的waitStatus
等于SIGNAL
;等于SIGNAL
说明后面是有节点的;有节点才能去唤醒节点;
这个方法同时也是共享锁释放锁的方法;共享锁的释放我们就不再阐述了;到这里我们把AQS独占锁的获取,独占锁的释放以及共享锁的获取和共享锁的释放都讲解了一篇;这篇文章也写了好几天;真的希望能帮到你,如果你能看到这里说明你还认可我的文章,不然你可能早就不看了;希望能点个赞;