介绍
多线程配合多核处理器确实可以提高生产系统的吞吐量和性能。但在实际生产环境中,盲目创建大量线程对系统性能是有伤害的,甚至会拖垮系统。那是因为线程的创建和销毁需要花费时间,线程本身也需要占用内存空间,线程数量太多会耗尽CPU资源和内存资源,并且大量线程回收会使GC时间变长,给GC带来压力。我们可以通过限制并发任务的数量来避免这些问题。
在合适的线程数量的基础上,为了避免系统频繁的创建和销毁线程所带来的花销,让创建的线程复用是一个很好的方法。类似于数据库连接池的思想。Java中的线程池也是类似的思想。
Java中的线程池是运用场景最多的并发框架,异步和并发执行的程序都可以使用线程池。合理使用线程池总结下来,可以带来下面的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源。如果无限制创建会消耗系统资源,降低系统的稳定性。使用线程池可以进行统一分配,调优和监控。
Executor框架
JUC包为JDK并发包的核心类库。JDK提供了一套Executor框架进行线程控制,本质上来说就是一个线程池,我们来看下Executor框架类和接口:
-
Executor是一个接口,它是Executor框架的基础,将任务的提交和执行解耦。
-
ExecutorService为Executor的扩展接口,增加了一些用于生命周期管理的方法以及任务提交的便利方法。
-
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
-
Executors扮演着线程池工厂的角色(通过Executors可以获取特定功能的线程池)。
-
ScheduledThreadPoolExecutor是一个实现类,可给定延迟后执行任务或定期执行任务
另外从上图方法中可以看到Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor和ScheduledThreadPoolExecutor执行。
Executor框架异步计算结果相关类,包括Future和实现Future接口的FutureTask类。主要类和接口如下图所示:
上图我们可以看到FutureTask实现了Runnable接口,我们是可以直接创建FutureTask提交给ExecutorService执行的。
通过上面相关类和接口的介绍,大概了解了它们的作用,下面我们把它们串起来,我们来看下Executor框架使用示意图:
1.主线程创建实现了Runnable或者Callable接口的任务对象
2.任务对象提交给ExecutorService执行(可以看到Runnable任务对象有两种方式,一种是execute(不返回异步处理结果),另外一种是submit(返回异步处理结果))
3.有异步处理返回结果的(执行submit的任务)会返回实现Future接口的对象
4.主线程通过返回实现Future接口的对象FutureTask.get()方法等待任务执行完成。也可以执行
FutureTask.cancel(boolean mayInterruptIfRunning)取消任务执行。
ThreadPoolExecutor
ThreadPoolExecutor是线程池的核心实现类,我们下面来详细了解下,并且会结合Executor框架工具类Executors创建特定类型的ThreadPoolExecutor来具体分析。
线程池的创建
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
上面是ThreadPoolExecutor最重要的构造函数,下面我们来分析下参数的具体含义,有助于我们更好的了解线程池的核心内部实现。
- corePoolSize:线程池的基本大小(核心线程数)。在ThreadPoolExecutor创建以后,默认并不会立即启动核心线程数量的线程,而是等到任务提交的时候才会启动。如果想要提前创建并启动所有核心线程,可以调用线程池的prestartAllCoreThreads方法实现,方法如下所示:
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
*
* @return the number of threads started
*/
public int prestartAllCoreThreads() {
int n = 0;
//循环添加核心大小数量的工作线程
while (addWorker(null, true))
++n;
return n;
}
复制代码
另外我们需要注意的是,在不调用prestartAllCoreThreads的情况下,线程池的线程数当提交新任务到线程池的时候,如果线程池内有空闲核心线程能够执行新任务也会创建一个新的线程来执行,等到需要执行的任务数大于线程池核心大小时则不会再创建。
-
maximumPoolSize:线程池中最大线程数量(可同时活动的线程数量上限)。在工作队列满的情况下才会创建超出核心大小的线程池。
-
keepAliveTime: 超过corePoolSize的空闲线程的存活时间。也就是说线程池线程数量大于核心大小并且某个线程的空闲时间超过keepAliveTime的话会被标记为可回收而销毁。
-
unit:keepAliveTime的单位
-
workQueue:任务队列,等待执行任务的阻塞队列
-
threadFactory: 线程工厂,用于创建线程。另外可以通过线程工厂给每个创建出来的线程设置有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里线程设置有意义的名字,例如下面代码:
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
复制代码
- handler: 拒绝策略,当线程池和队列都满了的话,处理提交的新任务的策略。
通过上面的介绍我们来看下线程池的主要处理流程图:
接下来我们对其中重要的参数workQueue,handler分别详细介绍下。
任务队列(workQueue)
保存等待执行的任务的阻塞队列。它是一个BlockingQueue接口的对象,用于存放Runnable对象。如下声明所示:
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;
复制代码
线程池可选择以下几种队列:
- ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按FIFO(先进先出)顺序对元素排序。有界则需要生命最大容量,构造函数如下:
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and default access policy.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
复制代码
- LinkedBlockingQueue: 无界队列,基于链表结构的阻塞队列,按照FIFO顺序排序元素。此队列可以指定队列容量,也可以不指定默认为Integer.MAX_VALUE(可以认为是无限大)。相对于ArrayBlockingQueue来说LinkedBlockingQueue吞吐量更大。但如果任务创建速度远大于线程处理速度,无界队列任务会越来越多,可能会造成内存耗尽情况。构造函数如下所示:
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
复制代码
- SynchronousQueue: 直接提交队列(不存储元素)。每个插入操作都需要等到线程调用移除操作。吞吐量通常高于LinkedBlockingQueue。由于提交任务不会保存,则会将新任务提交给线程执行,线程池无空闲线程则尝试创建新的线程执行。如果达到线程池最大线程数,则会采取拒绝策略。因此,使用该队列通常需要设置很大的maximumPoolSize最大线程数,不然很容易执行拒绝策略。构造函数如下:
/**
* Creates a {@code SynchronousQueue} with the specified fairness policy.
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
//参数fair表示是否使用公平策略,为true的话则等待线程按 FIFO 顺序竞争访问,否则顺序是未指定的
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
复制代码
- PriorityBlockingQueue:具有优先级的无界阻塞队列。构造函数如下:
/**
* Creates a {@code PriorityBlockingQueue} with the default
* initial capacity (11) that orders its elements according to
* their {@linkplain Comparable natural ordering}.
*/
//默认初始化容量为11(private static final int DEFAULT_INITIAL_CAPACITY = 11)
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* Creates a {@code PriorityBlockingQueue} with the specified
* initial capacity that orders its elements according to their
* {@linkplain Comparable natural ordering}.
*
* @param initialCapacity the initial capacity for this priority queue
* @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
//可自定义初始化容量的构造函数
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
/**
* Creates a {@code PriorityBlockingQueue} with the specified initial
* capacity that orders its elements according to the specified
* comparator.
*
* @param initialCapacity the initial capacity for this priority queue
* @param comparator the comparator that will be used to order this
* priority queue. If {@code null}, the {@linkplain Comparable
* natural ordering} of the elements will be used.
* @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
//initialCapacity为初始化容量
//comparator为用于对优先级队列进行排序的比较器,如果为null则进行自然排序,可见优先级就体现在这个比较器上面
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
复制代码
拒绝策略(handler)
JDK内置拒绝策略如下:
- AbortPolicy策略:直接抛出异常
- CallerRunsPolicy策略:线程池未关闭,调用者线程运行被丢弃的任务。
- DiscardPolicy策略:不处理,丢弃任务
- DiscardOldestPolicy策略:丢弃最老的一个请求(也就是即将被执行任务),并尝试再次提交当前任务
可以根据自己的需要选择不同的拒绝策略,上面拒绝策略都实现了RejectedExecutionHandler接口:
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
复制代码
上面四个拒绝策略相关实现代码如下:
/* Predefined RejectedExecutionHandlers */
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//线程池没有被关闭则在调用者线程中执行
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//直接抛出RejectedExecutionException异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//不处理,直接丢弃掉
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//线程没有关闭则丢弃即将执行的在一个任务并尝试再次执行任务r
e.getQueue().poll();
e.execute(r);
}
}
}
复制代码
拒绝策略的简单示例:
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString()+"is discard");
}
});
for (int i = 0; i <= Integer.MAX_VALUE; i++){
es.submit(task);
Thread.sleep(10);
}
}
}
复制代码
执行部分结果如下:
1631374743886:Thread ID:11
1631374743898:Thread ID:12
1631374743910:Thread ID:13
1631374743922:Thread ID:14
1631374743929:Thread ID:15
java.util.concurrent.FutureTask@37bba400is discard
java.util.concurrent.FutureTask@179d3b25is discard
java.util.concurrent.FutureTask@254989ffis discard
java.util.concurrent.FutureTask@5d099f62is discard
1631374743987:Thread ID:11
1631374743998:Thread ID:12
1631374744010:Thread ID:13
1631374744023:Thread ID:14
1631374744031:Thread ID:15
java.util.concurrent.FutureTask@37f8bb67is discard
java.util.concurrent.FutureTask@49c2faaeis discard
java.util.concurrent.FutureTask@20ad9418is discard
复制代码
我们会发现很快就出现被丢弃的线程,这个和线程池设置的参数以及线程执行时间设置有关。
向线程池提交任务
上面Executor框架一节中我们介绍了execute()和submit()方法都可以向线程池提交任务。下面我们再来分别了解下。
- execute方法:用于提交没有返回值的任务,无法判断任务是否被线程池执行完成。具体代码实现如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl为线程池状态,记录workerCount(声明有效线程数)和runState(表明运行、关闭等状态)
int c = ctl.get();
//有效线程数workerCount小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
//新增一个线程执行该任务(需要获取全局锁),执行成功则返回
/*
* addWorker中的第二个参数表示限制添加线程数量的判断
* 如果为true,根据corePoolSize来判断;
* 如果为false,则根据maximumPoolSize来判断
*/
if (addWorker(command, true))
return;
//新增该任务线程失败则再次获取线程池状态
c = ctl.get();
}
//线程池处于运行状态并且线程池线程数大于等于corePoolSize则将任务加入阻塞队列workQueue
if (isRunning(c) && workQueue.offer(command)) {
//再次获取线程池状态
int recheck = ctl.get();
//如果线程池不在运行中状态则移除该任务,并且该任务会被拒绝
if (! isRunning(recheck) && remove(command))
//调用拒绝策略
reject(command);
//线程池在运行中状态,工作有效线程数worerCount为0则添加一个线程(保证线程池中至少有一个线程存在)
//addWorker第一个参数为null则表示创建一个线程但是不启动(没有把任务提交到该线程)。后期提交到队列中的任务会被线程获取执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//线程池不处于RUNNING状态或线程池处于RUNNING状态但队列已满(workerCount>=corePoolSize)则创建一个线程(线程数量上限为maximumPoolSize)执行任务。创建该任务线程失败,则任务将被拒绝
else if (!addWorker(command, false))
reject(command);
}
复制代码
上面代码流程总结如下:
1.当前运行线程workerCount < corePoolSize,创建新线程执行任务
2.运行线程数量workerCount>=corePoolSize并且阻塞队列未满,将任务加入阻塞队列BlockingQueue。
3.如果workerCount >= corePoolSize,workerCount<maximumPoolSize并且BlockingQueue队列已满,则创建新的线程处理任务
4.创建新任务使workerCount>maximumPoolSize,此时BlockingQueue队列已满,则创建失败拒绝任务。
其中最常用到的是第2步,第2步满足以后,我们会看到代码中再次进行检查了线程池状态.为什么需要再次检查那?是因为下面这两种情况。
- 线程池自上次检查进入该条件判断后关闭了。这时会将已经放在队列的任务并执行拒绝策略。这样操作可能是为了及时处理任务并及时反馈给调用者当前线程池情况。
- 线程池还在运行中但是线程池中有效工作线程数为0的情况。(例如corePoolSize为0,maximumPoolSize大于0,当线程池空闲存活时间正好都到期被回收了。)这时则会创建一个没有执行任务的线程,这个是为了保证线程池中至少得有1个线程存在,同时也是为了队列中的线程能被尽快执行。可能你会问为什么不直接在新创建线程中执行该任务?那是因为在这个操作的前提是任务已经被提交到阻塞队列,直接执行会导致该任务被执行两次,因为队列中的任务也会被线程执行。那你会说那就直接从队列竞争获取执行不就可以了?我认为这是不可行的,一方面去队列竞争获取任务不一定获取的是当前任务(当前任务可能被其它线程已经获取执行),另一方面如果参与队列获取任务竞争会增加任务提交的时间。可能是基于上面的考虑吧,工作线程与队列的交互应该由后续统一处理,而不是在提交任务的时候参与。
我们再来了解下上面代码中ctl线程状态以及它记录的workerCount和runState在源码中的声明:
//类型为原子变量,表示线程池状态,记录workerCount(声明有效线程数)和runState(表明运行、关闭等状态)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//32 - 3 等于29
private static final int COUNT_BITS = Integer.SIZE - 3;
//worerCount的最大值为(2^29)-1 (大约 5亿) 个线程
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//以下线程池运行状态,存储在ctl的高3位
//运行中状态(高三位为111) 接受新任务并处理排队任务
private static final int RUNNING = -1 << COUNT_BITS;
//关闭状态(高三位为000) 不接受新任务但处理排队任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//停止状态(高三位为001) 不接受新任务,不处理排队的任务,中断正在进行的任务
private static final int STOP = 1 << COUNT_BITS;
//整理状态(高三位为010) 所有任务都已终止,workerCount 为零,转换到状态 TIDYING 的线程将运行 terminate() 钩子方法
private static final int TIDYING = 2 << COUNT_BITS;
//已终止状态(高三位为100) terminated()已执行完成
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
//下面为计算ctl,worerCount和runState的方法
//获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取有效线程数(该值可能与实际的活动线程数暂时不同)
private static int workerCountOf(int c) { return c & CAPACITY; }
//获取线程状态(记录录运行状态和有效线程数)
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
通过上面我们可以看到ctl的高3位表示运行状态,低29位表示有效线程数。我们也可以看到线程池的状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
我们来看下源码注释中对状态转换的描述:
/**
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*/
复制代码
对上面的状态转换我们通过流程图描述下:
execute方法中用到最多的addWorker方法我们来分析下:
//检查是否可以根据当前池状态和给定界限(核心或最大值)添加新的工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//最外层循环,主要校验线程池状态
for (;;) {
//线程池状态
int c = ctl.get();
//获取运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
```
/**
* 1.rs >= SHUTDOWN条件为true则表明最低要求是不能接受新的任务
* 2.! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
* --上面括号中的条件都满足则表明线程池线处于关闭状态(rs == SHUTDOWN),不接受新的任务(正好firstTask == null,没有新来的任务),阻塞队列BlockingQueue还有需要执行的任务,则线程池可以继续执行队列中的任务。2这个条件就不成立,会继续向下执行
* --如果2括号中的任意一个条件不满足则2条件便会成立,直接返回false。
* a).rs > SHUTDOWN则表明线程池既不接受任务也不处理线程池任务,则不需要新增线程,则2条件成立
* b).firstTask != null则表明有新任务需要添加到当前线程执行,与前提为rs == SHUTDOWN是不接受新的任务相悖,则2条件成立
*. c). workQueue.isEmpty() 阻塞队列为空,执行该队列前提是rs == SHUTDOWN && firstTask == null,线程池处于关闭状态,新任务为空,此时如果BlockingQueue为空则表明队列任务已经执行完成,无需再增加线程,则2条件满足
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//内层循环
for (;;) {
//工作线程数量
int wc = workerCountOf(c);
/**
* 工作线程数量限制校验,满足下面两个条件之一返回false:
* 1.工作线程数量大于等于设置的最大限制CAPACITY((2^29)-1,大约5亿)
* 2.根据该方法第二个参数core,core为true则与corePoolSize进行判断(大于等于corePoolSize),core为false则与maximumPoolSize判断(大于等于maximumPoolSize)
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//尝试增加线程池线程数workerCount,成功则跳出最外层for循环
if (compareAndIncrementWorkerCount(c))
break retry;
//增加线程池线程数workerCount失败则重新获取状态
c = ctl.get(); // Re-read ctl
//线程池状态和最外层循环刚开始获取的状态不一致,则表明线程池状态发生了改变,则外层循环重新执行,例如可能线程池被终止了或者工作线程数达到最大值等情况导致的线程池状态发生改变
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//否则CAS由于workerCount 变化而失败,则重试内层循环
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建worker对象(worker工作线程)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
/**
* 满足下面其中一个判断为true
* 1.线程池状态rs < SHUTDOWN表明是RUNNING状态
* 2.线程池处于关闭状态并且当前任务为null,则可以创建线程。(SHUTDOWN状态不可以提交新任务,但是可以处理队列中任务)
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//检查线程t是否可启动,不可启动抛异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers包含池中所有工作线程的集合。仅在持有 mainLock 时访问。workers添加新的工作线程
workers.add(w);
//添加新的线程后池中工作线程数量
int s = workers.size();
//largestPoolSize跟踪达到的最大池大小。只能在 mainLock 下访问。如果当前池大小大于largestPoolSize,变更largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
//表明工作线程添加成功
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
//工作线程添加成功则启动线程并将线程启动表示设置为true
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//线程未启动成功的话则进行失败处理
if (! workerStarted)
//回滚工作线程的创建
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
从上面代码我们可以发现:
- addWorker主要是添加工作线程并执行该线程。
- 新增线程可能会直接执行当前任务或者创建的新线程暂时没有执行任务。
- 新增线程数量判断会根据参数core来判断是小于corePoolSize还是maximumPoolSize。
addWorker还有一些比较重要的方法在此不再深究,有兴趣的话可以自行研究,例如Worker对象(实现了Runnable接口),addWorkerFailed回滚工作线程的具体实现等方法。
- submit方法:用于提交需要返回值的任务。返回future类型的对象(可判断是否执行成功),通过future的get()方法获取返回值。future的get()会阻塞线程直到任务完成(另外还有get(long timeout, TimeUnit unit)方法可以设置超时限制)。submit代码如下所示:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
//将Runnable对象包装成FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
//执行execute,也就是上面execute的方法,不再赘述
execute(ftask);
return ftask;
}
复制代码
我们会发现submit方法也是内部调用execute方法,但是不同的是参数是FutureTask,并且返回值也是该FutureTask对象。则可以判定线程池中线程执行该FutureTask任务的时候该对象会同时记录返回结果或异常。关于FutureTask我们会在后面详细介绍下。
线程池的关闭
可通过调用线程池的shutdown或者shutdownNow方法来关闭线程池。我们先来看下ExecutorService中的生命周期的管理接口:
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
...
复制代码
ThreadPoolExecutor实现了上面的接口,代码如下:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//安全策略校验
checkShutdownAccess();
//线程池状态设置为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//安全策略校验
checkShutdownAccess();
//将线程池状态设置为STOP
advanceRunState(STOP);
//中断所有正在执行或暂停任务的线程
interruptWorkers();
//队列中未被执行任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
return tasks;
}
public boolean isShutdown() {
//线程池状态是不为RUNNING则返回true
return ! isRunning(ctl.get());
}
public boolean isTerminating() {
int c = ctl.get();
//线程池状态c大于等于SHUTDOWN并且小于TERMINATED则表示正在终止中,返回true,否则false
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated() {
//线程池状态>= TERMINATED 则表示线程池已经被终止
return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//轮询判断线程池是否已经终止
for (;;) {
//线程池状态c >= TERMINATED则表示已经终止,返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
//等待时间到期状态还未变更为终止则返回false(还未终止)
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
复制代码
-
shutdown方法执行平缓关闭:先将线程池状态设置为SHUTDOWN,然后中断空闲线程。不再接受新任务,同时等待已经提交的任务执行完成(包括已提交但还未开始执行的任务)。
-
shutdownNow执行粗暴关闭:首先将线程池的状态设置为STOP,然后尝试停止所有正在执行或暂停任务的线程,并返回等待执行任务的列表。不再启动等待执行的任务。
线程池关闭则isShutdown返回true,关闭后提交的任务会执行拒绝策略。当所有任务都关闭表示线程池关闭成功,这时调用isTerminated返回true。如果是在终止进行中的话则isTerminating返回true。awaitTermination是具有超时限制的等待终止。在timeout时间内被终止则返回true,超时未终止返回false。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,起到了调度任务的功能,会在指定时间对任务进行调度。主要用在延迟任务和周期任务。
ScheduledThreadPoolExecutor的调度接口是在ScheduledExecutorService(继承了ExecutorService)中定义的,我们来看下定义的接口方法:
//创建并执行在给定延迟后启用的一次性操作。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
//创建并执行在给定延迟后启用的 ScheduledFuture
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//创建并执行一个周期性动作,在给定的初始延迟(initialDelay)后首先启用,然后在给定的时间段内启用;即执行将在 initialDelay延迟 之后开始,然后是 initialDelay+period,然后是initialDelay + 2 period,依此类推。如果任务的任何执行遇到异常,则后续执行将被抑制。否则,任务只会通过取消或终止执行程序而终止。如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
//创建并执行一个周期性动作,该动作首先在给定的初始延迟(initialDelay)后启用,随后在一个执行终止和下一个执行开始之间具有给定的延迟(delay)。如果任务的任何执行遇到异常,则后续执行将被抑制。否则,任务只会通过取消或终止执行程序而终止。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
复制代码
schedule方法会在给定时间对任务进行一次调度。scheduleAtFixedRate和scheduleWithFixedDelay方法会对任务进行周期性调度,但两者有点区别。如下图所示:
scheduleAtFixedRate任务调度频率一定,在上个任务开始执行时间+周期period时间调度下个任务。若某次执行时间大于period周期,下次执行会在这次执行后执行,不会叠加执行。
scheduleWithFixedDelay是在上个任务执行结束后,再经过delay时间进行任务调度。
ScheduledThreadPoolExecutor我们就分析到这,对其原理感兴趣的可以自行研究,例如ScheduledThreadPoolExecutor使用的阻塞队列DelayedWorkQueue,线程池中的线程从阻塞队列获取任务执行的交互,是怎么实现延迟执行的等都可以研究下。
Executors特定功能的线程池
Executors主要提供了以下工厂方法:
//返回固定线程数量的线程池。线程池无空闲线程则会暂存在LinkedBlockingQueue队列,直到有空闲线程处理队列中任务
public static ExecutorService newFixedThreadPool(int nThreads)
//返回只有一个线程的线程池。若大于一个任务提交到线程池,则会保存到LinkedBlockingQueue对咧,后续按照FIFO顺序执行队列任务
public static ExecutorService newSingleThreadExecutor()
//返回一个不限定线程数量的线程,优先使用可复用空闲线程
public static ExecutorService newCachedThreadPool()
//返回只有一个线程的线程池。ScheduledExecutorService接口扩展了给定时间执行某任务的功能,如固定延迟后执行或周期执行某个任务
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
//返回可指定线程数据的ScheduledExecutorService对象
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
复制代码
1.固定大小的线程池
我们以newFixedThreadPool来举例,来看以下简单的示例:
public class ThreadPoolDemo {
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ": Thread Id:" + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask task = new MyTask();
ExecutorService es = Executors.newFixedThreadPool(5);
//ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
es.submit(task);
}
}
}
复制代码
执行后输出结果如下:
1630981722160: Thread Id:Thread[pool-1-thread-1,5,main]
1630981722160: Thread Id:Thread[pool-1-thread-5,5,main]
1630981722160: Thread Id:Thread[pool-1-thread-4,5,main]
1630981722160: Thread Id:Thread[pool-1-thread-3,5,main]
1630981722160: Thread Id:Thread[pool-1-thread-2,5,main]
1630981727167: Thread Id:Thread[pool-1-thread-4,5,main]
1630981727167: Thread Id:Thread[pool-1-thread-5,5,main]
1630981727167: Thread Id:Thread[pool-1-thread-1,5,main]
1630981727167: Thread Id:Thread[pool-1-thread-2,5,main]
1630981727167: Thread Id:Thread[pool-1-thread-3,5,main]
复制代码
我们可以看到前五个线程和后五个线程执行相差5s,是按照每五个线程执行任务的(前五个线程和后五个线程ID是相同的)。如果将上面的newFixedThreadPool改为newCachedThreadPool执行的输出结果:
1630981903770: Thread Id:Thread[pool-1-thread-1,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-5,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-4,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-3,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-2,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-6,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-7,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-8,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-9,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-10,5,main]
复制代码
我们会发现10个线程会一起执行完成(因为创建的是不限定线程数量的线程,每个任务提交到线程池都会创建一个线程)
2.计划任务
我们使用scheduleAtFixedRate来举例,示例如下:
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
//如果前面的任务没有完成,则调度也不会启动
ses.scheduleAtFixedRate(() -> {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, 2, TimeUnit.SECONDS);
}
}
复制代码
执行可能输出如下:
1631442979
1631442981
1631442983
1631442985
1631442987
1631442989
1631442991
1631442993
...
复制代码
可以看到任务执行时间间隔为2s,下面我们来看下在如果任务执行时间大于设置的执行周期period回女出现什么情况。我们将上面代码Thread.sleep(1000)改为Thread.sleep(5000)来看下执行时间间隔。代码修改后执行输出如下:
1631444201
1631444206
1631444211
1631444216
1631444221
1631444226
1631444231
1631444236
复制代码
我们会看到时间间隔为每五秒执行一次,因此我们可以得出,当任务执行时间大于设置的周期时间,任务下次执行开始时间是在上次执行结束后执行。
合理线程池线程数量
对于计算密集型任务,在拥有N个处理器的系统上,当线程池的大小为N+1的时候能实现最优的利用率。对于包含IO操作或者其它阻塞操作的任务,线程并不会一直执行,线程池线程数量可以设置的更大一些。一般来说确定线程池大小需要考虑CPU数量,内存大小等因素。
为了保持处理器达到期望的使用率,最优的线程池大小为:
最优线程池大小 = CPU数量 * 目标CPU使用率 * (1 + 线程等待时间/线程执行时间)
线程池线程大小通常不会固定,而是由某种配置机制(例如分布式配置)来提供,或者根据下面代码动态计算取得可用的CPU数量。
Runtime.getRuntime().availableProcessors()
复制代码
异常消失
首先来看一个简单的例子:
public class DivTask implements Runnable
{
int a,b;
public DivTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
double re = a/b;
System.out.println(re);
}
public static void main(String[] args) {
ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>());
for (int i = 0; i< 5; i++){
pools.submit(new DivTask(100, i));
}
}
}
复制代码
输出结果如下:
100.0
25.0
33.0
50.0
复制代码
我们发现少了一个结果,除数i=0的那一个导致,但没有异常输出。如果实际项目中遇到这种问题那会很难查出问题而且很浪费时间。这肯定是不允许的。
通过下面2种方式我们可以处理上面问题:
1.我们来把上面代码中线程池submit方法改为execute方法
改造后输出结果如下:
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.wk.manage.web.controller.DivTask.run(DivTask.java:18)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
100.0
25.0
33.0
50.0
复制代码
2.通过下面Future.get()方式改造:
try {
Future re = pools.submit(new DivTask(100, i));
re.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
复制代码
改造后输出结果如下:
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.test.manage.web.controller.DivTask.main(DivTask.java:27)
Caused by: java.lang.ArithmeticException: / by zero
at com.test.manage.web.controller.DivTask.run(DivTask.java:16)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
100.0
50.0
33.0
25.0
复制代码
我们可以看到第1种方式我们可以知道异常是在哪里抛出的,第2种方式抛出的异常更全一些,既可以知道异常在哪里抛出的还可以知道任务提交的地方(通过Future.get()方法的定位)。因此建议使用第2种方式来处理,因为异常信息更全面。
如果非要使用第1种方式。那么我们需要扩展ThreadPoolExecutor线程池来实现了。例如下面扩展的线程池,在调度任务之前,保存提交任务线程堆栈信息:
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable task) {
super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
private Exception clientTrace(){
return new Exception("Client stack trace");
}
private Runnable wrap(final Runnable task, final Exception clientStack,
String clientThreadName){
return () -> {
try {
task.run();
} catch (Exception e) {
//对异常进行捕获
//打印提交任务的堆栈
clientStack.printStackTrace();
throw e;
}
};
}
}
复制代码
然后我们将main方法改为:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>());
for (int i = 0; i< 5; i++){
pools.execute(new DivTask(100, i));
}
}
复制代码
执行输出结果如下:
java.lang.Exception: Client stack trace
at com.wk.manage.web.controller.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:24)
at com.wk.manage.web.controller.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:15)
at com.wk.manage.web.controller.DivTask.main(DivTask.java:25)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.wk.manage.web.controller.DivTask.run(DivTask.java:16)
at com.wk.manage.web.controller.TraceThreadPoolExecutor.lambda$wrap$0(TraceThreadPoolExecutor.java:31)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
100.0
25.0
33.0
50.0
复制代码
我们可以看到异常和任务提交的位置都打印出来了。
另外通过上面的示例,我们可以发现通过execute提交的任务,能够将它抛出的异常交给未捕获异常处理器,而通过submit提交的任务,无论抛出的是未检查异常还是已检查异常,都被认为是任务返回状态的一部分。如果一个由submit提交的任务由于抛出了异常而结束,这个异常将被Future.get封装在ExecutionException中重新抛出。
线程池的监控
对线程池监控是很有必要的,方便排查问题。下面我们来介绍下线程池提供的可以进行监控的属性。
-
taskCount:返回已安排执行的大致任务总数。由于任务和线程的状态在计算过程中可能会动态变化,因此返回值只是一个近似值。
-
completedTaskCount:完成任务的数量(这些任务已经终止),小于等于taskCount
-
largestPoolSize:线程池曾经创建过的最大线程数量。通过该属性可知道线程池线程数是否达到过最大值
-
getPoolSize:返回线程池中当前线程数
-
getActiveCount:获取活动线程数(大致数量)
另外我们也可以通过继承ThreadPoolExecutor自定义线程池重写下面的方法进行监控:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
复制代码
总结
本文主要介绍了线程池使用原因,使用方式和相关使用原理,注意事项,有助于在日常生活中更好的使用线程池。
参考书籍:《Java高并发程序设计(第2版)》《Java并发编程实战》《Java并发编程的艺术》