深入理解AQS

1. 基本概念

Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronized(AQS)实现的。AQS使用了一个int类型的成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

2. 实现原理

2.1 数据结构分析

AQS的核心思想:如果被请求的资源空闲,则将当前线程设置为有效的工作线程,将共享资源的状态设置为锁定状态;如果共享资源被占用,就需要一定的等待唤醒机制来保证分配。这个机制主要使用CLH队列的变体来实现,将暂时用不到的线程加入到队列中。

2.1.1 同步队列

ASQ中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配

同步队列的基本结构如下:

image.png

  • 同步队列的入队操作

同步队列的入队操作必须是原子操作,因此使用了一个基于CAS的方法:compareAndSetTail()方法。

image.png

  • 同步队列的出队操作

设置首节点是通过获取同步状态的成功的线程来实现的,而只有一个线程能够获取到同步状态,因此设置头结点的方法不需要使用CAS来保证。
chudui.png

2.1.2 Node结构

AQS中的每个节点都是一个Node,Node是CLH队列上的节点,包含了当前线程和等待状态等信息。

  • Node
方法和属性值 描述
waitStatus 等待状态
thread 获取同步状态的线程
pre 前驱节点
predecessor 返回前驱节点,没有的话跑出npe
nextWaiter 指向下一个处于CONDITION状态的节点
next 后继节点
  • waitStatus
名称 含义
0 初始状态
CANCELLED 1, 当期线程获取锁的请求已经取消了
CONDITION -2, 表示节点在在等待队列中,节点线程请求被唤醒
PROGATE -3, 当前线程处在SHARED情况下,该字段才会使用
SIGNAL -1, 表示当前线程已经准备好了,就等资源释放
  • 线程两种锁的模式
模式 含义
SHARED 表示当前线程以共享的模式等待锁
EXCLUSIVE 表示当前线程正在以独占的方式等待锁

2.2 同步状态分析

在获取锁的时候分为独占式和共享式,独占式是只要一个线程能够获取到锁,如ReentrantLock,而共享式是可以多个线程线程同时获取到该锁,如ReadWriteLock。

2.2.1 独占模式

2.2.1.1 加锁

  • aquire()方法

aquire()方法以独占式获取资源,如果获取到资源,直接返回,如果没有获取到资源,则进入等待队列,开始自旋操作,直到获取到锁

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

未命名文件 (8).png

  • addWaiter()方法

获取锁失败后会执行addWaiter()方法,将node加入CLH队列中(CAS)

   private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
复制代码
  • acuqireQueued()方法

进入到LCH队列中的线程以自旋的方式获取资源,获取成功后将进行出队操作。

final boolean acquireQueued(final Node node, int arg) {
        //用来标记是否拿到资源
        boolean failed = true;
        try {
            //标记等待过程中是否被中断
            boolean interrupted = false;
            //开始获取锁,要么获取成功,要么中断
            for (;;) {
            //获取当前节点的前驱节点
                final Node p = node.predecessor();
                //如果前驱节点是head,当前线程尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    //将当前节点设置为虚节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //p为头节点没有获取到锁或者p不是头节点,判断是否需要阻塞当前node(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) //调用LockSupport.park()
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码
  • setHead() && shouldParkAfterFailedAcuqire()

setHead() 将当前节点设置为虚节点。

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
    
// 通过前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取头节点的状态
        int ws = pred.waitStatus;
        //头节点处于唤醒状态
        if (ws == Node.SIGNAL)
            return true;
        //头节点处于取消状态
        if (ws > 0) {
            do {
                //循环向前查找节点,如果节点被取消则出队列
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 设置前驱节点等待状态为signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
·

复制代码

上面是独占锁获取资源的方式,接下来说明独占模式释放资源的方式

2.2.1.2 解锁

解锁操作实际上就是将当前线程的状态置0,唤醒下一个线程

  • release()
    public final boolean release(int arg) {
    //自定义的tryRease()如果返回true,说明该锁没有被任何线程持有
        if (tryRelease(arg)) {
            //获取头节点
            Node h = head;
            //头节点为空且头节点的waitStatus不是初始化节点情况,解除线程挂起状态
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
复制代码
  • unparkSuccessor()

如果node的后继节点不为空且不是取消状态,则唤醒这个后继节点;否则从末尾开始寻找合适的节点,如果找到就唤醒。

    private void unparkSuccessor(Node node) {
        //获取头节点waitStuatus
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //获取当前节点的下一个节点
        Node s = node.next;
        //如果如果下一个节点为空,或者下一个节点被cancelled,就找到队列最开始的非cancelled的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从队列尾部开始查找,找到第一个waitStuatus<0的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //如果当前节点的下一个节点不为空,而且状态<=0,就把当前节点unpark()
        if (s != null)
            LockSupport.unpark(s.thread);
    }
复制代码

从前往后找的原因:

  1. 断开节点是先断开的next指针
  2. 节点入队操作是纷纷为两步node.prev = pred; compareAndSetTail(pred, node),并不是原子操作,如果执行了node.prev = pred后执行了unparkSuccessor方法,就没法从前往后找了。

2.2.2 共享模式

共享模式,允许多个线程同时获取到同步状态,整个过程也是不响应中断的。

  • acuqiredShared()方法

共享模式下获取锁,获取成功就直接返回,否则加入等待队列中。

  1. 首先通过自定义的tryAcquireShared()方法尝试获取资源
  2. 获取失败执行doAcquireShared()方法,
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
复制代码
  • doAcquireShared()

同独占模式一样,将当前线程放入队列尾部,自旋直到获取到锁

    private void doAcquireShared(int arg) {
        //以共享模式加入到队列尾部
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取前驱节点
                final Node p = node.predecessor();
                //前驱节点为空,当前线程尝试获取资源
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //获取成功
                    if (r >= 0) {
                        //将head指向自己,还有有剩余资源可以唤醒自己
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt(); //被中断过,此时响应中断(也就是说并不是立即响应中断操作)
                        failed = false;
                        return;
                    }
                }
                //获取失败,将当前线程加入等待队列中,进入waiting状态等待被唤醒或打断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码
  • releaseShared()

共享模式下释放指定数量的资源,并唤醒后其他节点。

在独占模式下state=0时,才会唤醒其他线程,而共享模式下会直接唤醒其他线程。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            //唤醒后继节点
            doReleaseShared();
            return true;
        }
        return false;
    }
复制代码
  • doRealseShared()

用于唤醒其他节点,当state>0,获取剩余公共享资源;当state=0,获取共享资源

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
复制代码

参考文章:

  1. tech.meituan.com/2019/12/05/…
  2. juejin.cn/post/684490…
  3. juejin.cn/post/684490…
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享