AbstractQueuedSynchronizer源码

AbstractQueuedSynchronizer通常简称AQS,他是java.util.locks包下的一个类,Java中的很多类都是通过改类实现的。

在本文中我们通过阅读源码的方式查看下其实现原理,使用的JDK版本为1.8.

1、类结构与数据结构

1.1 类继承关系

image-20210527153832062

AbstractQueuedSynchronizer继承AbstractOwnableSynchronizerAbstractOwnableSynchronizer的定义是比较简单的,里面只有一个字段exclusiveOwnerThread,该字段是用来存储当前占有同步器的的线程。

1.2 成员变量

AQS类中的成员变量如下:

  • Node head 头节点
  • Node tail 尾节点
  • int state 状态值 通过对该字段的操作实现同步的逻辑

AQS中有个ConditionObject内部类,其成员变量如下

  • Node fitstWaiter 头节点
  • Node lastWaiter 尾节点

1.3 数据结构

通过上面的成员变量,我们很容易想到这是一个链表结构,链表节点的定义如下:

static final class Node {
    // 共享锁模式的nextWaiter
    static final Node SHARED = new Node();
	// 独占锁模式的nextWaiter
    static final Node EXCLUSIVE = null;
	// 取消
    static final int CANCELLED =  1;
	
    static final int SIGNAL    = -1;
	
    static final int CONDITION = -2;

    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }

    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
复制代码

通过节点的定义我们发现里面双向链表和单向链表同时存在,这两个链表分别有什么用我们会在后面的源码学习中一点点的讲解。其结构如下图所示:

image-20210603192148366

节点的状态有如下几种,我们会在后面的源码查看中了解什么时候会设置成相应的值:

  • CANCELLED = 1
  • SIGNAL = -1
  • CONDITION = -2
  • PROPAGATE = -3
  • 0

2 获取/释放锁

在上面的部门我们了解了AQS的数据结构,在这部分中我们看下AQS是如何处理获取锁和释放锁的逻辑的。

AQS中存在独占和共享两种模式,对外提供的获取/释放的方法如下:

  • protected final boolean compareAndSetState(int expect, int update) 通过CAS的方式修改state属性
  • public final void acquire(int arg) 获取排他锁
  • public final void acquireShared(int arg) 获取共享锁
  • public final boolean tryAcquireNanos(int arg, long nanosTimeout) 带超时时间的获取排他锁
  • public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 带超时时间的获取共享锁
  • public final boolean release(int arg) 释放排他锁
  • public final boolean releaseShared(int arg) 释放共享锁

2.1 compareAndSetState方法

该方法是线程安全的修改state属性的值,其使用的是Unsafe类的CAS操作,其源码如下

protected final boolean compareAndSetState(int expect, int update) {
    // 使用Unsafe类进行CAS操作,保证线程安全
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码

2.2 acquire方法

该方法是排他锁获取锁的方法,其源码如下

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 获取锁失败且加入等待队列失败,执行Thread.interrupt()方法
        selfInterrupt();
}
复制代码

在这个方法中会调用tryAcquire方法、addWaiter方法和acquireQueued方法。

tryAcquire方法是尝试获得锁,阅读tryAcquire方法的源码我们会发现在AQS中直接抛出了异常,很明显这里使用了方法模板模式,需要子类来实现该方法,其源码如下:

protected boolean tryAcquire(int arg) {
    // 在AQS中直接抛出异常,留给子类实现   模板方法
    throw new UnsupportedOperationException();
}
复制代码

如果获取锁失败,AQS会将该线程添加到等待队列,addWaiter方法源码如下

