ThreadPool线程池
参考:
0、引言
用jdbc操作过数据库应该知道,操作数据库需要和数据库建立连接,拿到连接之后才能操作数据库,用完之后销毁。数据库连接的创建和销毁其实是比较耗时的,真正和业务相关的操作耗时是比较短的。每个数据库操作之前都需要创建连接,为了提升系统性能,后来出现了数据库连接池,系统启动的时候,先创建很多连接放在池子里面,使用的时候,直接从连接池中获取一个,使用完毕之后返回到池子里面,继续给其他需要者使用,这其中就省去创建连接的时间,从而提升了系统整体的性能。
线程池和数据库连接池的原理也差不多,创建线程去处理业务,可能创建线程的时间比处理业务的时间还长一些,如果系统能够提前为我们创建好线程,我们需要的时候直接拿来使用,用完之后不是直接将其关闭,而是将其返回到线程池中,给其他需要这使用,这样直接节省了创建和销毁的时间,提升了系统的性能。
简单的说,在使用了线程池之后,创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池。
1、线程池概述
线程池是一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。简单来说,线程都放在一个池里,每次使用都从池里获取,用完后归还到池中。
线程池做的工作是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超过数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
一般线程池主要分为以下4个组成部分:
- 线程池管理器:用于创建并管理线程池;
- 工作线程:线程池中的线程;
- 任务接口:每个任务必须实现的接口,用于工作线程调度其运行;
- 任务队列:用于存放待处理的任务,提供一种缓冲机制。
它的主要特点为
- 提⾼线程的利⽤率(线程复用)
- 提⾼响应速度
- 便于统⼀管理线程对象
- 可控制最⼤并发数
看cpu的核数
System.out.println(Runtime.getRuntime().availableProcessors());
复制代码
2、线程池架构
Java中的线程池是通过Executor
框架实现的,该框架中用到了Executor
,Executors
,ExecutorService
,ThreadPoolExecutor
这几个类
3、线程池使用方式
3.1、一池N线程
主要实现方法:
Executors.newFixedThreadPool(int nThreads)
复制代码
源码
public static ExecutorService newFixedThreadPool (int nThreads) {
return new ThreadPoolExecutor (nThreads,nThreads,
keepAliveTime: 0L,TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()) ;
}
复制代码
主要特点如下:
- 创建一个定长线程池,并且可控制线程的最大并发数,超出的线程会在队列中等待
newFixedThreadPool方法
创建的线程池corePoolsize
和MaxmumPoolsize
是相等的,它底层使用的是LinkedBlockingQueue
阻塞队列- 线程池中的线程处于一定的量,可以很好的控制线程的并发量
- 线程可以重复被使用,在显示关闭之前,都将一直存在
- 超出一定量的线程被提交时候需在队列中等待
- tip:定长线程池的大小最好根据系统资源进行设置,如Runtime.getRuntime().availableProcessors();
3.2、一池一线程
主要实现方法:
Executors.newSingleThreadExecutor()
复制代码
源码
@NotNull public static ExecutorService newSingleThreadExecutor (){
return new Finali zableDelegatedExecutorService
(new ThreadPoolExecutor(corePoolSize: 1,maximumPoolSize: 1,
keepAliveTime: OL,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())) ;
}
复制代码
主要特点如下:
- 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,之后提交的线程活动将会排在队列中以此 执,保证所有任务都按照指定顺序执行
newSingleThreadExecutor
将corePoolsize
和MaxmumPoolsize
都设置为1
,它使用的是LinkedBlockingQueue
阻塞队列
3.3、可扩容线程池(线程池根据需求创建线程)
主要实现方法:
Executors.newCachedThreadPool()
复制代码
源码
@NotNull public static ExecutorService newCachedThreadPool () {
return new ThreadPoolExecutor (corePoolSize: 0,Integer.MAX_VALUE,
keepAliveTime: 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()) ;
}
复制代码
主要特点如下:
- 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程。若无可回收,则创建新线程
newCachedThreadPool
将corePoolsize
设置为0
,MaxmumPoolsize
设置为Integer.MAX_VALUE
,它使用的是synchronousQueue
- 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
- 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
- 当线程池中,没有可用线程,会重新创建一个线程
- 也就是说来了任务就创建线程运行,如果线程空闲超过60秒,就销毁线程
- 遇强则强
3.4、代码演示
public class ExecutorDemo {
public static void main(String[] args) {
//一池N线程
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
//一池一线程
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
//一池可扩容线程
ExecutorService threadPool3 = Executors.newCachedThreadPool();
//模拟20个用户来办理业务,每个用户就是一个来自外部的请求线程
try{
for(int i=1;i<=20;i++){
threadPool3.execute(()->{ //线程池执行
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool3.shutdown();//将线程放回线程池,关闭
}
}
}
复制代码
4、线程池的七大参数
上述三种线程池的创建方式中,从源码可以看到内部均使用如下代码创建线程池 ,即jdk
中提供了线程池的具体实现,实现类是:java.util.concurrent.ThreadPoolExecutor
,主要构造方法:
public ThreadPoolExecutor(int corePoolSize, //核心线程数量
int maximumPoolSize, //最大线程数量
long keepAliveTime, //非核心线程空闲时的存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //存放提交但未执行任务的队列
ThreadFactory threadFactory, //线程工厂
RejectedExecutionHandler handler) // 等待队列满后的拒绝策略
复制代码
每个参数的具体含义如下:
corePoolSize
:线程池中的常驻核心线程数- 在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程【今日的当值窗口[2个]】
- 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放入到缓存队列当中.
maximumPoolSize
:线程池能够容纳同时执行的最大线程数,此值大于等于1—5个[2[黑色正方形]+3(灰色正方形)]keepAliveTime
:多余的空闲线程存活时间,当空间时间达到keepAliveTime
值时,多余的线程会被销毁直到只剩下corePoolSize个线程为止(非核心线程)。- 即如果没有任务处理了,有些线程会空闲,空闲的时间超过了这个值,会被回收掉。如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率。
- 假设这样一种场景,候客区三个已经满了,这个时候又来了3个,那么其他三个窗口就会打开办理业务,这个时候原来在候客区的用户办理业务,新来的三个在候客区等待。随着时间的推移,可能没人来了,候客区的用户也都办理完了,这个时候非当值窗口空闲,这个参数就是空闲多久后关闭非当值窗口
unit
:keepAliveTime的单位,可以选择的单位有天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。类型是一个枚举java.util.concurrent.TimeUnit
,这个枚举也经常使用,有兴趣的可以看一下其源码。workQueue
:任务队列,阻塞队列,被提交但尚未被执行的任务(候客区),用于缓存待处理任务的阻塞队列,常见的有4种,本文后面有介绍。threadFactory
:表示生成线程池中工作线程的线程工厂,用户创建新线程,一般用默认即可handler
:拒绝策略,表示当线程队列满了并且工作线程大于等于线程池的最大显示 数(maxnumPoolSize)时如何来拒绝AbortPolicy
(默认):直接抛出RejectedExecutionException
异常阻止系统正常运行。CallerRunsPolicy
:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。DiscardOldestPolicy
:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务DiscardPolicy
:该策略直接默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
5、线程池4种拒绝策略
new ThreadPoolExecutor.AbortPolicy()
:银行满了,还有人进来,不处理这个,抛出异常new ThreadPoolExecutor.CallerRunsPolicy()
:哪来的去哪里!new ThreadPoolExecutor.DiscardPolicy()
:队列满了,丢掉任务,不会抛出异常new ThreadPoolExecutor.DiscardOldestPolicy()
:队列满了,尝试去和最早的竞争,也不会抛出异常
6、线程池的底层工作原理
总结:调用线程池的execute方法处理任务,执行execute方法的过程:
- 在创建了线程池后,线程池中的线程数为零
- 当调用 execute() 方法添加一个请求任务时,线程池做出如下判断:
- 如果正在运行的线程数量小于 corePoolSize ,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize ,那么将这个任务放入阻塞队列;
- 如果队列满了且正在运行的线程数量还小于
maximumPoolSize
,那么还是要创建非核心线程且立即优先运行这个任务;比如上图中没有蓝色包围的三个线程 - 如果队列满了且正在运行的线程数量大于或等于
maximumPoolSize
,那么线程池会启动饱和拒绝策略来执行。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行
- 当一个线程无事可做超过一定时间(keepAliveTime)时,线程会判断:
- 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
- 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
当向线程池提交一个任务之后,线程池的处理流程如下:
- 判断是否达到核心线程数,若未达到,则直接创建新的线程处理当前传入的任务,否则进入下个流程
- 线程池中的工作队列是否已满,若未满,则将任务丢入工作队列中先存着等待处理,否则进入下个流程
- 是否达到最大线程数,若未达到,则创建新的线程处理当前传入的任务,否则交给线程池中的饱和策略进行处理。
7、自定义线程池
不允许使用上面所有的 Executors
的方式创建线程池,因为阿里java开发手册规定:
如下代码:
线程池核心线程数量为2,最大为5,阻塞队列为3,多运行几次,可以看到拒绝策略AbortPolicy
的效果。
线程池的使用步骤:
- 调用构造方法创建线程池
- 调用线程池的excute方法处理任务
- 关闭线程池
public class ThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService threadService = new ThreadPoolExecutor(2,
5,
2L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),//阻塞队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());//拒绝策略
try{
for(int i=1;i<=10;i++){
threadService.execute(()->{
System.out.println(Thread.currentThread().getName()+"办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadService.shutdown();
}
}
}
复制代码
结果:由于拒绝策略的原因,抛出异常
pool-1-thread-1办理业务
pool-1-thread-3办理业务
pool-1-thread-2办理业务
pool-1-thread-5办理业务
pool-1-thread-3办理业务
pool-1-thread-1办理业务
pool-1-thread-4办理业务
pool-1-thread-2办理业务
java.util.concurrent.RejectedExecutionException: Task IO_File.ReentrantReadWriteLockDemo$$Lambda$1/2093631819@378bf509 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at IO_File.ReentrantReadWriteLockDemo.main(ReentrantReadWriteLockDemo.java:23)
复制代码
演示2:给线程池中线程起一个有意义的名字
给线程池中线程起一个有意义的名字,在系统出现问题的时候,通过线程堆栈信息可以更容易发现系统中问题所在。
自定义创建工厂需要实现java.util.concurrent.ThreadFactory
接口中的Thread newThread(Runnable r)
方法,参数为传入的任务,需要返回一个工作线程。
示例代码:
public class CustomizeThreadFactory {
static AtomicInteger threadNum = new AtomicInteger(1);
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("自定义线程-" + threadNum.getAndIncrement());
return thread;
}
});
for (int i = 0; i < 5; i++) {
String taskName = "任务-" + i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "处理" + taskName);
});
}
executor.shutdown();
}
}
复制代码
运行上面的代码,输出结果:
自定义线程-2处理任务-1
自定义线程-5处理任务-4
自定义线程-4处理任务-3
自定义线程-1处理任务-0
自定义线程-3处理任务-2
复制代码
代码中在任务中输出了当前线程的名称,可以看到是我们自定义的名称。
通过jstack
查看线程的堆栈信息,也可以看到我们自定义的名称,我们可以将代码中executor.shutdown();
先给注释掉让程序先不退出,然后通过jstack
查看,如下:
"自定义线程-5" #15 prio=5 os_prio=0 tid=0x00000000194fb000 nid=0x4ff0 waiting on condition [0x000000001a26f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"自定义线程-4" #14 prio=5 os_prio=0 tid=0x00000000194fa800 nid=0x3450 waiting on condition [0x000000001a16f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"自定义线程-3" #13 prio=5 os_prio=0 tid=0x00000000194f5800 nid=0x4808 waiting on condition [0x000000001a06f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"自定义线程-2" #12 prio=5 os_prio=0 tid=0x00000000194f5000 nid=0x1c10 waiting on condition [0x0000000019f6e000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"自定义线程-1" #11 prio=5 os_prio=0 tid=0x00000000194f4000 nid=0x20bc waiting on condition [0x0000000019e6e000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
复制代码
8、线程池中常见5种工作队列
任务太多的时候,工作队列用于暂时缓存待处理的任务,jdk中常见的5种阻塞队列:
ArrayBlockingQueue
:是一个基于数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序。LinkedBlockingQueue
:是一个基于链表结构的阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue
。静态工厂方法Executors.newFixedThreadPool
使用了这个队列。SynchronousQueue
:一个不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue
,静态工厂方法Executors.newCachedThreadPool
使用这个队列。PriorityBlockingQueue
:优先级队列,进入队列的元素按照优先级会进行排序。
9、面试题
线程池用过吗?生产上你是如何设置合理参数
线程池的拒绝策略请你谈谈
- 等待队列也已经排满了,再也塞不下新的任务了。同时,线程池的maximumPoolSize也到达了极限,无法继续为新任务服务,这时我们需要拒绝策略机制合理的处理这个问题
- JDK内置的拒绝策略
- AbortPolicy(默认):直接抛出RejectedException异常阻止系统正常运行
- CallerRunsPolicy:调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是返回给调用者进行处理
- DiscardOldestPolicy:将最早进入队列的任务删除,之后再尝试加入队列
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常.如果允许任务丢失,这是最好的拒绝策略
- 以上内置策略均实现了
RejectExecutionHandler
接口
你在工作中单一的/固定数的/可变你的三种创建线程池的方法,你用哪个多?超级大坑
- ①答案是一个都不用,我们生产上只能使用自定义的
- ②Executors中JDK给你提供了为什么不用?
- 参考阿里巴巴java开发手册
你在工作中是如何创建线程池的,是否自定义过线程池使用
①AbortPolicy:最大不会抛出异常的值= maximumPoolSize + new LinkedBlockingDeque<Runnable>(3)
=8个。如果超过8个,默认的拒绝策略会抛出异常
②CallerRunPolicy: 如果超过8个,不会抛出异常,会返回给调用者去
③DiscardOldestPolicy:如果超过8个,将最早进入队列的任务删除,之后再尝试加入队列
④DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的拒绝策略
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5, //Runtime.getRuntime().availableProcessors(),
1L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(3),
Executors.defaultThreadFactory(),
//默认抛出异常
//new ThreadPoolExecutor.AbortPolicy()
//回退调用者
//new ThreadPoolExecutor.CallerRunsPolicy()
//处理不来的不处理,丢弃时间最长的
//new ThreadPoolExecutor.DiscardOldestPolicy()
//直接丢弃任务,不予任何处理也不抛出异常
new ThreadPoolExecutor.DiscardPolicy()
);
//模拟10个用户来办理业务 没有用户就是来自外部的请求线程.
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
//threadPoolInit();
}
}
复制代码
结果
pool-1-thread-2 办理业务
pool-1-thread-2 办理业务
pool-1-thread-2 办理业务
pool-1-thread-2 办理业务
pool-1-thread-2 办理业务
pool-1-thread-2 办理业务
pool-1-thread-2 办理业务
pool-1-thread-1 办理业务
pool-1-thread-4 办理业务
pool-1-thread-3 办理业务
复制代码
合理配置线程池你是如何考虑的?
①. CPU密集型
- 并发:CPU一核,模拟出来多条线程(多线程操作同一资源)
- 并行:CPU多核,多个线程可以同时执行:线程池(多个人一起走)
- 并发编程的本质:充分利用CPU资源
②. IO密集型
线程池的最大的大小如何设置 (CPU密集型和IO密集型—调优)
public class Demo6 {
public static void main(String[] args) {
//自定义线程池(工作必备)
/*
* 最大线程该如何定义
* 1.CPU密集型:几核就是几,可以保持CPU的效率最高
* 2.IO密集型:>判断程序中十分耗IO的线程
* 程序15大型任务 io十分耗资源
*
*/
//获取CPU的核数
System.out.println(Runtime.getRuntime().availableProcessors());
ExecutorService threadPool=new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
//队列满了,尝试去和最早的竞争,也不会抛出异常
);
try {
//最大承载=Deque+max=8
//java.util.concurrent.RejectedExecutionException
for (int i = 1; i <=9; i++) {
//使用线程池之后,利用线程池创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "ok");
});
}
} finally {
//线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}
复制代码
10、使用ThreadFactoryBuilder类来创建线程工厂
JDK 自带工具类创建的线程池存在的问题
- 有的线程池可以无限添加任务或线程,容易导致 OOM;比如FixedThreadPool和 CachedThreadPool。
- 使用 JDK 自带的线程工厂 (ThreadFactory)创建的线程名称都是类似
pool-1-thread-1
的形式,第一个数字是线程池编号,第二个数字是线程编号,这样很不利于系统异常时排查问题。
使用可自定义线程名称的线程工厂,借助大名鼎鼎的谷歌开源工具库 Guava,引入依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
复制代码
然后使用其提供的ThreadFactoryBuilder类来创建线程工厂,Demo如下:
public class ThreadPoolDemo {
// 线程数
public static final int THREAD_POOL_SIZE = 16;
public static void main(String[] args) throws InterruptedException {
// 使用 ThreadFactoryBuilder 创建自定义线程名称的 ThreadFactory
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("hyn-demo-pool-%d").build();
// 创建线程池,其中任务队列需要结合实际情况设置合理的容量
ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_POOL_SIZE,
THREAD_POOL_SIZE,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
// 新建 1000 个任务,每个任务是打印当前线程名称
for (int i = 0; i < 1000; i++) {
executor.execute(() -> System.out.println(Thread.currentThread().getName()));
}
// 优雅关闭线程池
executor.shutdown();
executor.awaitTermination(1000L, TimeUnit.SECONDS);
// 任务执行完毕后打印"Done"
System.out.println("Done");
}
}
复制代码
输出结果:
.........
hyn-demo-pool-2
hyn-demo-pool-6
hyn-demo-pool-13
hyn-demo-pool-12
hyn-demo-pool-15
Done
复制代码