Java 并发编程 —— ReentrantLock

这是我参与更文挑战的第 3 天,活动详情查看: 更文挑战

前面我们对 AQSLockCondition 进行了学习,我们知道 Java 并发编程中的锁机制都是基于 AQS 框架而来,那么今天我们就来学习 ReentrantLock 锁。

1. ReentrantLock 是什么?

ReentrantLock 是个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。如果当前线程已经拥有该锁定,此方法将立即返回。可以使用 isHeldByCurrentThread()getHoldCount() 方法来检查此情况是否发生。

可重入性:从名字上理解,ReentrantLock 的字面意思就是再进入的锁,其实 synchronized 关键字所使用的锁也是可重入的,两者关于这个的区别不大。两者都是同一个线程没进入一次,锁的计数器都自增 1,所以要等到锁的计数器下降为 0 时才能释放锁。

ReentrantLock 的组织结构:

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        
    }

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
       
    }

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
       
    }

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
}
复制代码

从中可以看出:

  • ReentrantLock 实现了 Lock 接口。
  • ReentrantLocksync 是组合关系。ReentrantLock 中,包含了 Sync 对象;而且,SyncAQS 的子类;更重要的是,Sync 有两个子类 FairSync(公平锁)和 NonFairSync(非公平锁)。ReentrantLock 是一个独占锁,至于它到底是公平锁还是非公平锁,就取决于 sync 对象是 FairSync 的实例,还是 NonFairSync 的实例。

2. ReentrantLock 源码解析

既然 ReentrantLock 作为锁,还是看它的基本构成:

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;
    ....此处省略N多代码
复制代码

很简单,内部就包含一个 Sync 类对象。Sync 类又未何方神圣?

2.1 Sync 类分析

/**
 * Base of synchronization control for this lock. Subclassed
 * into fair and nonfair versions below. Uses AQS state to
 * represent the number of holds on the lock.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock.  tryAcquire is
     * implemented in subclasses, but both need nonfair
     * try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * Reconstitutes this lock instance from a stream.
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}
复制代码

从类的注释中可以知道,Sync 继承 AQS 类,并且是 ReentrantLock 类同步的基础,它有fairnonfair 两个子类。通过 AQS 类的 state 状态 number 值来记录是否持有锁。基本组成:

  • abstract void lock()
  • boolean nonfairTryAcquire(int acquires)
  • boolean tryRelease()
  • boolean isHeldExclusively()
  • Thread getOwner()
  • getHoldCount()

Sync 类中声明了抽象方法 lock 供子类来实现。Sync 类中实现了锁的获取、释放、是否持有等方法。

2.1.1 NonfairSync 非公平锁

/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
复制代码

在非公平锁中继承 Sync 并实现了 lock方法。首先通过 compareAndSetState() 方法获取 state 状态,如果成功,则设置 setExclusiveOwnerThread 独占锁的线程。

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码

可以看到 CAS 方法本质是调用 Unsafe 类的 compareAndSwapInt 方法,compareAndSwapInt 的实现如下:

/**
 * Atomically update Java variable to <tt>x</tt> if it is currently
 * holding <tt>expected</tt>.
 * @return <tt>true</tt> if successful
 */
public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);
复制代码

可以看到, 不是用 Java 实现的, 而是通过 JNI 调用操作系统的原生程序。那么看一下它的 JNI 实现:

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
复制代码

可以看到实际上调用 Atomic 类的 cmpxchg 方法。 Atomiccmpxchg
这个类的实现是跟操作系统有关, 跟 CPU 架构也有关, 如果是 windows下x86 的架构
实现在 hotspot\src\os_cpu\windows_x86\vm\目录atomic_windows_x86.inline.hpp 文件里 。

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}
复制代码

在这里可以看到是用嵌入的汇编实现的, 关键 CPU 指令是 cmpxchg
到这里没法再往下找代码了. 也就是说 CAS 的原子性实际上是 CPU 实现的. 其实在这一点上还是有排他锁的. 只是比起用 synchronized, 这里的排他时间要短的多. 所以在多线程情况下性能会比较好。

