为什么用线程池
回答这个问题之前可以想下线程的最简单的生命周期:线程的创建->线程的执行任务(包含等待)->线程的销毁。设线程的创建时间为t1,线程的执行任务时间为t2,线程的销毁时间为t3。线程总生命周期消耗时间为t=t1+t2+t3。你想要提高一个服务的吞吐量,肯定是让线程真正执行任务的时间占的越多越好,那么最简单的方式就是把t1、t3去掉,在开始的时候创建后面一直使用创建的线程,这样就不用用完销毁之后又来任务再创建,同时尽量减少t2中的等待时间,让cpu少来回切换。
线程创建的开销: 给线程分配栈空间(jvm -Xss可以进行配置)、列入调度,同时在线程切换的时候还要执行内存换页,cpu的缓存被清空,切换回来的时候还要重新从内存中读取信息,破坏了数据的局部性。
线程创建的销毁: 回收栈空间、列出调度、cpu缓存被清空。
什么是线程池
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性。
线程池的优点
- 提高线程的可管理性:线程时稀缺资源、如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 能保证内核被充分利用,还能防止过度调度。
- 减少了创建与销毁线程的代价——时间、空间。
下面图为jdk的ThreadPoolExecutor的运行机制图:
下面我们通过阅读jdk的源码来进一步了解线程池的具体实现。
ThreadPoolExecutor基本结构了解
我们从上至下简单的描述下:
- Executor:该接口只有一个execute方法,接收参数为Runnable,给子类提供方一个规范,只需要把任务提交过来,具体内部任务运行的机制包括是使用调用线程执行还是使用线程池、任务如何调度、线程池中的线程如何管理都按照子类的方式进行实现。
- ExecutorService:也是一个接口继承了Executor,未使用的ExecutorService应该关闭,以便回收其资源,主要提供管理终止的方法,以及可以产生Future跟踪一个或多个异步任务进度的方法。
- AbstractExecutorService:一个抽象类,提供ExecutorService执行方法的默认实现。此类使用newTaskFor返回RunnableFuture实现submit、invokeAny和invokeAll方法,该函数默认为包中提供的FutureTask类。
- ThreadPoolExecutor:Executor的一个实现类,使用线程池执行每个任务,该类提供了许多可调整的参数和可扩展性的hooks。
ThreadPoolExecutor的类结构属于单线继承实现结构还是比较简单的,用于设计的学习比较好。现在结构中的各个类/接口都已经有一定的了解,我们可以先看看描述中提及了FutureTask再继续对ThreadPollExecutor的了解。
FutureTask
- Future:一个接口,代表异步计算的结果。提供了检查计算是否完成、等待其完成以及检索计算结果的方法。取消方法。
- RunnableFuture:一个接口,一个可以运行的Future。run方法的方法成功会导致Futrure的完成,并允许访问其结果。
- Futureask:RunnableFuture的实现类,可取消的异步计算。
get
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//一直阻塞直到任务完成
s = awaitDone(false, 0L);
//任务完成了才允许返回返回值
return report(s);
}
复制代码
可以看到,上面的关键在等state变成完成及之后,那么肯定是在run方法内改变的,下面我们来看看
run
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//FutureTask内部持有callable
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
复制代码
可以看到run方法内部调用了Callable的call方法。最后如果执行成功了就把结果进行set。另外一点可以看到这里是重写Runnable的run方法,所以这样很巧妙的让后面的ThreadPoolExecutor的execute传参统一为Runnable不管subnmit提交的是Callable还是Runnable——这个是重点。
ThreadPoolExecutor的字段
// 线程池状态和线程数量做或运算得到的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 32-3,用低29位表示线程池内有效的线程数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2^29-1为最大容量,和COUNT_BITS对应用Integer的低29位来存储线程数,那么最大容量就是2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 使用Integer的高三位表示线程池的运行状态
// RUNNING的二进制存储为1110 0000 0000 0000 0000 0000 0000 0000
// 能接受新提交的任务,并且也能处理阻塞队列中的任务。
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN的二进制存储为0000 0000 0000 0000 0000 0000 0000 0000
// 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不能接受新的任务,也不处理队列的任务,会中断正在处理任务的线程。
private static final int STOP = 1 << COUNT_BITS;
// 所有的任务都已终止,workerCount(有效线程数)为0。
private static final int TIDYING = 2 << COUNT_BITS;
// 线程池彻底关闭
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
//根据c获得运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//根据c获得线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
//给出运行状态获得ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 尝试自旋转递增ctl的workerCount部分
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 尝试自旋转递减ctl的workerCount部分
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 递减ctl的workerCount字段。这仅在线程突然终止时调用(请参阅processWorkerExit)。其他递减在getTask中执行。
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
* 用于保存任务并将其传递给工作线程的队列。
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 锁定对workers集合的访问和相关统计。虽然可以使用某种并发集合,但事实证明,通常最好使用锁。
* 其中一个原因是一系列中断的工作线程,从而避免不必要的突然中断,尤其是在shutdown期间。否则,退出的线程将同时中断那些尚未中断的线程。
* 它还简化了一些相关的统计,如最大池大小等。
* 我们还在shutdown和shutdownNow持有mainLock锁,以确保wokers集合稳定,同时分别检查是否允许中断和实际中断。
* 简而言之:不用并发集合的原因在于存在临界区使用单独的锁比较方便。
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set中包含所有池中的工作线程。只有持有mainLock的时候才能访问。
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 支持awaitTermination的等待条件。
*/
private final Condition termination = mainLock.newCondition();
/**
* 跟踪最大的池大小。只有持有mainLock的时候才能访问。
*/
private int largestPoolSize;
/**
* 完成任务的计数器。仅在工作线程终止时更新。只有持有mainLock的时候才能访问。
*/
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
/**
* 在饱和或者关闭时的拒绝策略。
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲线程等待工作的超时时间(纳秒)。
* 当存在超过corePoolSize或allowCoreThreadTimeOut时,线程使用此超时。否则他们会永远等待新的工作。
*/
private volatile long keepAliveTime;
/**
* 如果为false(默认),则即使在空闲时,核心线程仍保持活动状态。
* 如果为true,则核心线程使用keepAliveTime超时等待工作。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
*Core pool size是保持活动状态(不允许超时等)的最小工作线程数,除非设置了allowCoreThreadTimeOut,在这种情况下,最小值为零。
*/
private volatile int corePoolSize;
/**
* 最大池大小。请注意,实际最大值在内部由CAPACITY限制。
*/
private volatile int maximumPoolSize;
/**
* 默认的拒绝策略
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
* shutdown和Shutdownow的呼叫者需要权限
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/* 执行finalizer时要使用的上下文 */
private final AccessControlContext acc;
复制代码
上面所有的字段已经在注释中说明了,我们这里说下主要的字段。
ctl
线程池状态和线程数量做或运算得到的,因为状态是固定的值所以可以做到。可以看到AutomicInteger使用自旋+volatile保证增加、减少是线程安全的。很巧妙的使用了一个值表示两个信息,高3位用来表示线程池的运行状态,低29(5亿多足够了)为保存工作线程的数量。好处在于在二者都需要修改的时候不用加锁直接使用AutomicInteger的CAS进行赋值。
线程池运行状态
- RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务。
- SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
- STOP:不能接受新的任务,也不处理队列的任务,会中断正在处理任务的线程。
- TIDYING:所有的任务都已终止,workerCount(有效线程数)为0。
- TERMINATED:线程池彻底关闭。
mainLock
锁定对wokers集合的访问和相关统计。
handler-拒绝策略
默认的为AbortPolicy。
- ThreadPoolExecutor.AbortPolicy:默认的也是常用的拒绝策略(因为一般配置的值是服务器能承受的并发量),当任务不能再提交时,直接抛出RejectedExecutionException异常表示拒绝。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程(就是准备提交异步任务的线程)处理。这样能保证任务都执行完成,但是如果是接口的话接口响应速度会变慢。
- ThreadPoolExecutor.DiscardPolicy:抛弃任务,但是不抛出异常。一般业务不用。
- ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最前面的任务,然后重新提交被拒绝的任务。一般不使用,因为需要对任务进行分类,能抛出的再抛并且一般异步任务没有可以抛弃的。
submit
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
复制代码
提交方法,这里看不出啥,不过可以看到和前面对Executor的描述“将任务提交和任务允许分离”是对应的。这里的submit只负责任务的封装,后面的运行包括任务的调度、任务的缓冲、任务的申请、任务的拒绝等等都在execute。execute在ThreadPollExecutor中实现带回在看,下面我们看下里面的newTaskFor方法。
newTaskFor-新建任务
/**
* 根据被给的runnable和默认值,返回RunnableFuture。
*
* @param runnable 正在包装的runnable 任务
* @param value 返回的默认值
* @param <T> value的类型
* @return RunnableFuture, 运行时讲运行底层的runnable,Future将产生给定的值作为结果,并提供底层任务的取消。
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
复制代码
可以看到该方法同样返回FutureTask因为给出了value,但是其实Runnable不能返回方法,所以value将原路返回。
/**
* 构造函数传Runnable作为执行函数,result是固定值,因为Runnable不像Callable一样能返回结果,但是固定值的返回会需要等待Runnable执行完成。
*/
public FutureTask(Runnable runnable, V result) {
//这里把runnable装饰成callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
复制代码
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
复制代码
到这一步就已经很清楚了,RunnableAdapter把Runnable装饰成了Callable,call方法返回的是传过来的value。
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
复制代码
可以看到s,至于Callable的相关的submit这里就不赘述。
execute
在将来的某个时候执行给定的任务。任务可以在新线程或现有池线程中执行。如果由于此执行器已关闭或已达到其容量而无法提交任务执行,则该任务将由当前的RejectedExecutionHandler处理。
任务的执行外层分为三步:
- 如果运行的线程少于corePoolSize,请尝试以给定命令作为其第一个任务启动新线程。对addWorker的调用以原子方式检查运行状态和workerCount,通过返回false从而来防止错误报警-在不应该添加线程的情况下添加线程。
- 如果一个任务可以成功排队,那么我们仍然需要再次检查是否应该添加一个线程(因为自上次检查以来,已有线程已死亡),或者自进入此方法后,池是否关闭。因此,我们重新检查状态,如果停止,必要时回滚排队,如果没有,则启动一个新线程。
- 如果无法对任务排队,则尝试添加新线程。如果失败了,我们知道我们已经被关闭或饱和,所以拒绝这个任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//先查看当前工作线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//小于的话就给当前任务添加一个新的线程执行
if (addWorker(command, true))
return;
//添加失败了,需要重新获取,因为创建线程开销比较大
c = ctl.get();
}
//如果已经超过核心线程数,那么就判断当前线程池是否在运行中,并且可以往阻塞队列里面进行添加任务-等待线程释放才会执行。
if (isRunning(c) && workQueue.offer(command)) {
//重新获得ctl因为添加任务进队列很可能运行状态已经改变了
int recheck = ctl.get();
//如果当前不在运行中那么就回滚队列,走拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果在运行中但是工作线程为0了所以需要添加非核心线程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果不能正常的排队那么就尝试添加非核心线程,如果添加失败久走拒绝策略
else if (!addWorker(command, false))
reject(command);
}
复制代码
源码的注释步骤描述的比较清楚,建议还是直接看源码步骤,有些文章的图不太精确。大致的流程就是:
- 看核心线程是否超过,没有就直接添加核心线程直接执行任务。
- 如果超过了就排队,排队超过了添加非核心线程直接执行任务。
- 如果前面步骤都失败了或者中间状态检查失败就直接拒绝。
任务执行的触发是在添加新的线程时,所以addWorker涉及到添加新的线程和执行任务,下面我们来具体看看addWorker方法。
addWorker-添加工作线程
检查当前线程池的状态和给定的边界(核心数或者最大线程数)是否能添加新的工作线程。如果是这样,则会相应地调整工作线程数量,如果可能的话,会创建并启动一个新的工作线程,将firstTask作为其第一个任务运行。如果线程池池已停止或符合关闭条件,则此方法返回false。如果线程工厂在被请求时未能创建线程,它也会返回false。如果线程创建失败,或者是由于线程工厂返回null,或者是由于异常(通常是thread.start()中的OutOfMemoryError),我们会完全回滚。
private boolean addWorker(Runnable firstTask, boolean core) {
//看是否能增加工作线程,并递增workcount
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 仅在必要的时候检查队列是否为空
//如果大于等于SHUTDOWN说明已经不接收任务了,并且不是SHUTDOWN状态了队列也不为空就直接返回false——如果rs处于SHUTDOWN的状态并且还有任务没有执行完成可以添加工作工作线程但是不能携带非空的firstTask
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//看工作线程是否大于等于容量(注意这里是大于等于,可以防止多线程的情况下超,这样只能少量的超出),或者如果没超出就判断是否大于核心线程或者规定的最大线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加工作线程数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重新读取ctl
if (runStateOf(c) != rs)
continue retry;
// CAS失败因为工作线程数已经改变,集训继续内循环
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//内部会创建新的线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//拿着锁重新检查。在ThreadFactory出现故障或在获得锁之前关闭时退出。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果threadFactory创建线程失败尝试回滚并尝试terminate
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
上面主要描述了添加新的woker如果有新的任务就直接执行。下面我们看看任务是怎么执行的。至此我们已经了解了如何添加任务、如何增加工作线程,下面我们继续了解任务是如何让分配执行的。
runWorker-执行任务
public void run() {
runWorker(this);
}
复制代码
主要的woker运行循环。重复地从队列中获取任务并执行它们,同时处理许多问题:
- 我们可以从最初的任务开始,在这种情况下,我们不需要得到第一个任务。否则,只要线程池在运行,我们就可以从getTask获取任务。如果返回null,则工作进程将由于线程池状态或配置参数的更改而退出。其他退出源于外部代码中的异常抛出,在这种情况下completedjustly保持不变,这通常会导致processWorkerExit替换该线程。
- 在运行任何任务之前,获取锁以防止任务执行时其他池中断,然后我们确保除非池stopping,否则该线程没有中断设置。
- 每个任务运行之前都会调用beforeExecute,这可能会引发异常,在这种情况下,我们会导致线程在不处理任务的情况下死亡(使用CompletedTruntly true中断循环)。
- 假设beforeExecute正常完成,我们运行任务,收集其抛出的任何异常以发送给afterExecute。我们分别处理RuntimeException、Error(这两个异常都保证我们可以捕获)和任意Throwables。因为我们不可以在Runnable.run中重新抛出Throwables,我们在退出时将它们包装在错误中(到线程的UncaughtExceptionHandler)。任何异常会导致线程死亡。
- 在task.run完成后,我们调用afterExecute,它也可能引发异常,这也会导致线程死亡。根据JLS Sec 14.20,即使task.run抛出异常,这个异常也会生效。异常机制的最终效果是,afterExecute和线程的UncaughtExceptionHandler具有尽可能准确的信息,我们可以提供有关用户代码遇到的任何问题的信息。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断-只有运行相同woker才会中断
//是否是突然完成——正常完成(firstTask都已经完成了并且队列中也完成了)了会置为false,否则就是突然完成
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
//要开始执行了需要
w.lock();
// 如果池正在停止,确保线程被中断;
// 如果没有,确保线程没有中断。这需要在第二种情况下进行重新检查,以在清除中断的同时处理Shutdownow race。——Thread.interrupted()比较耗时可能会导致线程池状态变更。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//此类中为空,是一个可扩展的hook
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//此类中为空,是一个可扩展的hook
afterExecute(task, thrown);
}
} finally {
task = null;
//完成的任务递增,不管又没有发生异常
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
这里可以看到runWorker会一直使用当前线程一直执行任务,直到队列里面也没有任务或者出现异常。
我们先看看getTask方法,看看processWorkerExit方法,在没有任务执行或者出现异常的情况下后续的操作。
getTask-获得任务(keepAliveTime在此作用)
根据当前的配置设置,对任务执行阻塞或定时等待,如果由于以下原因必须退出此工作进程,则返回null(会导致runWorker退出while,当前线程后续会自动销毁):
- 有超过maximumPoolSize的工作线程数(由于调用setMaximumPoolSize)。
- 线程池停止了。
- 线程池已关闭,队列为空。
- 该worker在等待任务时超时,超时的worker在等待之前和之后都会被终止(即allowCoreThreadTimeOut || workerCount > corePoolSize),如果队列非空,则此工作线程不是池中的最后一个线程。
private Runnable getTask() {
boolean timedOut = false; // 上次poll的时候超时了吗?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 只在需要的时候检查队列是否为空
// 只有在线程池为SHUTDOWN并且队列为空的时候减少工作线程数,或者在STOP之后的状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断是都会进行poll超时。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当wc大于最大允许线程数或者允许超时就结束线程并且已经超时了就对wc进行递减。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//递减利用外部for循环进行自旋,成功了就返回null
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//从队列中取出任务需要设置超时就设置,不需要就阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
可以看到:
-
如果核心线程没有设置超时,那么当wc大于核心线程数当队列中没有任务时就超时自动返回null,会导致执行线程退出。
这里超时的线程无所谓是后续建的非核心线程还是前面创建的核心线程,工作线程本身不区分是否是核心,核心的目的只在于在没有设置超时的时候始终维持在池中。
-
如果核心线程设置了超时,那么只要线程获取任务超时就会导致线程退出。
processWorkerExit-处理工作线程退出
为垂死的工作线程进行清理和统计。仅从工作线程中调用。除非设置了“completedAbruptly”,否则假定workerCount已被调整以考虑退出。此方法从工作线程集中删除线程,然后则可能会终止线程池或者如果由于用户任务异常而退出,或者如果正在运行的工作线程少于corePoolSize,或者队列非空但没有工作线程,则可能会替换工作线程。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果突然,那么workerCount没有调整
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//加上当前线程的任务完成数——可以看到不是每完成一个就进行统计,只有没有任务了或者线程处理异常了才进行。
completedTaskCount += w.completedTasks;
//移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//如果(线程池处于SHUTDOWM并且队列为空)或者(线程池处于STOP状态并且队列为空),则变成TERMINATED
tryTerminate();
int c = ctl.get();
//如果当前线程处于STOP之前的状态
if (runStateLessThan(c, STOP)) {
//不是异常退出
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // 线程足够了不需要添加
}
//增加非核心线程
addWorker(null, false);
}
}
复制代码
这里可以看到工作线程小于核心线程的时候和异常退出的时候会增加非核心线程数,这是为啥呢?因为走到这步已经是没有任务了所以增加非核心是比较明智的。如果后面又有任务了也没关系,execute在执行任务的会判断核心数是否达到,没达到就增加核心线程。
任务缓冲-阻塞队列的选择
- ArrayBlockingQueue:一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。
- LinkedBlockingQueue:一个由链表组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。此队列的默认长度为Integer.MAX_VALUE,所以默认创建的队列有容量危险。
- PriorityBlockingQueue:一个支持线程优先排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定排序规则,不能保证同优先级元素的顺序。
- DelayQueue:一个实现DelayQueue一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
- SynchronousQueue:一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SyncronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPoll()就使用了SynchronousQUeue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。(LinkedTransferQueue采用的一种预占模式。意思就是消费者线程取元素时,如果队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程park住,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,唤醒该节点上park住线程,被唤醒的消费者线程拿货走人。这就是预占的意思:有就拿货走人,没有就占个位置等着,等到或超时。)
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列(有界)。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。
最常用的就是LinkedBlockingQueue这里不需要随机访问用链表结构不需要连续的空间;优先队列PiorityBlockingQueue在一般业务中用的也很少,一般的ToC应用都希望任务能早点完成;SynchronousQueue队列可以用来控制线程的新建,有一个任务来了就新建一个线程,在Executors.newCachedThreadPoll()就使用到了;LinkedTransferQueue当没有任务的时候比其他的阻塞队列相比通常会更快执行新来的任务;LinkedBlockingDeque需要获取任务的代码配合使用;