线程同步「AQS原理解析」

最近在看LinkedBlockingQueue源码的时候发现内部是采用ReentrantLock进行线程同步的,众所周知 Java 中线程同步的方法除了最常用的synchronized关键字和Object#wait等方法,还有另一种方式那就是使用ReentrantLock类。

ReentrantLock本身只是实现了Lock接口,而真正实现线程同步的关键是另外一个类AbstractQueuedSynchronizer(AQS),AQS是 Java 中一个线程同步框架,基于AQS重写tryAcquiretryRelease等方法就可以实现线程同步,因此本文就从ReetrantLock为例子,解析AQS的工作流程。

Tips

  • 代码基于 Java 8
  • 全文会用AQS指代AbstractQueuedSynchronizer
  • 为了提升阅读流畅性,文中会在需要的地方将代码二次贴出,并加上repeat标识

源码梳理

加锁

ReentrantLock#lock

首先,从ReentrantLock#lock方法入手。

    public void lock() {
        sync.lock();
    }
复制代码

调用的是ReentrantLock$Sync#lock方法,根据创建ReentrantLock时的参数可为NonfairSync#lockFairSync#lock,分别代表非公平锁和公平锁实现,它们之间的具体类图关系如下:

class.png

为了更好地理解,本文以ReentrantLock默认构造函数所用的NonfairSync为例,至于FairSync公平锁在实现逻辑上基本相同。

NonfairSync#lock

    final void lock() {
        if (compareAndSetState(0, 1)) //CAS 尝试设置 state 为 1,0 表示没有线程持有锁
            setExclusiveOwnerThread(Thread.currentThread()); //成功,设置当前线程为占有线程,lock 流程结束
        else
            acquire(1); //失败,执行 AQS#acquire 方法
    }
复制代码

AQS#acquire

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && //实际相当于调用 Sync#nonfairTryAcquire
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

首先调用tryAcquire,注意此方法在AQS中为空实现,具体实现在NonfairSync中,实际内部调用Sync#nonfairTryAcquire

Sync#nonfairTryAcquire

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); //获取 AQS 的 state
        if (c == 0) { //等于0,表示没有线程持有锁
            if (compareAndSetState(0, acquires)) { //CAS,成功表示获取锁,acquires 通常为 1
                setExclusiveOwnerThread(current); //设置占有线程,lock 流程结束
                return true;
            }
        }
        // Mark A
        else if (current == getExclusiveOwnerThread()) { //如果当前线程就是占有锁的线程 
            int nextc = c + acquires; //state += acquires
            if (nextc < 0) //超过 Integer.MAX_VALUE,溢出为负数
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false; //acquire 失败
    }
复制代码

从上面的Mark A代码块可以看出ReentrantLock的可重入性,每调用一次lock意味着state += 1,调用unlockstate -= 1,因此完全释放锁需要调用相同次数的unlock(),当 state 为 0 时表明锁被释放。

如果tryAcquire失败的话,紧接着调用addWaiter方法,addWaiter会生成一个AQS$Node,这个Node实际是对当前线程的一个封装。

