【抽奖啦】| 谈谈Semaphore工具类

本文已参与【请查收|你有一次免费申请掘金周边礼物的机会】活动。

前言

有幸申请到掘金送福利活动名额,参与评论就有机会获得掘金官方提供的新版徽章,具体的抽奖细节在文末。

Semaphore简介

Semaphore(信号量)是JUC包下的一个并发工具类,用来控制并发访问临界资源(共享资源)的线程数,确保访问临界资源的线程能够正确、合理的使用公共资源。Semaphore和ReetrantLock一样,都是通过直接或间接的调用AQS框架的方法实现。

Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。访问资源后,使用release释放许可。

Semaphore的实现

Semaphore类中所提供的方法:

// 调用该方法后线程会从许可集中尝试获取一个许可
public void acquire()

// 线程调用该方法时会释放已获取的许可
public void release()

// Semaphore构造方法:permits→许可集数量
Semaphore(int permits) 

// Semaphore构造方法:permits→许可集数量,fair→公平与非公平
Semaphore(int permits, boolean fair) 

// 从信号量中获取许可,该方法不响应中断
void acquireUninterruptibly() 

// 返回当前信号量中未被获取的许可数
int availablePermits() 

// 获取并返回当前信号量中立即未被获取的所有许可
int drainPermits() 

// 返回等待获取许可的所有线程Collection集合
protected  Collection<Thread> getQueuedThreads();

// 返回等待获取许可的线程估计数量
int getQueueLength() 

// 查询是否有线程正在等待获取当前信号量中的许可
boolean hasQueuedThreads() 

// 返回当前信号量的公平类型,如为公平锁返回true,非公平锁为false
boolean isFair() 

// 获取当前信号量中一个许可,当没有许可可用时直接返回false不阻塞线程
boolean tryAcquire() 

// 在给定时间内获取当前信号量中一个许可,超时还未获取成功则返回false
boolean tryAcquire(long timeout, TimeUnit unit) 
复制代码

Semaphore的实现是基于AQS的共享锁,分为公平和非公平两种模式。

Sync实现

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    //初始的许可数量,同步锁的state保存许可的数量
    Sync(int permits) {
        setState(permits);
    }
    
    //获取许可数量
    final int getPermits() {
        return getState();
    }

    //非公平的方式获取共享锁
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            //可用许可数减去需求的许可数
            int remaining = available - acquires;
            
            //许可数大于0时,CAS获取许可
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    //释放许可
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            //许可数加锁释放的许可数
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            //CAS更新许可,直到成功    
            if (compareAndSetState(current, next))
                return true;
        }
    }

    //减少许可
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            //    
            if (compareAndSetState(current, next))
                return;
        }
    }

    //将许可消耗完
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
复制代码

公平的FairSync实现

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }
    
    //公平方式获取许可
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            //当前节点有前驱节点,则获取失败
            if (hasQueuedPredecessors())
                return -1;
            //无前驱节点时,CAS方式获取许可    
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
复制代码

公平锁的实现与非公平锁的不同点在于:公平锁的模式下获取锁,会先调用hasQueuedPredecessors()方法判断同步队列中是否存在节点,如果存在则直接返回-1回到acquireSharedInterruptibly()方法if(tryAcquireShared(arg)<0)判断,调用doAcquireSharedInterruptibly(arg)方法将当前线程封装成Node.SHARED共享节点加入同步队列等待。如果队列中不存在节点则尝试直接获取锁或许可。

非公平的Sync实现

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }
    //直接使用Sync的nonfairTryAcquireShared()实现,非公平方式获取许可
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
复制代码

Semaphore中的非公平锁NonfairSync类的构造函数是基于调用父类Sync构造函数完成的,而在创建Semaphore对象时传入的许可数permits最终则会传递给AQS同步器的同步状态标识state,如下:

// 父类 - Sync类构造函数
Sync(int permits) {
    setState(permits); // 调用AQS内部的set方法
}

// AQS(AbstractQueuedSynchronizer)同步器
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer {
    // 同步状态标识
    private volatile int state;
    
    protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    // 对state变量进行CAS操作
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}
复制代码

Semaphore对象创建时传入的许可数permits,其实最终是在对AQS内部的state进行初始化。初始化完成后,state代表着当前信号量对象的可用许可数。

非公平锁NonfairSync获取许可

// Semaphore类 → acquire()方法
public void acquire() throws InterruptedException {
      // Sync类继承AQS,此处直接调用AQS内部的acquireSharedInterruptibly()方法
      sync.acquireSharedInterruptibly(1);
  }

// AbstractQueuedSynchronizer类 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 判断是否出现线程中断信号(标志)
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果tryAcquireShared(arg)执行结果不小于0,则线程获取同步状态成功
    if (tryAcquireShared(arg) < 0)
        // 未获取成功加入同步队列阻塞等待
        doAcquireSharedInterruptibly(arg);
}
复制代码

信号量获取许可的方法acquire()最终是通过Sync对象调用AQS内部的acquireSharedInterruptibly()方法完成的,而acquireSharedInterruptibly()在获取同步状态标识的过程中是可以响应线程中断操作的,如果该操作没有没中断,则首先调用tryAcquireShared(arg)尝试获取一个许可数,获取成功则返回执行业务,方法结束。如果获取失败,则调用doAcquireSharedInterruptibly(arg)将当前线程加入同步队列阻塞等待。tryAcquireShared(arg)方法是AQS提供的方法,没有具体实现,在NonfairSync类中的实现如下:

    // Semaphore类 → NofairSync内部类 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
    // 调用了父类Sync中的实现方法
    return nonfairTryAcquireShared(acquires);
}

