这是我参与更文挑战的第3天,活动详情查看: 更文挑战
介绍
最近在做项目时了解了最好不要直接使用 new Thread(...).start()
,用线程池来隐式的维护所有线程,具体为什么可以看这篇文章。
其实 SpringBoot 已经为我们创建并配置好了这个东西,这里就来学习一下如何来使用 SpringBoot 为我们设置的线程池。
如有错误欢迎联系我指正!
使用
创建配置类
首先我们需要创建一个配置类来让 SpringBoot 加载,并且在里面设置一些自己需要的参数。
@Configuration
@EnableAsync
public class ExecutorConfig {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
logger.info("开启SpringBoot的线程池!");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(corePoolSize);
// 设置最大线程数
executor.setMaxPoolSize(maxPoolSize);
// 设置缓冲队列大小
executor.setQueueCapacity(queueCapacity);
// 设置线程的最大空闲时间
executor.setKeepAliveSeconds(keepAliveSeconds);
// 设置线程名字的前缀
executor.setThreadNamePrefix(namePrefix);
// 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
// CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 线程池初始化
executor.initialize();
return executor;
}
}
复制代码
首先,
@Configuration
的作用是表明这是一个配置类。@EnableAsync
的作用是启用 SpringBoot 的异步执行
其次,关于线程池的设置有
corePoolSize
: 核心线程数,当向线程池提交一个任务时池里的线程数小于核心线程数,那么它会创建一个线程来执行这个任务,一直直到池内的线程数等于核心线程数maxPoolSize
: 最大线程数,线程池中允许的最大线程数量。关于这两个数量的区别我会在下面解释queueCapacity
: 缓冲队列大小,用来保存阻塞的任务队列(注意这里的队列放的是任务而不是线程)keepAliveSeconds
: 允许线程存活时间(空闲状态下),单位为秒,默认60snamePrefix
: 线程名前缀RejectedExecutionHandler
: 拒绝策略,当线程池达到最大线程数时,如何处理新任务。线程池为我们提供的策略有- AbortPolicy:默认策略。直接抛出 RejectedExecutionException
- DiscardPolicy:直接丢弃掉被拒绝的任务,且不会抛出任何异常
- DiscardOldestPolicy:丢弃掉队列中的队头元素(也就是最早在队列里的任务),然后重新执行 提交该任务 的操作
- CallerRunsPolicy:由主线程自己来执行这个任务,该机制将减慢新任务的提交
关于 corePoolSize
与 maxPoolSize
的区别也是困惑了我很久,官方文档上的解释说的很清楚。我的理解如下:
这个线程池其实是有点“弹性的”。当向线程池提交任务时:
-
若
当前运行的线程数 < corePoolSize
则 即使其它的工作线程处于空闲状态,线程池也会创建一个新线程来执行任务
-
若
corePoolSize < 当前运行的线程数 < maxPoolSize
-
若 队列已满
则 创建新线程来执行任务
-
若 队列未满
则 加入队列中
-
-
若
当前运行的线程数 > maxPoolSize
-
若 队列已满
则 拒绝任务
-
若 队列未满
则 加入队列中
-
所以当想要创建固定大小的线程池时,将 corePoolSize
和 maxPoolSize
设置成一样就行了。
最后,别忘了给方法加上 @Bean
注解,否则 SpringBoot 不会加载。
这里因为我加了 @Value
注解,可以在 application.properties
中配置相关数据,如
# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = test-async-
# 配置线程最大空闲时间
async.executor.thread.keep_alive_seconds = 30
复制代码
在具体的方法中使用
配置完上面那些使用起来就轻松了,只需在业务方法前加上 @Async
注解,它就会异步执行了。
如在 Service 中添加如下方法。
@Async("asyncServiceExecutor")
// 注:@Async所修饰的函数不能定义为static类型,这样异步调用不会生效
public void asyncTest() throws InterruptedException {
logger.info("任务开始!");
System.out.println("异步执行某耗时的事...");
System.out.println("如休眠5秒");
Thread.sleep(5000);
logger.info("任务结束!");
}
复制代码
然后在 Controller 里调用一下这个方法,在网页上连续发送请求做一个测试。
我这里连续发起了5次请求,可以看到这5个任务确实是成功地异步执行了。
我设置的线程池大小为 5,所以当超过 5 个任务被提交时,会放入阻塞队列中。
到这里,基本的异步执行任务就实现了。
自定义
虽然它提供给我们的线程池已经很强大了,但是有时候我们还需要一些额外信息,比如说我们想知道这个线程池已经执行了多少任务了、当前有多少线程在运行、阻塞队列里还有多少任务等等。那么这个时候我们就可以自定义我们的线程池。
自定义很简单,自己写一个类继承 Spring 提供的 ThreadPoolTaskExecutor
,在此之上做修改就好了。如
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(VisibleThreadPoolTaskExecutor.class);
public void info() {
ThreadPoolExecutor executor = getThreadPoolExecutor();
if (executor == null) return;
String info = "线程池" + this.getThreadNamePrefix() +
"中,总任务数为 " + executor.getTaskCount() +
" ,已处理完的任务数为 " + executor.getCompletedTaskCount() +
" ,目前正在处理的任务数为 " + executor.getActiveCount() +
" ,缓冲队列中任务数为 " + executor.getQueue().size();
logger.info(info);
}
@Override
public void execute(Runnable task) {
info();
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
info();
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
info();
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
info();
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
info();
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
info();
return super.submitListenable(task);
}
}
复制代码
然后在我们的配置类 ExecutorConfig
中将
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
改为
ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
,
也就是使用我们自己定义的线程池,然后会在相应的任务执行(execute()
)、任务提交(submit()
)时打印我们需要的信息了。
打印结果,在此之前已处理了5个任务:
查询线程池信息
上面自定义线程池后想查询信息只能在线程池中的方法查询,那如果我想在任意地方查询线程池的信息呢?那也是可以的,而且非常简单。我这里写一个接口来查询线程池的任务信息以做示例。
首先修改一下线程池里的 Info()
方法,让它返回我们需要的信息。
public String info() {
ThreadPoolExecutor executor = getThreadPoolExecutor();
if (executor == null) return "线程池不存在";
String info = "线程池" + this.getThreadNamePrefix() +
"中,总任务数为 " + executor.getTaskCount() +
" ,已处理完的任务数为 " + executor.getCompletedTaskCount() +
" ,目前正在处理的任务数为 " + executor.getActiveCount() +
" ,缓冲队列中任务数为 " + executor.getQueue().size();
logger.info(info);
return info;
}
复制代码
然后修改一下配置类 ExecutorConfig
里注册线程池的方法,让它注册的是我们自定义的线程池类型。
@Bean(name = "asyncServiceExecutor")
public VisibleThreadPoolTaskExecutor asyncServiceExecutor() {
logger.info("开启SpringBoot的线程池!");
// 修改这里,要返回我们自己定义的类 VisibleThreadPoolTaskExecutor
VisibleThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
// ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(corePoolSize);
// 设置最大线程数
executor.setMaxPoolSize(maxPoolSize);
// 设置缓冲队列大小
executor.setQueueCapacity(queueCapacity);
// 设置线程的最大空闲时间
executor.setKeepAliveSeconds(keepAliveSeconds);
// 设置线程名字的前缀
executor.setThreadNamePrefix(namePrefix);
// 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 线程池初始化
executor.initialize();
return executor;
}
复制代码
再在我们需要信息的地方自动注入这个线程池,然后调用一下 info()
方法就能得到信息了,我这里以在 Service 层中获取信息为例。
@Service
public class DemoService {
private static final Logger logger = LoggerFactory.getLogger(DemoService.class);
// 别忘了这里要用 SpringBoot 的自动注入
@Autowired
private VisibleThreadPoolTaskExecutor executor;
// @SneakyThrows 这个注解是Lombok带的,我为了代码简洁使用的。你也可以使用 try catch 的方法。
@SneakyThrows
@Async("asyncServiceExecutor")
public void asyncTest() {
logger.info("任务开始!");
System.out.println("异步执行某耗时的事...");
System.out.println("如休眠5秒");
Thread.sleep(5000);
logger.info("任务结束!");
// 你甚至可以在任务结束时再打印一下线程池信息
executor.info();
}
public String getExecutorInfo() {
return executor.info();
}
}
复制代码
最后在 Controller 层中调用一下,就大功告成了!
@RestController
public class DemoController {
@Autowired
private DemoService demoService;
@GetMapping("/async")
public void async() {
demoService.asyncTest();
}
@GetMapping("/info")
public String info() {
return demoService.getExecutorInfo();
}
}
复制代码
来看一下测试的结果吧,我这里调用 /async 一口气开启了 15 个任务,然后在不同时间使用 /info 来看看信息。
刚开始时的结果:
一口气提交了15个任务后的中间结果:
所有任务都执行完了的最终结果:
总结
本篇到这里就结束了,篇幅略长。总结一下,要想在SpringBoot中使用它提供的线程池其实很简单,只要两步:
- 注册线程池(使用 @Bean 来注册),设置一些自己想要的参数;
- 在想要异步调用的方法上加上 @Async 注解。
当然你也可以不使用 @Async 注解,直接在想开线程的地方自动注入你注册的线程池,然后像普通线程池一样使用就行了。
其实关于这一方面的知识也讲得并不够详尽,比如线程池里还有哪些方法、SpringBoot是如何为我们弄得这么方便的等等,还需要多多补充知识。