CompletableFuture 异步编程
优势
- 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
- 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务3要等待任务1和任务2都完成后才能开始”;
- 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
创建CompletableFuture对象
创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
复制代码
Runnable 接口的run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。后两个方法可以指定线程池参数。
默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
CompletionStage接口
CompletionStage 接口可以清晰地描述任务之间的这种时序关系
串行关系
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
复制代码
参数 fn 的类型是接口 Function<T, R>,这个接口里与CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage。
而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回CompletionStage。
thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage。
CompletableFuture<String> cp = CompletableFuture
.supplyAsync(()->"one")
.thenApply(s->s+"two")
.thenApply(String::toUpperCase);
System.out.println(cp.get());//ONETWO
复制代码
AND
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
复制代码
OR
以一个执行为准,另外一个不被执行
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
复制代码
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("**********");
return Thread.currentThread().getName()+"one";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----------");
return Thread.currentThread().getName()+"two";
});
CompletableFuture<String> cf3 = cf1.applyToEither(cf2, String::toUpperCase);
System.out.println(cf3.join());
复制代码
异常处理
虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
复制代码
CompletableFuture<String> cf3 = cf1.applyToEither(cf2, String::toUpperCase)
.exceptionally(e->{// 相当于catch
e.printStackTrace();
return "";
}).whenComplete((s,e)->{// 相当于finally
System.out.println(s.length());
e.printStackTrace();
});
复制代码