前言
CyclicBarrier适用场景
- 需要多个线程都到达某一点,或某一阶段,再继续
- 相当于多个线程到齐了,才能继续
样例
public class CyclicBarrierT {
public static String getTime(){
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss:SSS");
//字符串转时间String dateTimeStr = "2018-07-28 14:11:15";
return LocalDateTime.now(ZoneOffset.of("+8")).format(formatter);
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
Runnable runnable = ()->{
try {
String threadName =Thread.currentThread().getName();//当前线程的名字
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 到达Barrier");
cyclicBarrier.await();//屏障
System.err.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 通过Barrier");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 4; i++) {//启动多个线程
new Thread(runnable).start();
}
}
}
复制代码
另外一些样例
分批通过屏障
public class CyclicBarrierT {
public static String getTime(){
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss:SSS");
//字符串转时间String dateTimeStr = "2018-07-28 14:11:15";
return LocalDateTime.now(ZoneOffset.of("+8")).format(formatter);
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
Runnable runnable = ()->{
try {
String threadName =Thread.currentThread().getName();//当前线程的名字
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 到达Barrier");
cyclicBarrier.await();//屏障
System.err.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 通过Barrier");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};
for (int j = 0; j < 3; j++) {
for (int i = 0; i < 4; i++) {//启动多个线程
new Thread(runnable).start();
}
}
}
}
复制代码
无限等待的情况
public class CyclicBarrierT {
public static String getTime(){
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss:SSS");
//字符串转时间String dateTimeStr = "2018-07-28 14:11:15";
return LocalDateTime.now(ZoneOffset.of("+8")).format(formatter);
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
Runnable runnable = ()->{
try {
String threadName =Thread.currentThread().getName();//当前线程的名字
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 到达Barrier");
cyclicBarrier.await();//屏障
System.err.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 通过Barrier");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 3; i++) {//启动多个线程
new Thread(runnable).start();
}
}
}
复制代码
CyclicBarrier(int parties, Runnable barrierAction)
- 创建一个新的CyclicBarrier ,它将在给定数量的参与方(线程)等待时触发,并在屏障触发时执行给定的屏障操作,由进入屏障的最后一个线程执行。
参数:
parties – 在障碍被触发之前必须调用await的线程数
barrierAction – 障碍物被触发时执行的命令,如果没有动作则为null
- 样例
public class CyclicBarrierT {
public static String getTime(){
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss:SSS");
//字符串转时间String dateTimeStr = "2018-07-28 14:11:15";
return LocalDateTime.now(ZoneOffset.of("+8")).format(formatter);
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4,()->
System.out.println("特定数量的线程数到达屏障,该输出语句由 "+Thread.currentThread().getName()+" 执行"));
Runnable runnable = ()->{
try {
String threadName =Thread.currentThread().getName();//当前线程的名字
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 到达Barrier");
cyclicBarrier.await();//屏障
System.err.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 通过Barrier");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 4; i++) {//启动多个线程
new Thread(runnable).start();
}
}
}
复制代码
int await(long timeout, TimeUnit unit)
要和CountDownLatch中的区分开来
public class CyclicBarrierT {
public static String getTime(){
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss:SSS");
//字符串转时间String dateTimeStr = "2018-07-28 14:11:15";
return LocalDateTime.now(ZoneOffset.of("+8")).format(formatter);
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4,()->
System.out.println("特定数量的线程数到达屏障,该输出语句由 "+Thread.currentThread().getName()+" 执行"));
Runnable runnable = ()->{
try {
String threadName =Thread.currentThread().getName();//当前线程的名字
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 到达Barrier");
cyclicBarrier.await(20, TimeUnit.MILLISECONDS);//屏障
System.err.println("线程:"+ threadName +" "+ CyclicBarrierT.getTime()+" 通过Barrier");
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 4; i++) {//启动多个线程
new Thread(runnable).start();
}
}
}
复制代码
为什么该方法要直接抛异常且不执行屏障后面的语句?
- CyclicBarrier想让每个线程无主次之分,它们可能会相互配合的完成某项任务,一个线程出错就可能导致其他线程也出错
- 线程在规定时间内没有完成任务(到达屏障),可能是出了问题,出于慎重考虑就直接抛异常
- 抛出超时异常后屏障被破环,导致其他线程抛出屏障破坏异常,屏障后面的也不执行了
- 而CountDownLatch线程有主次线程之分,主线程在等待子线程,主线程的重要程度较大,如果子线程在规定时间没有完成,就先执行主线程,毕竟已经分主次了。
CyclicBarrier理解其大致原理
观察构造方法源码
观察await()
方法
- 等待,直到所有各方都在此屏障上调用了await 。
- 如果当前线程不是最后一个到达的线程,那么它会出于线程调度目的而被禁用并处于休眠状态,直到发生以下情况之一:
- 最后一个线程到达; 或者
- 其他一些线程中断当前线程; 或者
- 其他一些线程中断了其他等待线程之一; 或者
- 其他一些线程在等待屏障时超时; 或者
- 其他一些线程在此屏障上调用reset 。
- 如果当前线程:
- 在进入此方法时设置其为中断状态; 或者
- 等待时被中断
然后抛出InterruptedException并清除当前线程的中断状态。
- 如果在任何线程正在等待时reset屏障,或者在调用await时屏障被破坏,或者在任何线程正在等待时,则抛出
BrokenBarrierException
。 - 如果任何线程在等待时被中断,那么所有其他等待线程将抛出BrokenBarrierException并且屏障被置于破坏状态。
- 如果当前线程是最后一个到达的线程,并且在构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。 如果在屏障操作期间发生异常,则该异常将在当前线程中传播,并且屏障被置于破坏状态。
返回值:
当前线程的到达索引,其中 index getParties() – 1表示第一个到达,零表示最后一个到达
throws:
InterruptedException
– 如果当前线程在等待时被中断
BrokenBarrierException
– 如果当前线程正在等待时另一个线程被中断或超时,或者屏障被重置,或者在调用await时屏障被破坏,或者屏障操作(如果存在)由于异常而失败
await()
方法的大致实现
await方法是通过调用dowait方法实现的
dowait()
方法之线程通过屏障的策略
dowait()
方法之线程在屏障前等待的策略
小结——关于cyclicBarrier的底层执行流程
- 初始化
cyclicBarrier
中的各种成员变量,包括parties
、count
以及Runnable
(可选) - 当调用
await
方法时,底层会先检查计数器是否已经归零,如果是的话,那么就首先执行可选的Runnable
,接下来开始下一个generation
; - 在下一个分代(
Generation
)中,将会重置count
值为parties
,并且创建新的generation
实例。 - 同时会调用
condition
的signalAll
方法,唤醒所有在屏障前面等待的线程,让其开始继续执行。 - 如果计数器没有归零,那么当前的调用线程将会通过
condition
的await
方法,在屏障前进行等待。 - 以上所有执行流程均在
lock
锁的控制范围内,不会出现并发情况。 - 里面的
Generation
就是实现Barrier
复用的关键,本文章前面的代码样例:分批通过屏障
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END