JUC CountDownLatch 从使用到源码解析

CountDownLatch 从使用到源码解析

建议:本篇文章结合JUC AbstractQueuedSynchronizer(AQS)源码-加解锁分析效果更佳!

应用场景

CountDownLatch是一个比较有用的线程并发工具类,他的应用场景有很多,一般都是用在监听某些初始化操作上,等待初始化操作完成后然后通知某些主线程继续执行;在生活中例如接力赛吧,一个队员必须要接到另一个队员的接力棒才能跑;再例如我玩游戏的时候必须要先加载一些基础数据,基础数据加载完成之后才能开始游戏。这样的例子有很多;CountDownLatch的应用场景也有很多;在之前我们做电子合同的时候有个场景是要把用户上传的合同文档(一般都是.doc或者.docx),我们需要把这个.doc或者.docx文档转换成pdf;原因就是电子签章必须要签署到pdf上;一个文档页数有很多,我们就开启几个线程分页的去转换,我们必须要到等到所有线程都转换完成才能继续下一步的操作,就是这个场景我们就用到了CountDownLatch这个并发工具类,

原理分析

它的内部提供一个计数器,在构造方法中必须要指定计数器的初始值,且计数器的初始值必须要大于0

CountDownLatch countDownLatch = new CountDownLatch(2);

另外它提供了一个countDown()方法来操作计数器的值,每调用一次countDown()方法计数器的值就会减1,直到计数器的值减为0,代表初始化操作已经完成,所有调用await方法而阻塞的线程都会被唤醒;可以进行下一步的操作!

实例

