说明:本系列的线程池基于jdk8。
在开始本系列之前,假设我们对线程及线程池的相关概念已经很熟悉了。
阻塞队列常用方法
对于插入,抛异常或阻塞是在队列满的情况下,移除抛异常或阻塞则是队列空的情况下
Integer.toBinaryString(num)
:将num用二进制表示,返回字符串
定义
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
复制代码
- 如果小于corePoolSize,那么创建线程
- 大于corePoolSize时,放入阻塞队列
- 阻塞队列满了时,放入maximumPoolSize
- maximunPoolSize满了时,根据handler的定义决定
- 抛异常
- 不执行返回
- 让调用线程执行
- 丢弃队列最前面的任务,然后重新尝试执行任务
- 只要corePoolSize能执行任务,就会移除阻塞队列中的任务并执行
- 当处于corePoolSize到maximumPoolSize之间创建的线程超过keepAliveTime+unit还没获取任务时,会销毁线程,等于0则立即销毁
内部结构
// 用一个原子类表示线程池状态与线程池数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池最大数量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态,用ctl的高三位表示
// 111,该状态的线程池会接收新任务,并处理阻塞队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务
private static final int STOP = 1 << COUNT_BITS;
// 010,所有的任务都已经终止
private static final int TIDYING = 2 << COUNT_BITS;
// 011,terminated()方法已经执行完成
private static final int TERMINATED = 3 << COUNT_BITS;
// 线程池中的所有工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();
复制代码
runStateOf(int c)
:当前线程池的状态,ctl的高三位表示
workerCountOf(int c)
:当前线程池的线程数,ctl的低二十九位表示
实现
定义1
ThreadPoolExecutor->execute():
int c = ctl.get();
// 线程池数量小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
ThreadPoolExecutor->addWorker():
// 当前线程池线程数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // --5
return false;
// 线程池线程数量+1
if (compareAndIncrementWorkerCount(c))
break retry;
......
w = new Worker(firstTask); // --1
final Thread t = w.thread;
......
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
......
if (workerAdded) {
// 执行worker线程,主线程直接返回
t.start(); // --2
workerStarted = true;
}
ThreadPoolExecutor->Worker->Worker(): // --1
// 这里的firstTask就是要执行的线程
this.firstTask = firstTask;
// 把自身封装成一个线程
this.thread = getThreadFactory().newThread(this);
复制代码
Worker
类也是一个线程,成员变量firstTask
是要执行的线程,thread
是自身。
ThreadPoolExecutor->runWorker(): // --2
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
try {
// 第一次循环,task不为null
// 第二次循环,线程任务执行完毕,走getTask()方法
while (task != null || (task = getTask()) != null) {
......
try {
beforeExecute(wt, task);
// 在这里执行线程方法
task.run();
afterExecute(task, thrown);
} finally {
task = null;
w.completedTasks++;
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
复制代码
ThreadPoolExecutor->getTask():
boolean timedOut = false;
for (;;) {
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 线程池数量-1
if (compareAndDecrementWorkerCount(c)) // --6
return null;
continue;
}
// timed为false执行take()方法,阻塞队列workQueue没有元素,无限阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // --4
workQueue.take(); // --3
if (r != null)
return r;
timedOut = true;
}
复制代码
定义2与定义5
在–5处,wc大于等于corePoolSize,返回false。进入下一段代码。
ThreadPoolExecutor->execute():
// 线程放入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
复制代码
在–3处,workQueue.take()
拿到阻塞队列中的线程并返回执行,执行完毕后,第三次循环,继续在–3处无限阻塞。
定义3与定义6
ThreadPoolExecutor->execute():
// 在放入阻塞队列失败的情况下会走到这边
else if (!addWorker(command, false))
reject(command);
复制代码
仍然是执行addWorker()
,在–5处,这一次判断的是wc大于等于maximumPoolSize,没有触发,于是实例化Worker
类并执行任务,执行完之后来到getTask()
。
第一次循环,timedOut为false,timed为true,在–4处,阻塞keepAliveTime+unit后,没有任务,timedOut为true,第二次循环,在–6处,当前线程池数量-1,返回null。
回到ThreadPoolExecutor->runWorker()
,执行processWorkerExit(w, completedAbruptly)
方法,workers移除当前工作线程,线程退出。
定义4
在–5处,wc大于等于maximumPoolSize,返回false,执行reject(command)
方法。
ThreadPoolExecutor->execute():
else if (!addWorker(command, false))
reject(command);
ThreadPoolExecutor->reject():
handler.rejectedExecution(command, this);
复制代码
这里的handler即拒绝策略,是ThreadPoolExecutor
类下实现RejectedExecutionHandler
接口的子类,当然也可以自己实现并指定。