前言
CompletionService的功能
- 一种将新异步任务的产生与已完成任务的结果的消耗分离的服务。 生产者submit任务以供执行。 消费者take已完成的任务并按照他们完成的顺序处理他们的结果。
- 简而言之,你随便什么顺序提交任务,我只按任务完成的顺序获取任务的结果。
阅读JDK源码
CompletionService接口
CompletionService接口实现类ExecutorCompletionService
- 成员变量
- 构造方法
注:
- 调用take()和poll(),会删除结果队列中的元素并获取它。
- take()和poll()前者会等待,后者不会等待。
- take()和poll()获取的结果是已经完成的Future,要想获取具体的任务结果需要调Future的get()。
- 至于什么的Future,且看之前的文章。
代码样例
public class CompletionServiceTest {
static AtomicInteger atomicInteger = new AtomicInteger(1);//任务标记
static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss:SSS");
static String getTime() {//获得时间
return LocalDateTime.now(ZoneOffset.of("+8")).format(formatter);
//字符串转时间String dateTimeStr = " 14:11:15:123";
}
public static void main(String[] args) {
Callable<String> callable = () -> {//callable 返回String
String start = getTime();//任务开始时间
int i = atomicInteger.getAndIncrement();//任务标记,第一个执行的就是task-1,第二个就是task-2..
Thread.sleep((long) (Math.random() * 1000));//任务执行时间
String end = getTime();//任务完成时间
return i < 10 ? "task-" + i + " start time: " + start + " end time: " + end//调整一下格式
: "task-" + i + " start time: " + start + " end time: " + end;
};
//创建线程池,以用来构造completionService
ExecutorService executorService = new ThreadPoolExecutor(6, 12, 10,
TimeUnit.SECONDS, new LinkedBlockingDeque<>(6), new ThreadPoolExecutor.AbortPolicy());
//构造CompletionService
CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
//要用completionService提交16个任务
IntStream.range(0, 16).forEach(i -> completionService.submit(callable));
IntStream.range(0, 16).forEach(i -> {//循环
try {
//take(),拿到是Future,还要get()才能获取任务的具体结果
System.out.println(completionService.take().get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
executorService.shutdown();//关闭线程池
}
}
复制代码
- 结果
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END