之前把 AQS、Unsafe(LockSupport 其实就是一层简单的封装) 搞完了,接下来就可以看看各种锁、并发工具、线程池了,毕竟他们都有依赖 AQS 或 Unsafe 的实现。
Lock & Condition
Java SDK 并发包通过 Lock 和 Condition 两个接口重新实现一次管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
那为什么要重新实现一次呢,用关键字 synchronized 不就好了吗?
很多程度上是为了有效避免死锁。
原因是 Synchronized 的使用局限性,很难破坏不可抢占条件。
- 不支持阻塞超时;
- 不支持得知是否获取到锁;
- 不支持非阻塞获取;
- 不支持中断;
- 不支持多个条件的协同;举例来说,synchronized 仅支持在锁上的等待通知;而并发包的 Lock 可以允许有多个 Condition,因此支持多个维度的等待通知。
Condition 接口在 JDK 中可以说只有一个实现 ConditionObject
,是 AQS 的内部类。Lock 接口中关于 Condition 的应用都是基于该实现。那关于该类可以参见AQS Condition 源码解析 (juejin.cn),这里就不详细说明了。
着重说一下 Lock。
Lock 接口主要有两个标准实现 Reentrantlock
和 ReentrantReadWriteLock
,这篇和接下来会分析一下这两个类的源码。
其实在 lock 的包下,JDK 还提供了一个乐观读(还有两种状态:悲观读锁、写锁)的实现:
StampedLock
,在读多写少的情况下,StampedLock 性能更优于 ReadWriteLock。但是该类,并没有实现 Lock 的接口,而关键的两点原因是:
StampedLock
没有所有权的概念,简单来说,在当前线程获取的锁,可以在另一线程下被释放;- 悲观读锁、写锁都不支持条件变量。
ReentrantLock 与 AQS 的关联
AQS 定义了关于资源排队、阻塞唤醒等操作;而最关键的资源state
获取、释放以及资源的含义,都由子类实现。对于 AQS 来说,只是一个可以操作的 int 值。
而 ReentrantLock 就是这个子类。ReentrantLock 允许有两种语义:公平锁和非公平锁,但是无论哪种语义,本质上内部都是实现了 AQS,赋予 state 具体的含义,并依次实现资源的获取和释放方法。
isHeldExclusively()//该线程是否正在独占资源。只有用到 condition (AQS.ConditionObject)才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
复制代码
更加具体的可以见 AbstractQueuedSynchronizer(AQS):并发工具的基石 (juejin.cn)
如何保证可见性?
根据 volatile 相关的 happens-before 原则。
Lock 和 Condition 实际是通过 AQS 去实现的。在 AQS 中有一个state 字段是 volatile 的,加锁解锁时都会读写该字段。
根据相关原则:
- 顺序性原则;线程 1 业务操作早于线程 1 lock 的解锁。
- volatile 原则,对该字段的写 happens before 对该字段的读。即实际是线程 1 解锁早于线程 2 加锁。
- 传递原则;线程 1 业务操作早于线程 2 加锁。
那如果明白了 AQS 是个啥玩意,ReentrantLock
其实也就没有什么内容了。
因为关键的方法,其实内部都是调用 AQS 的方法。不过有些东西还是要看一下的,最关键的就是可重入的实现以及公平锁和非公平锁的差异。
公平锁与非公平锁:
锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。
如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;
如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。
可重入锁的实现
ReentrantLock
有个实现了 AQS 的内部类,叫 Sync
,实现了上述 AQS 关于独占模式的方法(因为 ReentrantLock
不支持共享锁啊)。
基于 AQS 中 state == 0 表示资源可用,ReentrantLock
定义 state > 0 就是锁已经被线程持有,其值就是代表重入次数。那可以得出一个线程最大重入次数为: int 的最大值,2147483647。
- 每次重入获取,state + 1;
- 释放 state -1;
- state 减到0,就代表资源可用,等待线程加入争抢或第一个排队线程获取到。
注意:在
ReentrantLock
中,tryAcquire、tryRelease 或类似方法的参数 acquires、release 都是 1,代表一次重入。
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 说明没有线程持有锁, 直接获取到
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果是当前线程持有锁
int nextc = c + acquires;
// 溢出说明重入次数超标了
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 因为是重入,肯定不会有争抢,直接 set 即可
setState(nextc);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 只允许持有线程释放锁
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 等于 0 的时候, 说明完全释放了锁
free = true;
setExclusiveOwnerThread(null);
}
// 同理不会有争抢, 直接set
setState(c);
return free;
}
}
复制代码
公平锁 & 非公平锁
可以看到 Sync
获取只有一个 nonfairTryAcquire
的方法,基于这个方法和 AQS 默认是非公平的获取(获取时先尝试拿资源,拿不到才开始排队),得出Sync
只有非公平锁。
因为 Sync
是一个抽象父类,统一实现了默认的非公平获取和释放,所以公平锁的实现其实就是简单调用了一下。
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
复制代码
所以,还需要一个非公平锁的实现,继承自 Sync
。
static final class FairSync extends Sync {
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 相对非公平实现, 公平实现只多了这个 : !hasQueuedPredecessors()
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
相对非公平实现, 公平实现只多了这个 : !hasQueuedPredecessors()
,关于这个方法具体实现可以看 AQS 的实现。
简单来说,就是遍历一下等待队列,判断出是否有排队的线程。如果有排队线程,则直接退出获取,走向 AQS 的排队了。
ReentrantLock
如何决定是哪种锁呢?
因为内部有两种实现,所以简单的讲,就是内部持有父类 Sync
的实例,将相关锁操作代理给该实例即可。而 ReentrantLock
只需要提供参数或构造函数来让调用者决定是哪种实现。
默认是非公平锁,因为可以提供更好的吞吐量,减少线程阻塞、唤醒的次数。
主要是服务刚释放锁时来争抢的那批线程。
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
}
复制代码
锁的公有方法直接代理给Sync
获取
void lock() {sync.acquire(1);}
void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}
public boolean tryLock() {return sync.nonfairTryAcquire(1);}
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
复制代码
不同于其他几种方法(直接调用 AQS 的公开方法,由 Sync 的实现决定是公平锁还是非公平锁),tryLock
是直接调用内部非公平锁的实现nonfairTryAcquire
。
因为 tryLock 既不能响应中断,也不会超时,所以公平锁的话,容易造成等待时间过长。
释放
public void unlock() {sync.release(1);}
复制代码
Condition
上面说过,Condition 只有一种 AQS 内部类的实现,所以获取 Condition 的方法也可以直接代理给 Sync
。
统计公有方法直接代理给Sync
由于赋予了 state 具体含义(重入次数),所以相关判断、统计方法会优先基于该字段判断或获取。
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
复制代码
readObject
的实现决定了 Reentrantlock
的反序列化结果一定是无锁状态,虽然不知道有啥子用。
总结
以前觉得Reentrantlock
挺复杂,是因为复杂内容都在 AQS。如果把 AQS 搞明白了,那Reentrantlock
就很直白了。