// Syn类 → nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {
    final int nonfairTryAcquireShared(int acquires) {
         // 开启自旋死循环
         for (;;) {
             int available = getState();
             int remaining = available - acquires;
             // 判断信号量中可用许可数是否已<0或者CAS执行是否成功
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }
}
复制代码

首先获取到state值后,减去一得到remaining值,若不小于0则代表着当前信号量中还有可用许可,当前线程开始尝试cas更新state值,cas成功则代表获取同步状态成功,返回remaining值。反之,如果remaining值小于0则代表着信号量中的许可数已被其他线程获取,目前不存在可用许可数,直接返回小于0的remaining值,nonfairTryAcquireShared(acquires)方法执行结束,回到AQS的acquireSharedInterruptibly()方法。当返回的remaining值小于0时,if(tryAcquireShared(arg)<0)条件成立,进入if执行doAcquireSharedInterruptibly(arg)方法将当前线程加入同步队列阻塞,等待其他线程释放同步状态。

// AbstractQueuedSynchronizer类 → doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 创建节点状态为Node.SHARED共享模式的节点并将其加入同步队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
     // 开启自旋操作
     for (;;) {
         final Node p = node.predecessor();
         // 判断前驱节点是否为head
         if (p == head) {
             // 尝试获取同步状态state
             int r = tryAcquireShared(arg);
             // 如果r不小于0说明获取同步状态成功
             if (r >= 0) {
                 // 将当前线程结点设置为头节点并唤醒后继节点线程             
                 setHeadAndPropagate(node, r);
                 p.next = null; // 置空方便GC
                 failed = false;
                 return;
             }
         }
       // 调整同步队列中node节点的状态并判断是否应该被挂起 
       // 并判断是否存在中断信号,如果需要中断直接抛出异常结束执行
         if (shouldParkAfterFailedAcquire(p, node) &&
             parkAndCheckInterrupt())
             throw new InterruptedException();
     }
    } finally {
     if (failed)
         // 结束该节点线程的请求
         cancelAcquire(node);
    }
}

// AbstractQueuedSynchronizer类 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 获取同步队列中原本的head头节点
    setHead(node); // 将传入的node节点设置为头节点
    /*
     * propagate=剩余可用许可数,h=旧的head节点
     * h==null,(h=head)==null:
     *      非空判断的标准写法,避免原本head以及新的头节点node为空
     * 如果当前信号量对象中剩余可用许可数大于0或者
     * 原本头节点h或者新的头节点node不是结束状态则唤醒后继节点线程
     * 
     * 写两个if的原因在于避免造成不必要的唤醒,因为很有可能唤醒了后续
     * 节点的线程之后,还没有线程释放许可/锁,从而导致再次陷入阻塞
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 避免传入的node为同步队列的唯一节点,
        // 因为队列中如果只存在node一个节点,那么后驱节点s必然为空
        if (s == null || s.isShared())
            doReleaseShared(); // 唤醒后继节点
    }
}
复制代码

释放许可

公平锁释放许可的逻辑与非公平锁的实现是一致的,因为都是Sync类的子类,而释放锁的逻辑都是对state减一更新后,唤醒后继节点的线程。所以关于释放锁的具体实现则是交由Sync类实现

// Semaphore类 → release()方法
public void release() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer类 → releaseShared(arg)方法
public final boolean releaseShared(int arg) {
    // 调用子类Semaphore中tryReleaseShared()方法实现
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码

释放锁则调用的是Semaphore.release()方法,调用该方法之后线程持有的许可会被释放,同时permits/state加一

与之前获取许可的方法一样,Semaphore释放许可的方法release()也是通过间接调用AQS内部的releaseShared(arg)完成。因为AQS的releaseShared(arg)是魔法方法,所以最终的逻辑实现由Semaphore的子类Sync完成,如下:

// Semaphore类 → Sync子类 → tryReleaseShared()方法
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取AQS中当前同步状态state值
        int current = getState();
        // 对当前的state值进行增加操作
        int next = current + releases;
        // 不可能出现,除非传入的releases为负数
        if (next < current) 
            throw new Error("Maximum permit count exceeded");
        // CAS更新state值为增加之后的next值
        if (compareAndSetState(current, next))
            return true;
    }
}
复制代码

对比获取许可的逻辑要简单许多,只需更新state值后调用doReleaseShared()方法唤醒后继节点线程即可。但是调用doReleaseShared()方法的线程会存在两种:

一是释放共享锁/许可数的线程。调用release()方法释放许可时必然调用它唤醒后继线程

二是刚获取到共享锁/许可数的线程。

总结

在初始化时传递的许可数/计数器最终都会间接的传递给AQS的同步状态标识state。当一条线程尝试获取共享锁时,会对state减一,当state为0时代表没有可用共享锁了,其他后续请求的线程会被封装成共享节点加入同步队列等待,直至其他持有共享锁的线程释放(state加一)。与独占模式不同的是:共享模式中,除开释放锁时会唤醒后继节点的线程外,获取共享锁成功的线程也会在满足一定条件下唤醒后继节点。关于AQS具体可参见之前的文章:java锁:AQS详解(一)
java锁:AQS详解(二)

抽奖说明

1.参与评论即可(讨论内容可以是技术相关,也可以是生活相关,也可以吐槽掘金抽奖);

2.抽奖规则:如果本文评论达到掘金活动的要求,热评区前两名分别获赠掘金新版徽章一枚(若无热评将从评论区抽取两位幸运用户);

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享