CyclicBarrier 是什么?
CyclicBarrier
一般称为栅栏
、屏障,是一种多线程同步工具。
常常和 CountDownLatch
一起作比较,因为都属于用于一组线程等待的工具。
不同于 CountDownLatch
常用于协调线程等待一组工作线程,且工作线程到达后做一次通知并继续执行,CyclicBarrier
人如其名的是一组线程相互等待全部到达指定处后,再全部继续执行。
CyclicBarrier
更高级的是:
-
允许重用,执行完成后或手动重置即可重新使用;
CountDownLatch
直接报废。 -
允许执行回调逻辑,一般是最后一个到达栅栏的线程自调用。我们可以用这个特性在业务逻辑中执行收尾工作。
-
出现问题可破坏当前分代(通知其他线程),重置进行下一次。不同于
CountDownLatch
达到指定条件处后就不关心结果,CyclicBarrier
必须等待在栅栏。如果某个工作线程在到达前出现异常,那就需要人为处理重置。当然出现异常、超时等情况也会自动破坏当前使用,但是注意并不能直接进行下一次使用,必须手动重置。
CyclicBarrier 怎么用?
CountDownLatch:几道锁的保险箱 (juejin.cn) cue 到 CyclicBarrier
说打开保险锁后,就不能再用了。
允许重用
亲信当然希望开锁拿钱跑路以后,老板回来检查就不会发现。
private static void normal() throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3);
if (!barrier.isBroken()) {
System.out.println("保险箱:安全保护中");
}
System.out.println("亲信们:不成功便成仁!!!!!");
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new MyFollower(barrier));
thread.start();
}
Thread.sleep(1000);
if (!barrier.isBroken()) {
System.out.println("保险箱:安全保护中");
System.out.println("老板:很好,这东西不错");
}
}
private static class MyFollower implements Runnable {
private CyclicBarrier barrier;
public MyFollower(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println("亲信:输入密码ing");
try {
barrier.await();
} catch (InterruptedException e) {
System.out.println("临死前:骂骂咧咧地退出了游戏");
} catch (BrokenBarrierException e) {
System.out.println("OS:傻子,这都输入错了");
}
System.out.println("亲信:成了!!!!!");
}
}
保险箱:安全保护中
亲信们:不成功便成仁!!!!!
亲信:输入密码ing
亲信:输入密码ing
亲信:输入密码ing
亲信:成了!!!!!
亲信:成了!!!!!
亲信:成了!!!!!
保险箱:安全保护中
老板:很好,这东西不错
复制代码
亲信被杀(中断)、输入错了(主动破坏)、输入慢了(超时)的情况下,关我保险锁什么事?
由于内部线程中断、或主动破坏都是校验一个布尔值,因此这种情况下其他线程无法知道什么原因导致破坏。
简单实现代码(真实情况肯定是需要更完善的协调和处理的)
private static void normal() throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3);
if (!barrier.isBroken()) {
System.out.println("保险箱:安全保护中");
}
System.out.println("亲信们:不成功便成仁!!!!!");
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new MyFollower(barrier, i + 1));
thread.start();
if (i == 0) {
thread.interrupt();
}
}
Thread.sleep(2000);
if (barrier.isBroken()) {
System.out.println("亲信:重置一下");
barrier.reset();
}
if (!barrier.isBroken()) {
System.out.println("保险箱:安全保护中");
System.out.println("老板:很好,这东西不错");
}
}
private static class MyFollower implements Runnable {
private CyclicBarrier barrier;
private int no;
public MyFollower(CyclicBarrier barrier, int no) {
this.barrier = barrier;
this.no = no;
}
@Override
public void run() {
System.out.println("亲信:输入密码ing");
if (no == 3) {
System.out.println("亲信:输错了...重置一下");
barrier.reset();
return;
}
if (no == 2) {
System.out.println("亲信:输慢了...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return;
}
try {
barrier.await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println("临死前:骂骂咧咧地退出了游戏");
return;
} catch (BrokenBarrierException e) {
System.out.println("OS:哪个傻子错了啊");
return;
} catch (TimeoutException e) {
System.out.println("亲信:是谁输入慢了");
return;
}
System.out.println("亲信:成了!!!!!");
}
复制代码
厂家自定义解锁结果(回调函数)
保险锁生产厂家允许消费者可以自定义解锁之后的庆祝、或者通知等等。怎么做呢?
private static void normal() throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("保险箱:恭喜主人,又拿到钱了!!!");
});
if (!barrier.isBroken()) {
System.out.println("保险箱:安全保护中");
}
System.out.println("亲信们:不成功便成仁!!!!!");
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new MyFollower(barrier));
thread.start();
}
Thread.sleep(1000);
if (!barrier.isBroken()) {
System.out.println("保险箱:安全保护中");
System.out.println("老板:很好,这东西不错");
}
}
保险箱:安全保护中
亲信们:不成功便成仁!!!!!
亲信:输入密码ing
亲信:输入密码ing
亲信:输入密码ing
亲信:成了!!!!!
亲信:成了!!!!!
亲信:成了!!!!!
保险箱:恭喜主人,又拿到钱了!!!
保险箱:安全保护中
老板:很好,这东西不错
复制代码
注意:回调函数做啥都行,取决于怎么用它。但是这是由最后一个到达栅栏的工作线程执行的。
CyclicBarrier 源码
CyclicBarrier
姑且也算 AQS 体系下吧,但是不像 CountDownLatch
内部直接继承扩展 AQS,而是使用了同样内部扩展 AQS 的 ReentrantLock
。
关于 AQS 和
ReenrtantLock
可以查看:
基于此,我们推测一下 CyclicBarrier
的内部结构:
- 要有锁,为了保证多线程的并发
- 要有条件,为了集体等待在栅栏处
- 要有计数器,计算是否全部到达
- 要有参与数量,用于重置计数器
- 要有标识破坏状态
- 要有分代的措施,保证重置后的计数器、状态影响不到上一次的。
- 要保存回调函数。
属性及内部类
private static class Generation {
Generation() {} // prevent access constructor creation
// 标识栅栏破坏,线程不会再等待
// 一般几种情况:
// 1.等待超时
// 2.线程中断
// 3.手动重置
// 4.回调执行异常
boolean broken; // initially false
}
// 用于控制对状态、数量操作的并发控制
private final ReentrantLock lock = new ReentrantLock();
// 线程等待在栅栏处的条件变量
private final Condition trip = lock.newCondition();
// 参与线程的数量
private final int parties;
// 完成时的回调动作,由最后一个到达栅栏的线程执行
private final Runnable barrierCommand;
// 允许重用下的分代
private Generation generation = new Generation();
// 剩下还没达到等待栅栏的线程数量, 每次到达就 --
// 初始值为 parties;
// parties - count = 正在等待的线程数量
private int count;
复制代码
对照推测:
- 要有锁
ReentrantLock lock
,为了保证多线程的并发 - 要有条件
Condition trip
,为了集体等待在栅栏处 - 要有计数器
count
,计算是否全部到达 - 要有参与数量
parties
,用于重置计数器 - 要有标识破坏状态
generation.broken
- 要有分代的措施
generation
,保证重置后的计数器影响不到上一次的。 - 要保存回调函数
barrierCommand
。
但是 Generation
的分代处理比较简便,不会留存上次分代的其他状态,直接重置;只保留破坏状态,用于限制还在上代执行过程中的线程可以退出。
所以重置和通知方法如下:
注意:避免混乱,一般内部调用这几个方法是一定需要只有锁
进入下一代 nextGeneration
能够成功进入下一个分代,说明是全部成功到达栅栏,并且回调函数执行成功了。
// 进入下一个分代
private void nextGeneration() {
// signal completion of last generation
// 唤醒上一个分代中的等待线程
trip.signalAll();
// 重置 count 计数
count = parties;
generation = new Generation();
}
复制代码
标志破坏
一般情况就是中断、超时、执行失败等。
private void breakBarrier() {
generation.broken = true;
count = parties;
// 唤醒当前分代中的等待线程
trip.signalAll();
}
复制代码
可以看出两次通知线程的操作一定伴随的是 count
的重置,不同的是两次分别操作了 generation
和 broken
。
说明线程等待栅栏时,会根据不同情况判断两个值。
构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
// 指定回调动作
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
复制代码
核心方法
await
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
// 不响应超时,所以需要处理一下超时情况
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
复制代码
可以看出来,等待方法内部都是调用的 dowait
。从方法的异常列表上来看,是一个大而全的处理方法,可以跑出中断、破坏、超时异常啊。
dowait
/**
* @param timed 是否允许超时
* @param nanos 超时时间, 只有 timed = true 才有意义
* @return 线程到达栅栏的索引: 第一个:parties - 1; 最后一个:0
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 1.核心逻辑得加锁
lock.lock();
try {
final Generation g = generation;
if (g.broken)
// 2.如果已经破坏, 必须手动重置才能使用
throw new BrokenBarrierException();
if (Thread.interrupted()) {
// 3.提前检查当前执行线程已经中断了, 那收尾:破坏栅栏,顺便唤醒所有等待线程
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // 4.全部到达了,当前线程是最后一个
boolean ranAction = false;
try {// 最后一个执行线程负责执行回调
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 全部到达,执行成功:进入下一代
nextGeneration();
return 0;
} finally {
if (!ranAction)
// 执行失败也是做收尾
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 5.循环等待,直到中断、破坏、超时的任意情况发生
for (;;) {
try {
// 判断是超时或不超时等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 6.判断是外部还是内部中断
if (g == generation && ! g.broken) {
// 6.1说明是外部其他线程中断的
// 进行收尾的同时, 得抛出异常
breakBarrier();
throw ie;
} else {
// 6.2如果进行到这里,那肯定是属于栅栏被破坏,或者全部到达栅栏
// 直接恢复中断状态,不需要处理异常
Thread.currentThread().interrupt();
}
}
if (g.broken)
// 7。检查是不是属于被破坏唤醒
throw new BrokenBarrierException();
if (g != generation)
// 8。是否结束了, 相当于返回序号
return index;
// 9.没结束,判断是否超时:超时收尾
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
// 我也不清除啥情况会继续循环.
// 如果会循环, 那中断状态会被延续到下一轮的第3步检查中断,从而抛出中断异常
}
} finally {
lock.unlock();
}
}
复制代码
注意到了吗,第一步是大前提获取锁,因为无论是计数器操作,还是nextGeneration
、breakBarrier
,或是进入阻塞状态,都是依靠锁作保证的
在第 6 步,在等待下可能会被中断,根据情况看是外部中断,还是内部导致的中断:nextGeneration
、breakBarrier
6.1如果是外部中断的话,则破坏栅栏,抛出异常。
6.2如果是内部中断的话,只会恢复中断状态,进行下一步。因为一般中断都是属于等待结束了,所以不会抛出异常。
6.2 后续进行超时、破坏、下一代(结束或重置)的判断
reset
重置会断掉当前代的等待,所以会调用breakBarrier
。
进入下一代:nextGeneration
。
如果说重置仅仅是为了下一代,其实可以直接调用后者;但是必须要中断掉当前代的等待呀。
所以breakBarrier
和nextGeneration
中两个唯一不同的操作结合起来:
generation.broken = true;
generation = new Generation();
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 唤醒等待线程, 标记已破坏
breakBarrier(); // break the current generation
// 进入下次分代
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
复制代码
统计方法
主要是为了判断是否破坏以及获取等待线程数。
结合 getParties
,也就能知道剩下未到达的线程数量了。
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
public int getParties() {
return parties;
}
复制代码
注意调用await
方法的线程数量与初始设置的parties
不一致
- 少于
parties
,无法使得count
减少至 0,最终会导致所有执行await
的线程都阻塞了。 - 多于
parties
,刚好parties
全部到达后,进入了下一代。而在下一代的线程数就是多于parties
的数量,导致这部分线程阻塞住。
建议,线程数量务必等于 parties
,尽量调用指定超时时间的 await
方法。
总结
关于 CountDownLatch
和 CyclicBarrier
的例子不是很好,但是勉强能表达出意思。
总结一下,CyclicBarrier
卷死CountDownLatch
的三大罪证:
-
允许重用,执行完成后或手动重置即可重新使用;
CountDownLatch
直接报废。 -
允许执行回调逻辑,一般是最后一个到达栅栏的线程自调用。我们可以用这个特性在业务逻辑中执行收尾工作。
-
出现问题可破坏当前分代(通知其他线程),重置进行下一次。不同于
CountDownLatch
达到指定条件处后就不关心结果,CyclicBarrier
必须等待在栅栏。如果某个工作线程在到达前出现异常,那就需要人为处理重置。当然出现异常、超时等情况也会自动破坏当前使用,但是注意并不能直接进行下一次使用,必须手动重置。