private Node addWaiter(Node mode) {
    // 创建一个节点对象
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 队列非空
    if (pred != null) {
        // 添加到尾部
        node.prev = pred;
        // 使用CAS的方法修改尾节点
        if (compareAndSetTail(pred, node)) {
            // 修改当前节点未之前列表尾节点的下一个节点
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // 自旋添加
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 头节点为空  初始化链表
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 将链表的尾节点设置为新节点的前驱节点
            node.prev = t;
            // 通过cas的方式设置尾节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取节点的前置节点
            final Node p = node.predecessor();
            // 如果前置节点是头节点,尝试获取锁
            if (p == head && tryAcquire(arg)) {
                // 获得锁成功,将该节点设置为头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前置节点的状态
    int ws = pred.waitStatus;
    // 如果为-1
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // 从队列中移除该节点所有状态为CANCELLED的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 修改前置节点状态为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
	// 节点的线程设置为null
    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    // 从队列中移除状态为-1的前置节点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    // 节点的状态修改为-1
    node.waitStatus = Node.CANCELLED;
	// 如果是尾节点,移除
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {

        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
复制代码

当不存在竞争时,线程过来就能获取到锁,此时的队列为空。

当一个线程过来,发现锁已经被其他线程占有,此时会初始化队列,并将自己插入队列中,队列结构如下

image-20210603171720317

当在有其他线程过来,会在参与一次锁竞争,如果获得失败,会添加到队列的尾部。

2.3 tryAcquireNanos

该方法是指定了超时时间的获取锁方法,该方法和tryAcquire方法没有太大的区别,只是增加了对超时时间的判断,该方法的源码如下:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
        // 如果线程被中断过  抛出异常
        throw new InterruptedException();
    // 尝试获取锁 获取锁失败 将线程添加到阻塞队列
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    // 插入到阻塞队列并获取插入的节点
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                // 超时返回false
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

2.4 release方法

我们知道锁需要具备获取和释放的功能,上面的方法我们介绍了AQS中实现获取锁的逻辑,release是释放锁的逻辑,其源码如下

public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        // 头节点
        Node h = head;
        // 头节点存在且状态不为0
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
复制代码

通过源码我们可以发现,tryRelease方法同样需要子类来实现。

在获取锁那里也有unparkSuccessor方法,其源码如下:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        // 修改头节点的status为0
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // 后置节点为空或者状态为1  被取消的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾节点遍历等待队列,移除被取消的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
复制代码

至此独占模式的锁获取和释放的逻辑已经介绍完了,共享模式相关的代码在本文中就不介绍了,大家可以自己去源码里查看下,逻辑相差不是很大。

3 ConditionObject

ConditionObjectAQS中的一个内部类,我们可以使用该类实现等待/唤醒的功能。类似wait()、notify()的功能,接下来我们看看在AQS中是如何实现的。

ConditionObject继承自Condition,其类图如下:

image-20210603192809380

这个类中提供了好多个等待/唤醒的方法,在本文中我们只介绍await()signal()两个方法,其他的几种方法还是大家自己去看源码理解。

3.1 await

该方法用于使当前线程进入等待状态,其源码如下:

public final void await() throws InterruptedException {
    // 判断当前线程是否被终止
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加到等待队列
    Node node = addConditionWaiter();
    // 释放持有的锁 从阻塞队列中移除
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 不在同步队列
    while (!isOnSyncQueue(node)) {
        // 暂停线程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
复制代码

将线程加入等待队列中的方法源码如下:

private Node addConditionWaiter() {
    // 单向链表的尾节点
    Node t = lastWaiter;
    
    // 如果状态不为-2 从等待队列中移除节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 移除不为等待状态节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建一个节点  节点状态为-2
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        // 链表未初始化时设置为头节点
        firstWaiter = node;
    else
        // 向链表尾部插入
        t.nextWaiter = node;
    // 设置为尾节点
    lastWaiter = node;
    return node;
}

private void unlinkCancelledWaiters() {
    // 单向链表的头节点
    Node t = firstWaiter;
    Node trail = null;
    // 遍历链表移除状态不为-2的节点
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
复制代码

当线程插入等待队列后会调用fullyRelease方法从阻塞队列中移除,其源码如下

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取当前状态值
        int savedState = getState();
        // 释放锁
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
复制代码

3.2 signal

该方法用于唤醒一个等待中的线程,其源码如下:

public final void signal() {
    // 当前线程是否持有锁的线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
	// 替换节点状态
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
	// 添加到同步队列
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 唤醒线程
        LockSupport.unpark(node.thread);
    return true;
}
复制代码

4 总结

通过上面的源码阅读,我们会发现,在AQS中有两个链表,一个是双向链表的同步队列,一个是单向链表的等待队列。

链表节点的状态如下

  • 0 默认值
  • 1 取消 调用cancell方法时会设置成该值
  • -2 等待 调用Conditionawait方法时会设置成该值 在等待队列中
  • -1 调用signal方法或者acquireQueued方法时会设置成该值
  • -3 这个值在我们的源码中没有看到,这个值在释放共享锁的方法中会用到,大家自己去看这个逻辑吧

AQS的源码就到这里结束了,AQS是Java中的同步工具的底层实现,在它里面封装了队列的操作,接入方只需要实现相应的方法即可,我们在后面的文章中会介绍其在其他其他同步工具中的使用。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享