这是我参与8月更文挑战的第4天,活动详情查看: 8月更文挑战
为什么使用线程池?
- 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率。
- 线程本身也是要占用内存空间的,大量的线程会抢占宝贵的内存资源,如果处理不当,可能会导致Out of Memory异常。即便没有,大量的线程回收也会给GC带来很大的压力,延长GC的停顿时间。
因此,在生产环境中使用线程,必须对其加以控制和管理。
什么是线程池?
为了避免系统频繁的创建和销毁线程,我们可以让创建的线程进行复用。
池化技术也很常见,比如数据库连接池,为了避免每次数据库操作都要重新建立和销毁数据库连接,我们可以使用数据库连接池维护一些数据库连接。当系统需要使用时,并不是创建一个新的连接,而是从连接池中获得一个可用的连接即可。反之,当需要关闭连接时,并不是真的将连接关闭,而是将这个连接添加到连接池中即可。
线程池也是一样,线程池中,总有几个活跃线程,当你需要使用线程时,可以从池子中拿一个空闲线程,当完成工作是,将这个线程退回线程池中,以便其他人使用。
JDK对线程池的支持
在 Java 中,新建一个线程池对象非常简单,Java 本身提供了工具类java.util.concurrent.Executors
。
Executors
提供了各种类型的线程池,主要有以下方法:
// 该方法返回一个固定线程数量的线程池
// 新任务提交时,此线程池中如果有空闲线程,则立即执行,否则将任务添加到任务队列(LinkedBlockingQueue)中
public static ExecutorService newFixedThreadPool(int nThreads)
// 该方法返回一个只有一个线程的线程池
// 若多余一个线程提交到线程池,将任务添加到任务队列(LinkedBlockingQueue)中
public static ExecutorService newSingleThreadExecutor()
// 该方法返回一个动态调整的线程池,线程池中线程数量不固定,若有空闲线程则复用,
// 否则创建一个新的线程执行任务
public static ExecutorService newCachedThreadPool()
// 该方法返回一个ScheduledExecutorService对象,线程池大小为1
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
// 该方法返回一个ScheduledExecutorService对象,可以指定线程池大小
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
// 工作窃取线程池 since 1.8
// 改方法返回一个可以窃取任务的线程池,假设A、B两个线程,如果A已经处理完毕自己的任务
// 它会主动窃取B的任务执行
public static ExecutorService newWorkStealingPool()
复制代码
虽然JDK中提供了丰富的工厂方法,但是在生产场景中,按需指定线程池的参数创建线程池,对于规避资源耗尽的风险更有帮助。所以在阿里开发规范中强调了,不允许使用Executors
创建线程池,而是通过ThreadPoolExecutor
的方式。
线程池核心实现
java线程池的核心类是ThreadPoolExecutor,下图为其UML类图。
ThreadPoolExecutor
实现的顶层接口是Executor
,顶层接口Executor
提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable
对象,将任务的运行逻辑提交到执行器(Executor
)中,由Executor
框架完成线程的调配和任务的执行部分。ExecutorService
接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future
的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。AbstractExecutorService
则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor
实现最复杂的运行部分,ThreadPoolExecutor
将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
复制代码
参数含义:
- corePoolSize:指定了线程池中的线程数量。
- maximumPoolSize:指定了线程池中的 最大线程数量。
- keepAliveTime:当线程池线程数量超过corePoolSize是,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多长时间内,会被销毁。
- unit:keepAliveTime的单位。
- workQueue:任务队列,被提交但尚未被执行的任务。
- threadFactory:线程工厂,用于创建线程。
- handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。
线程池的创建包含很多个参数,功能十分强大。但是,在创建时需要十分小心,几个参数的选择尤为重要,会直接影响性能。还有一点值得注意的是,ThreadPoolExecutor为这七个参数都提供了get、set方法已支持动态修改配置和获取参数信息提供监控机制,以此可以 实现动态线程配置管理平台。
线程池任务调度逻辑
线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。
线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
- 直接申请线程执行该任务;
- 缓冲到队列中等待线程执行;
- 拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
线程池的生命周期
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
复制代码
ctl
这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
//计算当前运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//计算当前线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
//通过状态和线程数生成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
ThreadPoolExecutor的运行状态有5种,分别为:
运行状态 | 状态描述 |
---|---|
RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中的任务 |
SHUTDOWN | 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务 |
STOP | 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程 |
TIDYING | 所有的任务都已终止了,workerCount(有效线程)为0 |
TERMINATED | 在terminated()方法执行完后进入该状态 |
其生命周期转换如下图所示:
源码探索
以下内容可能会引起您的不适,请做好心理准备。
任务调度:execute(Runnable command)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
复制代码
所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
任务申请:getTask()
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
* 这里的关键值timed,如果allowCoreThreadTimeOut=true或者此时工作线程大于corePoolSize,
* timed=true,如果timed=true,会调用poll()方法从阻塞队列中获取任务,否则调用take()方法
* 获取任务。
*
* poll(long timeout, TimeUnit unit):从BlockingQueue取出一个任务,如果不能立即取出,
* 则可以等待timeout参数的时间,如果超过这个时间还不能取出任务,则返回null;
* take():从blocking阻塞队列取出一个任务,如果BlockingQueue为空,阻断进入等待状态直到 * BlockingQueue有新的任务被加入为止。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
复制代码
内部类:Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
复制代码
Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
2.如果正在执行任务,则不应该中断线程。
3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
Worker线程增加:addWorker()
private boolean addWorker(Runnable firstTask, boolean core)
复制代码
firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。
// 外层循环,负责判断线程池的状态
// retry后面跟循环,标记这个循环的位置。我们可以在continue或者break后面加retry,表示要跳到这个循环,其中break表示要跳过这个标记的循环,continue表示从这个标记的循环继续执行。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//内层循环,负责worker数量+1
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// worker数量+1成功的后续操作
// 添加到workers Set集合,并启动worker线程
...
复制代码
Worker线程执行:runWorker()
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
- while循环不断地通过getTask()方法获取任务。
- getTask()方法从阻塞队列中取任务。
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
- 执行任务。
- 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// ...略
task.run();
// ...略
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 销毁线程
processWorkerExit(w, completedAbruptly);
}
复制代码