本文已参与【请查收|你有一次免费申请掘金周边礼物的机会】活动。
前言
有幸申请到掘金送福利活动名额,参与评论就有机会获得掘金官方提供的新版徽章,具体的抽奖细节在文末。
Semaphore简介
Semaphore(信号量)是JUC包下的一个并发工具类,用来控制并发访问临界资源(共享资源)的线程数,确保访问临界资源的线程能够正确、合理的使用公共资源。Semaphore和ReetrantLock一样,都是通过直接或间接的调用AQS框架的方法实现。
Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。访问资源后,使用release释放许可。
Semaphore的实现
Semaphore类中所提供的方法:
// 调用该方法后线程会从许可集中尝试获取一个许可
public void acquire()
// 线程调用该方法时会释放已获取的许可
public void release()
// Semaphore构造方法:permits→许可集数量
Semaphore(int permits)
// Semaphore构造方法:permits→许可集数量,fair→公平与非公平
Semaphore(int permits, boolean fair)
// 从信号量中获取许可,该方法不响应中断
void acquireUninterruptibly()
// 返回当前信号量中未被获取的许可数
int availablePermits()
// 获取并返回当前信号量中立即未被获取的所有许可
int drainPermits()
// 返回等待获取许可的所有线程Collection集合
protected Collection<Thread> getQueuedThreads();
// 返回等待获取许可的线程估计数量
int getQueueLength()
// 查询是否有线程正在等待获取当前信号量中的许可
boolean hasQueuedThreads()
// 返回当前信号量的公平类型,如为公平锁返回true,非公平锁为false
boolean isFair()
// 获取当前信号量中一个许可,当没有许可可用时直接返回false不阻塞线程
boolean tryAcquire()
// 在给定时间内获取当前信号量中一个许可,超时还未获取成功则返回false
boolean tryAcquire(long timeout, TimeUnit unit)
复制代码
Semaphore的实现是基于AQS的共享锁,分为公平和非公平两种模式。
Sync实现
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
//初始的许可数量,同步锁的state保存许可的数量
Sync(int permits) {
setState(permits);
}
//获取许可数量
final int getPermits() {
return getState();
}
//非公平的方式获取共享锁
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
//可用许可数减去需求的许可数
int remaining = available - acquires;
//许可数大于0时,CAS获取许可
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//释放许可
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//许可数加锁释放的许可数
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS更新许可,直到成功
if (compareAndSetState(current, next))
return true;
}
}
//减少许可
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
//
if (compareAndSetState(current, next))
return;
}
}
//将许可消耗完
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
复制代码
公平的FairSync实现
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//公平方式获取许可
protected int tryAcquireShared(int acquires) {
for (;;) {
//当前节点有前驱节点,则获取失败
if (hasQueuedPredecessors())
return -1;
//无前驱节点时,CAS方式获取许可
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
复制代码
公平锁的实现与非公平锁的不同点在于:公平锁的模式下获取锁,会先调用hasQueuedPredecessors()方法判断同步队列中是否存在节点,如果存在则直接返回-1回到acquireSharedInterruptibly()方法if(tryAcquireShared(arg)<0)判断,调用doAcquireSharedInterruptibly(arg)方法将当前线程封装成Node.SHARED共享节点加入同步队列等待。如果队列中不存在节点则尝试直接获取锁或许可。
非公平的Sync实现
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//直接使用Sync的nonfairTryAcquireShared()实现,非公平方式获取许可
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
复制代码
Semaphore中的非公平锁NonfairSync类的构造函数是基于调用父类Sync构造函数完成的,而在创建Semaphore对象时传入的许可数permits最终则会传递给AQS同步器的同步状态标识state,如下:
// 父类 - Sync类构造函数
Sync(int permits) {
setState(permits); // 调用AQS内部的set方法
}
// AQS(AbstractQueuedSynchronizer)同步器
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
// 同步状态标识
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 对state变量进行CAS操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
复制代码
Semaphore对象创建时传入的许可数permits,其实最终是在对AQS内部的state进行初始化。初始化完成后,state代表着当前信号量对象的可用许可数。
非公平锁NonfairSync获取许可
// Semaphore类 → acquire()方法
public void acquire() throws InterruptedException {
// Sync类继承AQS,此处直接调用AQS内部的acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer类 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 判断是否出现线程中断信号(标志)
if (Thread.interrupted())
throw new InterruptedException();
// 如果tryAcquireShared(arg)执行结果不小于0,则线程获取同步状态成功
if (tryAcquireShared(arg) < 0)
// 未获取成功加入同步队列阻塞等待
doAcquireSharedInterruptibly(arg);
}
复制代码
信号量获取许可的方法acquire()最终是通过Sync对象调用AQS内部的acquireSharedInterruptibly()方法完成的,而acquireSharedInterruptibly()在获取同步状态标识的过程中是可以响应线程中断操作的,如果该操作没有没中断,则首先调用tryAcquireShared(arg)尝试获取一个许可数,获取成功则返回执行业务,方法结束。如果获取失败,则调用doAcquireSharedInterruptibly(arg)将当前线程加入同步队列阻塞等待。tryAcquireShared(arg)方法是AQS提供的方法,没有具体实现,在NonfairSync类中的实现如下:
// Semaphore类 → NofairSync内部类 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
// 调用了父类Sync中的实现方法
return nonfairTryAcquireShared(acquires);
}
// Syn类 → nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
// 开启自旋死循环
for (;;) {
int available = getState();
int remaining = available - acquires;
// 判断信号量中可用许可数是否已<0或者CAS执行是否成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
复制代码
首先获取到state值后,减去一得到remaining值,若不小于0则代表着当前信号量中还有可用许可,当前线程开始尝试cas更新state值,cas成功则代表获取同步状态成功,返回remaining值。反之,如果remaining值小于0则代表着信号量中的许可数已被其他线程获取,目前不存在可用许可数,直接返回小于0的remaining值,nonfairTryAcquireShared(acquires)方法执行结束,回到AQS的acquireSharedInterruptibly()方法。当返回的remaining值小于0时,if(tryAcquireShared(arg)<0)条件成立,进入if执行doAcquireSharedInterruptibly(arg)方法将当前线程加入同步队列阻塞,等待其他线程释放同步状态。
// AbstractQueuedSynchronizer类 → doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 创建节点状态为Node.SHARED共享模式的节点并将其加入同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 开启自旋操作
for (;;) {
final Node p = node.predecessor();
// 判断前驱节点是否为head
if (p == head) {
// 尝试获取同步状态state
int r = tryAcquireShared(arg);
// 如果r不小于0说明获取同步状态成功
if (r >= 0) {
// 将当前线程结点设置为头节点并唤醒后继节点线程
setHeadAndPropagate(node, r);
p.next = null; // 置空方便GC
failed = false;
return;
}
}
// 调整同步队列中node节点的状态并判断是否应该被挂起
// 并判断是否存在中断信号,如果需要中断直接抛出异常结束执行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 结束该节点线程的请求
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer类 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 获取同步队列中原本的head头节点
setHead(node); // 将传入的node节点设置为头节点
/*
* propagate=剩余可用许可数,h=旧的head节点
* h==null,(h=head)==null:
* 非空判断的标准写法,避免原本head以及新的头节点node为空
* 如果当前信号量对象中剩余可用许可数大于0或者
* 原本头节点h或者新的头节点node不是结束状态则唤醒后继节点线程
*
* 写两个if的原因在于避免造成不必要的唤醒,因为很有可能唤醒了后续
* 节点的线程之后,还没有线程释放许可/锁,从而导致再次陷入阻塞
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免传入的node为同步队列的唯一节点,
// 因为队列中如果只存在node一个节点,那么后驱节点s必然为空
if (s == null || s.isShared())
doReleaseShared(); // 唤醒后继节点
}
}
复制代码
释放许可
公平锁释放许可的逻辑与非公平锁的实现是一致的,因为都是Sync类的子类,而释放锁的逻辑都是对state减一更新后,唤醒后继节点的线程。所以关于释放锁的具体实现则是交由Sync类实现
// Semaphore类 → release()方法
public void release() {
sync.releaseShared(1);
}
// AbstractQueuedSynchronizer类 → releaseShared(arg)方法
public final boolean releaseShared(int arg) {
// 调用子类Semaphore中tryReleaseShared()方法实现
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
释放锁则调用的是Semaphore.release()方法,调用该方法之后线程持有的许可会被释放,同时permits/state加一
与之前获取许可的方法一样,Semaphore释放许可的方法release()也是通过间接调用AQS内部的releaseShared(arg)完成。因为AQS的releaseShared(arg)是魔法方法,所以最终的逻辑实现由Semaphore的子类Sync完成,如下:
// Semaphore类 → Sync子类 → tryReleaseShared()方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取AQS中当前同步状态state值
int current = getState();
// 对当前的state值进行增加操作
int next = current + releases;
// 不可能出现,除非传入的releases为负数
if (next < current)
throw new Error("Maximum permit count exceeded");
// CAS更新state值为增加之后的next值
if (compareAndSetState(current, next))
return true;
}
}
复制代码
对比获取许可的逻辑要简单许多,只需更新state值后调用doReleaseShared()方法唤醒后继节点线程即可。但是调用doReleaseShared()方法的线程会存在两种:
一是释放共享锁/许可数的线程。调用release()方法释放许可时必然调用它唤醒后继线程
二是刚获取到共享锁/许可数的线程。
总结
在初始化时传递的许可数/计数器最终都会间接的传递给AQS的同步状态标识state。当一条线程尝试获取共享锁时,会对state减一,当state为0时代表没有可用共享锁了,其他后续请求的线程会被封装成共享节点加入同步队列等待,直至其他持有共享锁的线程释放(state加一)。与独占模式不同的是:共享模式中,除开释放锁时会唤醒后继节点的线程外,获取共享锁成功的线程也会在满足一定条件下唤醒后继节点。关于AQS具体可参见之前的文章:java锁:AQS详解(一)
java锁:AQS详解(二)
抽奖说明
1.参与评论即可(讨论内容可以是技术相关,也可以是生活相关,也可以吐槽掘金抽奖);
2.抽奖规则:如果本文评论达到掘金活动的要求,热评区前两名分别获赠掘金新版徽章一枚(若无热评将从评论区抽取两位幸运用户);