public class UseCountDownLatch {
  
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Thread t1 = new Thread(() -> {
            try {
                System.out.println("进入线程t1" + "等待其他线程处理完成...");
                countDownLatch.await();
                System.out.println("t1线程继续执行...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            try {
                System.out.println("t2线程进行初始化操作...");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("t2线程初始化完毕,通知t1线程继续");
                countDownLatch.countDown(); //类似通知
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2");
        Thread t3 = new Thread(() -> {
            try {
                System.out.println("t3线程进行初始化操作...");
                TimeUnit.SECONDS.sleep(4);
                System.out.println("t3线程初始化完毕,通知t1线程继续");
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t3");
        t1.start();
        t2.start();
        t3.start();
    }
}

复制代码

在初始化CountDownLatch时构造方法中传入了数值2,t1会等待t2和t3都调用countDownLatch.countDown();之后继续执行;执行结果如下:

进入线程t1等待其他线程处理完成...
t2线程进行初始化操作...
t3线程进行初始化操作...
t2线程初始化完毕,通知t1线程继续
t3线程初始化完毕,通知t1线程继续
t1线程继续执行...
复制代码

API介绍

  • await(): 调用该方法的线程必须等到构造方法传入的值减到0的时候才能继续往下执行;
  • await(long timeout,TimeUnit unit): 与上面的await方法功能一致,只不过这里有了时间限制,调用了该方法的线程等到指定的timeout时间后,不管构造方法传入的值是否减为0,都会继续往下执行;
  • countDown(): 使CountDownLatch初始值(构造方法中传入的值)减1;
  • long getCount(): 获取当前CountDownLatch维护的值;

源码分析

下面贴一下CountDownLatch的构造方法:

   public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
复制代码

CountDownLatch只有一个带参的构造方法,从这里可以看出count必须要大于0,不然会报错!在构造方法中只是new 了一个Sync对象;并赋值给了成员变量sync; 了解AQS的同学可以知道,CountDownLatch的实现依赖于AQS,它是AQS共享模式下的一个应用;

image-20210507172847358.png

关于AQS我们后面有篇幅单独去讲解这个;上图可以看到不光CountDownLatch依赖于AQS,像ReentrantLock,ReentrantReadWriteLock,Semaphore都是依赖于AQS;可以看到AQS的重要性;同时也是面试的重点;

回归原题:CountDownLatch实现了一个内部类Sync并用它去继承AQS,这样就能使用AQS提供的大部分模板方法了;

我们看一下Sync内部类的代码:

public class CountDownLatch {
    //Sync继承了AQS
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
		//Sync构造方法
        Sync(int count) {
            setState(count);
        }
		//获取当前同步状态
        int getCount() {
            return getState();
        }
	   //获取锁方法
        protected int tryAcquireShared(int acquires) {
            //在构造方法中state这个值必须要传入大于0的值,所以这里一直都是获取锁成功的;
            //直到每调用一次countDown()方法将state减1;
            return (getState() == 0) ? 1 : -1;
        }
		//释放锁方法
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取锁状态
                int c = getState();
                //如果等于0,就不能再释放锁了
                if (c == 0)
                    return false;
                //否则将同步器减1
                int nextc = c-1;
                //使用CAS更新锁状态
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

复制代码

image.png

在获取锁方法tryAcquireShared()方法中:返回的是一个int类型的数值,分别为-1,0,1;他们分别表示:

  • 负值:表示获取锁失败
  • 零值:表示当前节点获取成功,但是后继节点不能再获取了
  • 正值:表示当前节点获取成功,并且后继节点同样可以获取

CountDownLatch获取锁和释放锁的过程比较简单,我们在使用CountDownLatch的时候会调用await()方法来加锁,countDown()方法来解锁;下面我们先来看一下调用awit()方法的流程:

public void await() throws InterruptedException {
     //以响应线程中断方式获取   
     sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        //1 判断线程是否中断
        if (Thread.interrupted())
            //2以抛出异常的方式响应线程中断
            throw new InterruptedException();
        //3 尝试获取锁;在上面对正数,负数,0,3种取值已经进行了说明
        if (tryAcquireShared(arg) < 0)
            //4 获取锁失败
            doAcquireSharedInterruptibly(arg);
    }
复制代码

关于是否响应线程中断后面文章会有介绍,感兴趣的可以关注我后面会有更新;

这里我们先关注3,4处的代码,首先调用了tryAcquireShared(arg)方法进行获取锁;这个方法就是上面我们贴出来的tryAcquireShared(int acquires)方法,看state是否等于0.如果等于0就返回1;表示加锁成功,否则返回-1表示不能获取锁。如果此方法返回1线程不必等待继续向下执行,如果此方法返回-1则进行 doAcquireSharedInterruptibly(arg)方法:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

JUC AbstractQueuedSynchronizer(AQS)源码-加解锁分析这篇文章中我们讲述doAcquireShared()这个方法:

image-20210524111021488

基本上大致是一样的,唯一的区别就是在是否响应中断上有些区别;由于代码基本是一样的,这里就不过多的诉述了,建议看一下上一篇文章;

下面大致的说一下countDown()方法的过程:首先我们在CountDownLatch的构造方法中传入了一个数值count,这个数值赋给你内部类Sync,在Sync的构造方法中将count设置给了State同步状态,当每次调用countDown()方法的时候就会调用内部类Sync的sync.releaseShared(1);方法然后调用tryReleaseShared()方法实现自己释放锁的逻辑将State的值减1,每调用一次countDown()方法state就会减1,直到state减为0

public void countDown() {
     sync.releaseShared(1);
}

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

复制代码

关于调用countDown()方法用一个生活中的例子解释更恰当;记得小时候家里的门都是使用门闩(shuan栓)去锁门;如果不知道门闩是什么,我在bai度搜索了图片:

image-20210523140427228

在保证更安全更牢固的情况下,可能一个门上会有多个门闩;当每调用一次countDown()就相当于打开一个门闩;直到每个门闩都会打开,这个门才能打开;

我们大致对countDown()方法有了了解,我们再去看源码,在源码中调用了tryReleaseShared(arg)方法去释放锁,tryReleaseShared方法如果返回true表示释放成功,返回false表示释放失败,只有当将同步状态减1后该同步状态恰好为0时才会返回true,其他情况都是返回false。

		//释放锁方法
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取锁状态
                int c = getState();
                //如果等于0,就不能再释放锁了
                if (c == 0)
                    return false;
                //否则将同步器减1
                int nextc = c-1;
                //使用CAS更新锁状态
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
复制代码

这里假设在构造CountDownLatch时count传入的是2,执行到这里getState();等于2,然后执行c-1,最后进行CAS将c-1的结果赋值给state,这时return的肯定是false;然后整个releaseShared()方法返回false;也就是说必须执行2次countDown()方法才会返回true;当下一次执行countDown()完毕之后,结果返回true,随后继续执行doReleaseShared();方法去唤醒同步队列的所有线程!

doReleaseShared();调用的是AQS类中的方法,在JUC AbstractQueuedSynchronizer(AQS)源码-加解锁分析这篇文章中我们已经讲述过,这个不在阐述了;

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享