这是我参与更文挑战的第 3 天,活动详情查看: 更文挑战
前面我们对 AQS
、Lock
和 Condition
进行了学习,我们知道 Java
并发编程中的锁机制都是基于 AQS
框架而来,那么今天我们就来学习 ReentrantLock
锁。
1. ReentrantLock 是什么?
ReentrantLock
是个可重入的互斥锁 Lock
,它具有与使用 synchronized
方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock
将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁没有被另一个线程所拥有时,调用 lock
的线程将成功获取该锁定并返回。如果当前线程已经拥有该锁定,此方法将立即返回。可以使用 isHeldByCurrentThread()
和 getHoldCount()
方法来检查此情况是否发生。
可重入性:从名字上理解,
ReentrantLock
的字面意思就是再进入的锁,其实synchronized
关键字所使用的锁也是可重入的,两者关于这个的区别不大。两者都是同一个线程没进入一次,锁的计数器都自增1
,所以要等到锁的计数器下降为0
时才能释放锁。
ReentrantLock
的组织结构:
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
}
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
}
复制代码
从中可以看出:
ReentrantLock
实现了Lock
接口。ReentrantLock
与sync
是组合关系。ReentrantLock
中,包含了Sync
对象;而且,Sync
是AQS
的子类;更重要的是,Sync
有两个子类FairSync
(公平锁)和NonFairSync
(非公平锁)。ReentrantLock
是一个独占锁,至于它到底是公平锁还是非公平锁,就取决于sync
对象是FairSync
的实例,还是NonFairSync
的实例。
2. ReentrantLock 源码解析
既然 ReentrantLock
作为锁,还是看它的基本构成:
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
....此处省略N多代码
复制代码
很简单,内部就包含一个 Sync
类对象。Sync
类又未何方神圣?
2.1 Sync 类分析
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes this lock instance from a stream.
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
复制代码
从类的注释中可以知道,Sync
继承 AQS
类,并且是 ReentrantLock
类同步的基础,它有fair
和 nonfair
两个子类。通过 AQS
类的 state
状态 number
值来记录是否持有锁。基本组成:
abstract void lock()
boolean nonfairTryAcquire(int acquires)
boolean tryRelease()
boolean isHeldExclusively()
Thread getOwner()
getHoldCount()
在 Sync
类中声明了抽象方法 lock
供子类来实现。Sync
类中实现了锁的获取、释放、是否持有等方法。
2.1.1 NonfairSync 非公平锁
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
复制代码
在非公平锁中继承 Sync
并实现了 lock
方法。首先通过 compareAndSetState()
方法获取 state
状态,如果成功,则设置 setExclusiveOwnerThread
独占锁的线程。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码
可以看到 CAS
方法本质是调用 Unsafe
类的 compareAndSwapInt
方法,compareAndSwapInt
的实现如下:
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
复制代码
可以看到, 不是用 Java
实现的, 而是通过 JNI
调用操作系统的原生程序。那么看一下它的 JNI
实现:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
复制代码
可以看到实际上调用 Atomic
类的 cmpxchg
方法。 Atomic
的 cmpxchg
这个类的实现是跟操作系统有关, 跟 CPU
架构也有关, 如果是 windows下x86
的架构
实现在 hotspot\src\os_cpu\windows_x86\vm\目录
的 atomic_windows_x86.inline.hpp
文件里 。
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}
复制代码
在这里可以看到是用嵌入的汇编实现的, 关键 CPU
指令是 cmpxchg
到这里没法再往下找代码了. 也就是说 CAS
的原子性实际上是 CPU
实现的. 其实在这一点上还是有排他锁的. 只是比起用 synchronized
, 这里的排他时间要短的多. 所以在多线程情况下性能会比较好。
第二步,如果通过 CAS
操作失败,则主动通过 acquire
方法获取锁。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
在 acqurie()
方法中,首先通过 tryAcquire()
方法进行快速申请判断。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
这里我们可以看到在 acquire
方法中通过 tryAcquire()
进行快速申请锁,我们可以看到在 tryAcquire
内部是调用的 Sync
类的 nonfairTryAcquire()
方法。nonfairTryAcquire
的逻辑如下:
- 首先通过
getState()
获取同步器状态值,如果状态值为0
,即当前锁是空闲,则设置为当前线程持有锁; - 如果当前线程已经具备锁,则将当前
state
进行+acquires
。
2.1.2 公平锁 FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
在公平锁的 lock
方法实现中,同样可以看到直接通过 acquire(1)
方法获取锁的状态,最终通过 CAS
方法保证锁的状态。
ReentrantLock
支持公平所和非公平所,同时内部定义了 Sync
类实例,只需要根据构造方法中传入的要求创建对应的实例即可。
2.1.3 总结
公平锁和非公平锁体现在申请 Lock
时,公平锁直接通过 acquire
方法进行锁的申请,acquire
方法的底层调用是 tryAcquire
方法,在 tryAcquire
方法中如果锁是空闲则返回 true
立即获取到锁,如果当前线程已经持有锁,则锁计数器+1
,其它情况返回 false
。接着通过 acquireQueued
方法循环,申请不到添加到等待 CLH
队列中。在非公平锁中,先判断当前锁是否空闲,如果空闲则不管等待队列中的线程,直接申请到。反之如果被占有,则调用 acquire
方法。
2.2 ReentrantLock 方法解析
对 ReentrantLock
的其它方法进行分析。
2.2.1 lock()
public void lock() {
sync.lock();
}
复制代码
如果锁没有被其它线程持有,立即获取到锁,同时将锁的持有 count+1
。如果当前线程已经过去到锁,则将 count+1
并立即返回。如果锁被其它线程持有,则当前线程在获取锁之前不可用。
2.2.2 lockInterruptibly()
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
复制代码
获取锁,有以下场景:
- 如果锁没有被其它线程获取,则立即获取锁并返回,并且在锁计数器上+1
- 如果当前线程已经持有锁,则立即返回并在锁计数器上+1
- 如果锁被另外一个线程占用,则当前线程不可用直到发生以下两个事情:①当前线程主动获取到锁;②其它线程调用当前线程的interrupts方法。
2.2.3 tryLock()
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
复制代码
尝试获取锁,如果锁没有被其它线程持有,则立即获取到并返回 true
并且锁计数器 +1
,反之返回 false
。尽管如果 lock
被设置为公平锁,但是调用 tryLock
就会立即申请到锁(如果锁可用)。不管别的线程是否在等待状态。
2.2.4 unlock()
public void unlock() {
sync.release(1);
}
复制代码
尝试释放锁,如果当前线程持有锁,则锁计数器减少 -1
。如果当前计数器为 0
,就释放锁。如果当前线程没有持有锁,则抛出 IllegalMonitorStateException
异常。
2.2.5 getHoldCount()
public int getHoldCount() {
return sync.getHoldCount();
}
复制代码
获取锁在“持有”的线程上的锁计数器个数。
2.2.6 isHeldByCurrentThread()
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
复制代码
当前线程是否持有该锁。
2.3 ReentrantLock 与 Synchronized 的区别
ReenTrantLock
可以指定是公平锁还是非公平锁。而synchronized
只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。ReenTrantLock
提供了一个Condition
(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized
要么随机唤醒一个线程要么唤醒全部线程。ReenTrantLock
提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()
来实现这个机制。ReentrantLock
与synchronized
相比,ReentrantLock
提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候。
参考阅读