第二步,如果通过 CAS 操作失败,则主动通过 acquire 方法获取锁。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
复制代码

acqurie() 方法中,首先通过 tryAcquire() 方法进行快速申请判断。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
复制代码

这里我们可以看到在 acquire 方法中通过 tryAcquire() 进行快速申请锁,我们可以看到在 tryAcquire 内部是调用的 Sync 类的 nonfairTryAcquire() 方法。nonfairTryAcquire的逻辑如下:

  • 首先通过 getState() 获取同步器状态值,如果状态值为 0,即当前锁是空闲,则设置为当前线程持有锁;
  • 如果当前线程已经具备锁,则将当前 state 进行 +acquires

2.1.2 公平锁 FairSync

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
复制代码

在公平锁的 lock 方法实现中,同样可以看到直接通过 acquire(1) 方法获取锁的状态,最终通过 CAS 方法保证锁的状态。

ReentrantLock 支持公平所和非公平所,同时内部定义了 Sync 类实例,只需要根据构造方法中传入的要求创建对应的实例即可。

2.1.3 总结

公平锁和非公平锁体现在申请 Lock 时,公平锁直接通过 acquire 方法进行锁的申请,acquire 方法的底层调用是 tryAcquire 方法,在 tryAcquire 方法中如果锁是空闲则返回 true 立即获取到锁,如果当前线程已经持有锁,则锁计数器+1,其它情况返回 false。接着通过 acquireQueued 方法循环,申请不到添加到等待 CLH 队列中。在非公平锁中,先判断当前锁是否空闲,如果空闲则不管等待队列中的线程,直接申请到。反之如果被占有,则调用 acquire方法。

2.2 ReentrantLock 方法解析

ReentrantLock 的其它方法进行分析。

2.2.1 lock()

public void lock() {
    sync.lock();
}
复制代码

如果锁没有被其它线程持有,立即获取到锁,同时将锁的持有 count+1。如果当前线程已经过去到锁,则将 count+1 并立即返回。如果锁被其它线程持有,则当前线程在获取锁之前不可用。

2.2.2 lockInterruptibly()

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
复制代码

获取锁,有以下场景:

  • 如果锁没有被其它线程获取,则立即获取锁并返回,并且在锁计数器上+1
  • 如果当前线程已经持有锁,则立即返回并在锁计数器上+1
  • 如果锁被另外一个线程占用,则当前线程不可用直到发生以下两个事情:①当前线程主动获取到锁;②其它线程调用当前线程的interrupts方法。

2.2.3 tryLock()

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}
复制代码

尝试获取锁,如果锁没有被其它线程持有,则立即获取到并返回 true 并且锁计数器 +1,反之返回 false。尽管如果 lock 被设置为公平锁,但是调用 tryLock 就会立即申请到锁(如果锁可用)。不管别的线程是否在等待状态。

2.2.4 unlock()

public void unlock() {
    sync.release(1);
}
复制代码

尝试释放锁,如果当前线程持有锁,则锁计数器减少 -1。如果当前计数器为 0,就释放锁。如果当前线程没有持有锁,则抛出 IllegalMonitorStateException 异常。

2.2.5 getHoldCount()

public int getHoldCount() {
    return sync.getHoldCount();
}
复制代码

获取锁在“持有”的线程上的锁计数器个数。

2.2.6 isHeldByCurrentThread()

public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();
}
复制代码

当前线程是否持有该锁。

2.3 ReentrantLock 与 Synchronized 的区别

  1. ReenTrantLock 可以指定是公平锁还是非公平锁。而 synchronized 只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。
  2. ReenTrantLock 提供了一个 Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像 synchronized 要么随机唤醒一个线程要么唤醒全部线程。
  3. ReenTrantLock 提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制。
  4. ReentrantLocksynchronized 相比,ReentrantLock 提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候。

参考阅读

  1. ReenTrantLock可重入锁(和synchronized的区别)总结
  2. Java多线程系列–“JUC锁”02之 互斥锁ReentrantLock
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享