本文重点
前情提要
之前文章中写到了 JDK 中 synchronized
关键字可以实现同步锁,并且详细分析了底层的实现原理。
虽然 synchronized
在性能上不再被人诟病,但是在实际使用中仍然缺乏一定的灵活性。
比如在一些场景中需要去尝试获取锁,如果失败则不再进行等待,又或者设置一定的等待时间,超时后就放弃等待。
正文开始,ReentrantLock 介绍
java.util.concurrent.locks
包提供了多种锁机制的实现,本文暂且以 ReentrantLock
为例,探究 Java 是如何实现同步锁的。
ReentrantLock
,先看名字,reentrant
——可重入的,表示持锁线程可以多次加锁。
使用案例如下:
ReentrantLock lock = new ReentrantLock(true);
lock.lock();
try {
//do something
} finally {
lock.unlock();
}
复制代码
AQS
ReentrantLock
底层是由 AQS 框架实现的,并且 java.util.concurrent.locks
提供的多种同步锁都是 AQS 的子类。
AQS,先看名字,AbstractQueuedSynchronizer
,抽象的队列式同步器。AQS 是 Java 中同步锁实现的基石,由 Java 之神 Doug Lea
操刀。
AbstractQueuedSynchronizer
的核心思想是提供了一个同步队列,将未获取到锁的线程阻塞排队。
提供的关键方法如下,此处用到了模板模式,方法的具体实现由子类来提供。
// 获取锁
public final void acquire(int arg) {
}
// 释放锁
public final boolean release(int arg) {
}
// 尝试获取锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
复制代码
AQS 中的队列由 Node 组成的双端列表实现。
static final class Node {
// 标识共享锁
static final Node SHARED = new Node();
//标识独占锁
static final Node EXCLUSIVE = null;
// 已取消
static final int CANCELLED = 1;
//后继节点在等待当前节点唤醒
static final int SIGNAL = -1;
//等待在 condition 上
static final int CONDITION = -2;
//共享模式下,唤醒后继节点
static final int PROPAGATE = -3;
//状态,默认 0
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
复制代码
/**
// 头结点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 同步状态
private volatile int state;
复制代码
ReetrantLock 原理
1. 创建公平锁
ReentrantLock lock = new ReentrantLock(true)
,参数 true 标识创建的是公平锁。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
复制代码
2、执行 lock()
final void lock() {
acquire(1);
}
复制代码
acquire()
由 AQS 提供,并且不可重写,源码如下。
public final void acquire(int arg) {
// 1. 尝试获取锁
if (!tryAcquire(arg) &&
// 2. 如果未获取到, 则加入队列
//addWaiter,创建节点,并设置下一节点为null,设置为尾结点
//acquireQueued,将当前节点加入队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//设置线程中断标志
selfInterrupt();
}
复制代码
- 尝试获取锁,tryAccquire() 由子类根据自己的策略实现
- 如果获取失败,则执行 addWaiter(),将当前线程封装成一个 Node, 设置为尾结点
- 执行 acquireQueued(),将当前节点加入到同步队列
tryAccquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state = 0 代表没有线程占用
if (c == 0) {
// hasQueuedPredecessors 返回是否需要排队,需要排队则返回 true
if (!hasQueuedPredecessors() &&
//不需要排队,则 CAS 设置 state = 1
compareAndSetState(0, acquires)) {
// 设置成功,则抢到锁,设置持锁线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果被是当前线程占用,则记录重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
1、公平锁实现的 tryAcquire,首先获取同步状态 state,不为 0 代表有线程持锁,则判断是否是当前线程,如果是则 state + 1, 因此是可重入的。
2、如果 state = 1,则检查是否需要排队 hasQueuedPredecessors,如果需要排队则不继续判断,走加入队列的逻辑。
3、如果不需要排队,通过 CAS 设置同步状态 state = 1, 设置失败则去排队,设置成功则设置持锁线程为当前线程。
addWaiter()
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 将当前节点设置为尾节点
Node pred = tail;
if (pred != null) {
//设置节点上一节点为尾结点
node.prev = pred;
//CAS 操作,将尾结点设置为当前节点
if (compareAndSetTail(pred, node)) {
//设置之前尾结点下一节点,指向当前尾结点, 返回尾结点
pred.next = node;
return node;
}
}
//如果尾结点是null, 或者 CAS 操作失败,进行自旋 enq() 加入尾结点
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//尾节点是null,则 CAS 初始化队列的头节点 head,头结点是空节点, tail = head
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 设置当前节点前一节点为 tail
node.prev = t;
// CAS 操作设置当前节点为 尾结点
if (compareAndSetTail(t, node)) {
// 设置之前尾结点指向当前节点
t.next = node;
return t;
}
}
}
}
复制代码
1、需要排队的,先将线程封装成一个 Node,如果尾结点为空,则说明队列还未初始化,则通过 enq(node) 进行初始化操作。
2、如果尾结点不为空,则通过 CAS 尝试将当前节点设置为尾结点,返回当前节点。
3、如果 CAS 执行失败,则通过 enq(node) 进行自旋尝试加入队尾。
acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// p 是前一节点
final Node p = node.predecessor();
// 如果 p 是头节点, 再挣扎一次尝试获取锁
if (p == head && tryAcquire(arg)) {
//获取成功,设置当前节点为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // 返回false, 获取锁成功,没有进入队列
}
// 检查是否需要线程阻塞等待,如果前一个不是待唤醒,则自旋
// 如果前一个待唤醒,则当前线程也阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前一个节点等待被唤醒
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
// 前一节点已被取消,跳过,再向前找节点
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 默认值是0,设置为待唤醒
// CAS 设置前一节点等待唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
private final boolean parkAndCheckInterrupt() {
// 阻塞线程
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
1、通过 acquireQueued 将当前节点加入队尾,并设置阻塞。自旋,判断如果当前节点的前驱节点。是头结点(head 节点不排队,只记录状态,head 的后驱节点才是真正第一个排队的),则再次尝试 tryAcquire() 获取锁。
2、可以看到自旋的跳出条件是当前节点是队列中第一个,并且获取锁。
3、如果一直自旋,则会消耗 CPU 资源,因此使用 shouldParkAfterFailedAcquire 判断是否需要将当前线程阻塞,如果是则通过 parkAndCheckInterrupt 阻塞线程的运行。
4、LockSupport.park() 是通过 native 方法 UNSAFE.park() 实现的线程阻塞。
3、执行 unlock()
释放锁的逻辑相对比较简单。
public void unlock() {
sync.release(1);
}
复制代码
AQS 的 release()
方法。
public final boolean release(int arg) {
//释放锁
if (tryRelease(arg)) {
Node h = head;
//队列元素需要唤醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
tryRelease()
protected final boolean tryRelease(int releases) {
//多次重入需要多次释放
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//等于0代表释放,设置锁持有线程为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//设置状态为0
setState(c);
return free;
}
复制代码
unparkSuccessor()
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// CAS 操作设置等待状态为 0-初始态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//取下一节点
Node s = node.next;
//如果下一节点状态为取消,则从尾结点开始循环,找到最前面未取消的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// unpark 唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
1、unlock() 方法调用了 AQS 的 release()方法,执行 tryRelease() 释放锁。
2、tryRelease() 由子类实现,reentrantLock 释放逻辑是将 state – 1, 直到 state = 0 才认为是释放完毕,多次重入需要多次释放。
3、通过 AQS 的 unparkSuccessor() 唤醒下一个等待的线程。
4、使用 LockSupport.unpark(s.thread) 唤醒线程。
公平与非公平
实际上公平与非公平的区别如下:
1、公平锁的流程是,当前有线程持锁,则新来的线程乖乖去排队。
2、非公平锁的流程是,新来的线程先去抢一把试试,没抢到再去排队。
非公平锁的加锁逻辑如下。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
复制代码
总结 AQS 的核心
AQS 中使用了几个核心的操作来进行同步锁的控制。
总结 ReentrantLock 流程
这还没完,想要彻底搞定每一步的细节,还得去翻看源码,细细品味。
回复 AQS 关键字,可获取思维导图和执行流程图。
关注一下,不迷路,老司机不定期发车。