JUC并发编程(5):JUC三大常用辅助类–CountDownLatch、CyclicBarrier、Semaphore 解决并发问题

JUC三大常用辅助类–CountDownLatch、CyclicBarrier、Semaphore 解决并发问题

为什么这里要介绍下JUC强大的工具类? CountDownLatch | CyclicBarrier | Semaphore 底层都是AQS来实现的

1、减少计数CountDownLatch

1.1、概述

CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句 。

countDownLatch具有的功能

  • CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会等待阻塞。
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。

构造器:

CountDownLatch(int count)  //构造一个用给定计数初始化CountDownLatch
复制代码

方法:

image.png

两个常用的主要方法

await() //使当前线程在锁存器倒计数至零之前一直在等待,除非线程被中断。

countDown() //递减锁存器的计数,如果计数达到零,将释放所有等待的线程。
复制代码

1.2、案例演示

6个同学陆续离开教室之后,班长才能锁门,如果不加 CountDownLatch 类,会出现线程混乱执行,同学还未离开完教室,班长就已经锁门了,如下:

普通实现

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        //6个同学陆续离开教室之后,启动6个线程。
        for (int i = 1; i <=6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");

            },String.valueOf(i)).start();
        }

        System.out.println(Thread.currentThread().getName()+" 班长锁门走人了");
    }
}
复制代码

执行结果

1 号同学离开了教室
5 号同学离开了教室
4 号同学离开了教室
2 号同学离开了教室
main 班长锁门走人了
3 号同学离开了教室
6 号同学离开了教室
复制代码

上面这种情况下,不知道哪个线程会先执行,只要抢占到 CPU 就可以执行了。

使用CountDownLatch后具体正确的案例代码

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        
        //创建CountDownLatch对象,设置初始值
        CountDownLatch countDownLatch = new CountDownLatch(6);
        
        //6个同学陆续离开教室之后
        for (int i = 1; i <=6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");
                //计数 -1,调用countDown方法的线程不会阻塞
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }

        //等待,计数器的数从6变为0的过程中,主线程会一直阻塞等待在这,直到变为0才会执行
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName()+" 班长锁门走人了");
    }
}
复制代码

总结:

  • 每个同学出去一次,则执行countDown()方法进行减一操作,调用countDown方法的子线程不会阻塞
  • 主线程不会执行,因为主线程调用了await()方法,在计数器的数从6变为0的过程中,主线程会一直阻塞等待在这,直到变为0
  • 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。main主线程必须要等前面6个线程完成全部工作后,自己才能开干。

1.3、应用场景

来自zhuanlan.zhihu.com/p/148231820…

典型的应用场景

如并行计算,当某个处理的运算量很大时,可以将该运算任务拆分成多个子任务,等待所有的子任务都完成之后,父任务所在线程再拿到所有子任务的运算结果进行汇总。

先来看看 CountDownLatch 的源码注释;

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class CountDownLatch {
}
复制代码

描述如下:它是一个同步工具类,允许一个或多个线程一直等待,直到其他线程运行完成后再执行。

通过描述,可以清晰的看出,CountDownLatch的两种使用场景:

  • 场景1:让多个线程等待
  • 场景2:和让单个线程等待。

场景1: 让多个线程等待:模拟并发,让并发线程一起执行

为了模拟高并发,让一组线程在指定时刻(秒杀时间)执行抢购,这些线程在准备就绪后,进行等待(CountDownLatch.await()),直到秒杀时刻的到来,然后一拥而上;这也是本地测试接口并发的一个简易实现。

在这个场景中,CountDownLatch充当的是一个发令枪的角色;就像田径赛跑时,运动员会在起跑线做准备动作,等到发令枪一声响,运动员就会奋力奔跑。和上面的秒杀场景类似,代码实现如下:

CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
    new Thread(() -> {
        try {
            //准备完毕……运动员都阻塞在这,等待号令
            countDownLatch.await();
            String parter = "【" + Thread.currentThread().getName() + "】";
            System.out.println(parter + "开始执行……");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

Thread.sleep(2000);// 裁判准备发令
countDownLatch.countDown();// 发令枪:执行发令
复制代码

运行结果:

【Thread-0】开始执行……
【Thread-1】开始执行……
【Thread-4】开始执行……
【Thread-3】开始执行……
【Thread-2】开始执行……
复制代码

我们通过CountDownLatch.await(),让多个参与者线程启动后阻塞等待,然后在主线程调用CountDownLatch.countdown(1) 将计数减为0,让所有线程一起往下执行;以此实现了多个线程在同一时刻并发执行,来模拟并发请求的目的。

场景2:让单个线程等待:多个线程(任务)完成后,进行汇总合并

这也是我们的案例演示

很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据check;
这其实都是:在多个线程(任务)完成后,进行汇总合并的场景。

代码实现如下:

CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
    final int index = i;
    new Thread(() -> {
        try {
            Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
            System.out.println("finish" + index + Thread.currentThread().getName());
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

countDownLatch.await();// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。
System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
复制代码

运行结果:

finish4Thread-4
finish1Thread-1
finish2Thread-2
finish3Thread-3
finish0Thread-0
主线程:在所有任务运行完成后,进行结果汇总
复制代码

在每个线程(任务) 完成的最后一行加上CountDownLatch.countDown(),让计数器-1;当所有线程完成-1,计数器减到0后,主线程往下执行汇总任务。

从上面两个例子的代码,可以看出 CountDownLatch 的API并不多;

  • CountDownLatch的构造函数中的count就是闭锁需要等待的线程数量。这个值只能被设置一次,而且不能重新设置;
  • await():调用该方法的线程会被阻塞,直到构造方法传入的 N 减到 0 的时候,才能继续往下执行;
  • countDown():使 CountDownLatch 计数值 减 1;

image.png

实验CountDownLatch去解决时间等待问题

public class AtomicIntegerDemo {
    
    AtomicInteger atomicInteger=new AtomicInteger(0);
    
    public void addPlusPlus(){
        atomicInteger.incrementAndGet();
    }
    public static void main(String[] args) throws InterruptedException {
        
        CountDownLatch countDownLatch=new CountDownLatch(10);
        AtomicIntegerDemo atomic=new AtomicIntegerDemo();
        
        // 10个线程进行循环100次调用addPlusPlus的操作,最终结果是10*100=1000
        for (int i = 1; i <= 10; i++) {
            new Thread(()->{
               try{
                   for (int j = 1; j <= 100; j++) {
                       atomic.addPlusPlus();
                   }
               }finally {
                   countDownLatch.countDown();
               }
            },String.valueOf(i)).start();
        }
        //(1). 如果不加上下面的停顿3秒的时间,会导致还没有进行i++ 1000次main线程就已经结束了
        //try { TimeUnit.SECONDS.sleep(3);  } catch (InterruptedException e) {e.printStackTrace();}
        //(2). 使用CountDownLatch去解决等待时间的问题
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName()+"\t"+"获取到的result:"+atomic.atomicInteger.get());
    }
}
复制代码

1.4、底层原理

image.png

2、循环栅栏CyclicBarrier

2.1、概述

该类是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点,在设计一组固定大小的线程的程序中,这些线程必须互相等待,这个类很有用,因为该barrier在释放等待的线程后可以重用,所以称为循环barrier

CyclicBarrier的字面意思是可循环(Cyclic) 使用的屏障(barrier)。它要做的事情是让一组线程到达一个屏障(也可以叫做同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法

常用的构造方法有:

CyclicBarrier(int parties,Runnable barrierAction)
//创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时执行给定的屏障操作,该操作由最后一个进入barrier的线程操作。
复制代码

方法:

image.png

常用的方法有:

await()  //在所有的参与者都已经在此barrier栅栏上调用await方法之前一直等待。
复制代码

2.2、案例演示

集齐7颗龙珠才可以召唤神龙

public class CyclicBarrierDemo {

    //创建固定值
    private static final int NUMBER = 7;

    public static void main(String[] args) {
        
        //创建CyclicBarrier
        CyclicBarrier cyclicBarrier =
            new CyclicBarrier(NUMBER,()->{
                System.out.println("*****集齐7颗龙珠就可以召唤神龙");
            });

        //集齐七颗龙珠过程
        for (int i = 1; i <=7; i++) {
            new Thread(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+" 星龙被收集到了");
                    //每个线程都会等待阻塞在栅栏,直到7个线程都执行该await方法之后,
                    //栅栏里面的操作才会进行,且由最后一个进入栅栏的线程执行
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}
复制代码

执行结果

1 星龙被收集到了
6 星龙被收集到了
4 星龙被收集到了
5 星龙被收集到了
2 星龙被收集到了
7 星龙被收集到了
3 星龙被收集到了
***********集齐7颗龙珠就可以召唤神龙
复制代码

总结:

CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行CyclicBarrier一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await()之后的语句。可以将 CyclicBarrier 理解为加 1 操作。

2.3、底层原理

image.png

3、信号灯Semaphore

3.1、概述

一个计数信号量,从概念上将,信号量维护了一个许可集,如有必要,在许可可用前会阻塞每个线程,acquire()获取该许可,每个release()添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采取相应的行动。

具体常用的构造方法有:

Semaphore(int permits)  //创建具有给定的许可数和非公平的公平设置的Semapore。
复制代码

具体常用的方法有:

acquire()  //从此信号量获取一个许可,在提供一个许可给其他线程前,其他线程阻塞,否则线程被中断 
release() //释放一个许可,将其返回给信号量。
复制代码

设置许可数量Semaphore semaphore = new Semaphore(3); ,一般 acquire()都会抛出异常,releasefinally中执行

  • acquire(获取)当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量或超时。
  • release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
  • 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

3.2、案例演示

6辆汽车,停3个车位

public class SemaphoreDemo {
    public static void main(String[] args) {
        
        //创建Semaphore,设置许可数量
        Semaphore semaphore = new Semaphore(3);

        //模拟6辆汽车
        for (int i = 1; i <=6; i++) {
            new Thread(()->{
                try {
                    //抢占,获取一个许可,其他线程被阻塞,相当于锁
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+" 抢到了车位");
                    //设置随机停车时间
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println(Thread.currentThread().getName()+" ------离开了车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //释放许可 ,无论停车时间多久,
                    //最终必须释放许可返回给Semaphore信号量,其他线程才能用,类似于线程池。
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }
}
复制代码

执行结果:

1 抢到了车位
3 抢到了车位
2 抢到了车位
3 ------离开了车位
4 抢到了车位
1 ------离开了车位
5 抢到了车位
5 ------离开了车位
6 抢到了车位
2 ------离开了车位
6 ------离开了车位
4 ------离开了车位
复制代码

3.3、底层原理

image.png

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享