上一篇文章讲了有关线程池的一些简单的用法,这篇文章主要是从源码的角度进一步带大家了解线程池的工作流程和工作原理。
首先先来回顾下如何使用线程池开启线程
private static void createThreadByThreadPoolExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 10; i++) {
MyThread myThread = new MyThread();
executor.execute(myThread);
}
复制代码
可以看到其实没有其它特殊的地方,除了构建线程池的代码,其它最终要的就是executor.execute(myThread)
行代码了。
准备工作
在多线程系列的第一篇文章中提到了线程和进程的状态,线程池同样也有状态,如下:
- Running: 允许接受新的任务,并且处理队列中的任务
- Shutdown: 不接受新的任务,但是仍然会处理队列中的任务
- Stop: 不接受新的任务,不处理队列中的任务,而且中端在运行的任务
- Tidying: 所有的任务都已经终端,并且工作线程数归0,该状态下的线程都会调用
terminated()
函数 - Terminated: 当
terminated()
函数调用完后就进入了此状态
首先需要知道4个概念,Worker
、workers
、workQueue
和 task
.
-
在线程池中有个比较重要的类,那就是
Worker
,可以看到其实现了Runnable
接口(其实就是工作线程,继承了AbstractQueuedSynchronizer
类(俗称AQS
,在多线程中是很重要的类) -
workers
就是Worker
的一个集合,private final HashSet<Worker> workers = new HashSet<Worker>();
-
task
:需要执行的任务,也就是execute()
中的参数,实现了Runnable
接口 -
workQueue
就是工作队列,就是上一篇文章中线程池构造函数中的工作队列,里面存储的就是需要执行的任务,队列是实现BlockingQueue
接口的类,有以下这些实现
execute()
本方法传进去的类是需要实现Runnable接口的,作为一个command
传进去
遇到新的任务后
- 如果工作线程数 < 核心线程数,那么直接加1个worker
- 如果线程池是正常的工作状态,并且工作队列能够添加任务,此时需要第二轮判断
- 如果线程池因为某种原因不正常了,并且能够成功从工作队列中删除任务,那么直接采取拒绝策略
- 如果此时工作线程数为0,此时需要新建一个线程(并且这里创建的是非核心线程)来执行这个任务,为什么是null呢,因为已经把任务放在工作队列里面了。如果新建的worker的firstTask是该任务的话,就会重复执行两次任务。
- 其它情况也就是正常情况下,是啥都不干,因为主要目的是把任务放到工作队列中
- 如果工作队列已经满了,则需要判断是否能够成功添加一个非核心线程,如果连非核心线程数都满足不了了,就是线程池真的搞不了了,累了,所以也会调用拒绝策略。
注意:核心线程和非核心线程只是语义上的说法,没有本质上的区别
addworker()
addworker
的作用是检查是否可以根据当前池状态和给定界限(核心或最大值)添加新线程 ,并且通过第二个参数来判定是否创建核心线程,当为true的时候就是核心线程,反之就是非核心线程。源码里的注释如下
来看看代码具体是如何的
- 一进来就是一个死循环,这个死循环最主要的目的是确认线程池状态是否正常。如果线程池的状态大于SHUTDOWN,也就是处于STOP、TIDYING或者TERMINATED的时候,线程池都没了,还创建worker干啥,直接返回fasle;当线程池处于SHUTDOWN的时候,又得再次判断:
- 传进来的任务如果不为空,那么就不用添加worker的,银行下班了,办理完现在的顾客就不在办理了,不然一直来那不是一直不能下班。
- 传进来的任务为空,但是工作队列为空,同样不用添加worker,银行都没有任务了,而且又到点下班了。除了以上3中情况返回false,其它情况不做任何反应,往下走。
- 又是一个死循环,首先得到工作线程数如果超过了边界,比如超过了容量、核心线程数或者最大线程数,就不用添加worker了,银行实在是办理不了新的顾客了;当工作线程数正常的情况下,通过CAS来增加工作线程数,如果能增加成功就退出最外层循环。如果增加工作线程失败,那就是其它线程增加了该数量,如果此时线程池的运行状态发生了改变,则重复外层循环,否则就自旋直到成功增加工作线程数。
- 到这里就可以说基本扫除了障碍,以传进来的firstTask新建一个Worker,然后获取Worker里的线程。如果线程为null,那么就直接执行添加Worker失败的逻辑,否则就是正常的逻辑。
- 先来看正常的逻辑,拿到锁,并且开始又一次的获取线程池的状态
- 如果线程是正常运行状态,或者说是关闭状态下firstTask仍然为null,此时如果线程是
alive
那么而说明线程已经开启,直接抛出异常。 - 正常情况下是wokers集合中添加新的worker元素,并且调整线程池最大值,设置workerAdded标志为true。
- 重点 ,当workerAdded为true的时候,开启工作线程,也就是代码中的
t.start()
- 如果线程是正常运行状态,或者说是关闭状态下firstTask仍然为null,此时如果线程是
- 再来看失败的逻辑 ,是在第3点构造一个Worker对象,获取线程的时候,有可能获取到的线程为空,这样其实就是增加worker失败,返回false,在返回false之前会触发执行失败的逻辑
addWorkerFailed()
逻辑。整个的逻辑是比较简单的,就不再花费篇幅去阐述。
到这里,整个addWorker()
的流程是比较清晰的,值得一提的就是第2行代码中的retry
这个看起来是关键字,但其实不是,仅仅只是一个类似标志位的东西,可以是retry,也可以是abc。
通常是配合for循环来使用,搭配continue和break可以达到goto的效果。比如continue retry;
就是跳到一开始最外层的for循环,break retry;
相当于退出整个循环。写一个最简单的例子大家都能知道了。
public class RetryExample {
public static void main(String[] args) {
abc:
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
if (j == 2) {
continue abc;
}
if (i == 8) {
break abc;
}
System.out.println("i: " + i + ", j: " + j);
}
}
}
}
复制代码
输出结果如下图所示
创作不易,如果对你有帮助,欢迎点赞,收藏和分享啦!