AQS#addWaiter

    //repeat start
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && 
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //acquire 失败,先调用 addWaiter
            selfInterrupt();
    }
    //repeat end

    //AQS#addWaiter
    private Node addWaiter(Node mode) { //mode = Node.EXCLUSIVE
        Node node = new Node(Thread.currentThread(), mode); //将线程封装成 Node 
        Node pred = tail; //获取尾节点
        if (pred != null) { //尾节点不为空,表明队列不为空
            node.prev = pred;
            // Mark A
            if (compareAndSetTail(pred, node)) { //CAS 尝试将当前 Node 替换成尾节点
                pred.next = node; //成功替换为尾节点,将上一个尾节点的 next 指向当前 Node
                return node; //返回 Node
            }
        }
        enq(node); //失败,说明有其他线程替换了尾节点,调用 enq
        return node;
    }

    //AQS#enq
    private Node enq(final Node node) {
        for (;;) { //开始循环
            Node t = tail;
            if (t == null) { //尾节点为空 = 队列为空,需要初始化一个节点,该节点不代表任何线程,是一个 dummy 节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { //同上 Mark A 处代码相同,将当前 Node 设置到尾节点,失败则进入下一个循环,直至成功
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
复制代码

通过上面的代码可以了解到,AbstractQueuedSynchronizerQueued指的是在该类中维护了一个Node的双向链表结构的 FIFO 队列,这个队列中如果有 n 个线程,那么就会有 n + 1 个 Node,因为头节点是个虚Node

addWaiter之后会将当前Node返回给acquireQueued,这同时意味着我们的线程已经在队列尾部了。

AQS#acquireQueued

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false; //中断标记
            for (;;) { //开始循环
                final Node p = node.predecessor(); //获取前一个 Node

                //判断前一个节点是不是 head,是的话 CAS 尝试获取锁
                //head 是 dummy Node,等于说我们是队列里第一个等待执行的线程
                if (p == head && tryAcquire(arg)) { 
                    //成功获取锁,将当前 Node 设置为 head
                    //另外 setHead 中 Node.thread 会被设置为 null,Node 变成新的 dummy head
                    setHead(node); 
                    p.next = null; //释放前节点的引用,帮助 GC
                    failed = false;
                    return interrupted; // == false
                }
                if (shouldParkAfterFailedAcquire(p, node) && //tryAcquire失败,检查是否需要 park 线程(Go A ↓↓↓)
                    parkAndCheckInterrupt()) //执行 park,并检查中断 (Go B ↓↓↓)
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // A
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //前一个节点已经 SIGNAL 了,意味着当前节点可以直接 park
            // SIGNAL = -1, CONDITION = -2, PROPAGATE = -3, CANCELLED = 1
            return true;
        if (ws > 0) { //前一个节点已经取消
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0); //循环向前寻找节点直到不为 CANCELLED 的节点
            pred.next = node; //弃置中间的 CANCELLED 节点
        } else {
            //前一个节点要么为 0 要么为 PROPAGATE,CAS 尝试将其设置为 SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // B
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); //执行到这里表明线程阻塞,停止自旋获取锁,等待 unpark
        return Thread.interrupted();//unpark 恢复执行后检查是否有中断
    }
复制代码

LockSupport#park为止,整个ReentrantLock#lock方法的流程梳理完毕。

小结

  1. Reentrant#lock实际就是执行NonfairSync#lock,在这一步线程会直接尝试 CAS 争抢锁,失败的话才进行第二步。
  2. 这一步执行AQS#acquire实际执行Sync#nonfairTryAcquire,和第一步一样先尝试 CAS 争抢锁,同时这一步最关键的是处理重入锁,等于说所有重入的线程在第一步都会失败,然后在这里成功。
  3. AQS#addWaiter的主要工作就是把线程封装成Node节点,并插入到队列的尾端。
  4. Node入列后,这一步的工作就是等待出列,先循环判断Node是不是队列里第一个等待执行的线程,是的话尝试 CAS 争抢锁,这个行为称为锁的自旋;锁自旋一定次数后,判断条件为Node的前一个节点的状态如果变成SIGNAL,那么停止自旋,线程park阻塞,等待唤醒。

从总结中可以看出,每个线程在入列之前,都会在 1、2 步先尝试 CAS 争抢锁,这里体现了非公平锁的特点,不重点关注加锁的先后顺序,提升了吞吐量。

一点引申

  • 关于acquireQueuedfinal方法块什么时候在会执行以及CANCELLED节点,因为这篇文章主要分析的是ReentrantLock#lock方法,在这个前提下经过测试后,我认为在单纯调用ReentrantLock#lock的时候,应该只有在发生未知异常的时候会触发会执行cancelAcquire,因为在线程park以后,调用Thread#interrupt方法也只是让线程恢复执行,并不会直接抛出异常,Node最终还是会在队列中继续去获取锁,只是返回interruptedtrue。所以我觉得在使用ReentrantLock#lock的时候,队列中的NodewaitStatus应该不会出现CANCELLED,但是如果调用的是ReentrantLock#lockInterruptibly,在线程interrupt后是会直接抛出异常的,,final中就会生成CANCELLED节点,感兴趣可以看AQS#doAcquireInterruptibly方法,如果我的理解有误的话欢迎指出。

释放锁

ReentrantLock#unlock

    public void unlock() {
        sync.release(1); //调用 AQS#release
    }
    
    //AQS#release
    public final boolean release(int arg) {
        //调用 Sync#tryRelease,NonfairSync 和 FairSync都没有重写此方法
        //true 表明锁被释放 (Go A ↓↓↓)
        if (tryRelease(arg)) {
            Node h = head; //头节点,一开始初始化时为 dummy 节点

            //1. 如果 head 为空说明队列没有初始化过,队列中没有等待的 Node
            //2. 如果 head 的 state 为 0,说明还没有被后驱的节点设置成 SIGNAL
            //那么后面的节点线程还处于自旋状态,尚未 park,不需要 unpark
            if (h != null && h.waitStatus != 0) 
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    //A
    //Sync#tryRelease
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // state - 1
        if (Thread.currentThread() != getExclusiveOwnerThread()) //如果调用的线程并没有持有锁,抛出异常
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { //如果 state 为 0,表明锁被释放,不为 0 只是减少了一次重入
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
复制代码

unlock中所做的工作就是减少AQSstatestate为 0 则说明锁已释放,如果此时队列中没有线程或第一个线程没有park的话,那么unlock的工作也就结束了,但是如果有park的线程,那么还需要负责唤醒被park的线程。

AQS#unparkSuccessor

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) // ws 不为 CANCELLED
            compareAndSetWaitStatus(node, ws, 0); //将 waitStatus 设为 0

        Node s = node.next; //如果 node 是 head 的话,s 也是第一个真实节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            //s 如果为 null 或已经 CANCELLED,从队列尾部开始遍历
            //尝试寻找 s 节点之后第一个不为 CANCELLED 的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //唤醒该线程
    }
复制代码

为什么这里要从尾部开始寻找Node?
addWaiter Mark A处可以看到做的事情是:

  1. 为新Node设置前驱节点
  2. 将新Node CAS 设置为tail
  3. 将原来的尾Node#next指向新Node

第二、第三步并不是原子操作,如果在这两步之间执行了unparkSuccessor,从头开始遍历的话,不能遍历到最新入列的Node,而从尾部遍历就没有这个问题。另外在产生CANCELLED节点的时候,先断开的是next指针,prev指针并未断开,从尾部开始能保证遍历全所有Node

小结

释放锁的的内容比较简单,就是调用AQS#releaseSync类都没有重写这个方法,每次调用AQSstate都会值减 1 ,当state变为 0 时锁也就释放了,然后判断一下需不需要唤醒队列里的第一个节点。

其他

一点引申这里提过关于CANCELLED节点,对这个我们做进一步的分析。

AQS#cancelAcquire

    private void cancelAcquire(Node node) {
        if (node == null)
            return;

        node.thread = null;//移除 node 的线程

        Node pred = node.prev; // pred = 前驱节点
        //如果 pred.ws == CANCELLED,循环向前遍历,直到找到不为 CANCELLED 的 Node
        while (pred.waitStatus > 0) 
            node.prev = pred = pred.prev;

        Node predNext = pred.next; //pred 原来的 next Node (可能 == node)

        node.waitStatus = Node.CANCELLED;//将 node.ws 设置为 CANCELLED

        //如果当前 node 是尾节点则 CAS 尝试将 pred 设置为 tail
        if (node == tail && compareAndSetTail(node, pred)) {
            //CAS 尝试将 pred.next 设置为 null
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            //多重判断,每一项需都满足
            //1. pred 不是头节点
            //2. (pred.ws == SIGNAL) 或 (ws 不为 CANCELLED 且可以成功被设置成 SINGNAL)
            //3. pred.thread 不能为空 
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                //如果 node.next 不为空且不为 CANCELLED,将 node.next 与 pred 连接
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                //唤醒 node 之后第一个不为 CANCELLED 的节点
                unparkSuccessor(node);
            }

            //将 node.next 指向自身
            node.next = node; 
        }
    }
复制代码

这段代码的主要目的就是,在Node变为CANCCELLED的时候,处理其前面的节点的引用,注意只是对Node.next进行处理,Node.prev没有进行操作,简单的说就是在从当前节点开始,向前判断各个节点,直到遇到不为CANCELLED的节点,将该节点的next指向当前节点的next。用图表示如下:

1.png

当然这是最常见的一种情况,代码里还有对各种特殊情况的判断,比如找到的前驱节点是不是头节点、当前节点是不是尾节点等,但是都是为了处理next引用。此时这些CANCELLED节点还在队列里,因为他们的引用没有完全断开,还有prev引用连接着。

那为什么不在这里处理prev引用?

因为节点在自旋的过程中AQS#shouldParkAfterFailedAcquire会对自身的prev进行更新,移除掉前面的无效节点,但这个操作不是原子性的,如果在这里修改prev节点,那么指向的节点有可能因为之后也CANCELLED了并且在shouldParkAfterFailedAcquire被移除,那就会变成指向到一个被移除的节点。

    //AQS#shouldParkAfterFailedAcquire
    //repeat start
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    }
    //repeat end
复制代码

总结

至此,从ReentrantLock切入进行分析AQS的加锁和释放锁部分也就分析完了,总体框架就是将线程封装成节点,然后利用 FIFO 队列实现同步,再加上一些性能如优化减少线程过多自旋等。剩下的AQS$ConditionObject会在后续的文章里进行分析。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享