CompletableFuture 异步编程

创建CompletableFuture对象

创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法,我们先看前两 个。runAsync(Runnable runnable)supplyAsync(Supplier<U> supplier),它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。

前两个方法和后两个方法的区别在于,后两个方法可以指定线程池参数。

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创 建的线程数是 CPU 的核数。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进 而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

1
2
3
4
5
6
7
8
9
10
// 使用默认线程池
static CompletableFuture<Void>
runAsync(Runnable runnable)
static <U> CompletableFuture<U>
supplyAsync(Supplier<U> supplier)
// 可以指定线程池
static CompletableFuture<Void>
runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U>
supplyAsync(Supplier<U> supplier, Executor executor)

创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,需要关注两个问题:一个是异步操作什么时候 结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外, CompletableFuture 类还实现了 CompletionStage 接口,这个接口内容实在是太丰富 了,在 1.8 版本里有 40 个方法,这些方法该如何理解?

如何理解CompletionStage接口

可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系、 并行关系、汇聚关系等。这样说可能有点抽象,这里还举前面烧水泡茶的例子,其中洗水壶 和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系, 而烧开水、拿茶叶和泡茶就是汇聚关系。

CompletionStage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的f3 = f1.thenCombine(f2, ()->{})描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系 是一种 AND 聚合关系,这里的 AND 指的是所有依赖的任务(烧开水和拿茶叶)都完成后 才开始执行当前任务(泡茶)。既然有 AND 聚合关系,那就一定还有 OR 聚合关系,所谓 OR 指的是依赖的任务只要有一个完成就可以执行当前任务。

下面来一个一个介绍completionStage接口如何描述串行关系,AND聚合关系,OR聚合关系以及异常处理。

  • 描述串行关系

CompletionStage 接口里描述串行关系,主要是thenApply,thenAccept,thenRun和thenCompose这四个系列的接口。

thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回 值,所以 thenApply 系列方法返回的是CompletionStage<R>

而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer<T>,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不 支持回值,所以 thenAccept 系列方法返回的是CompletionStage<Void>

thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持 返回值,所以 thenRun 系列方法返回的也是CompletionStage<Void>。 这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的 是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。

1
2
3
4
5
6
7
8
CompletionStage<R> thenApply(fn); 
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

下面是thenApply()方法的使用。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不 过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖 ②的执行结果。

1
2
3
4
5
6
7
8
9
CompletableFuture<String> f0 = 
CompletableFuture.supplyAsync(
() -> "Hello World") //①
.thenApply(s -> s + " QQ") //②
.thenApply(String::toUpperCase);//③

System.out.println(f0.join());
// 输出结果
HELLO WORLD QQ
  • 描述AND汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、 thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、 consumer、action 这三个核心参数不同。

1
2
3
4
5
6
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
  • 描述OR汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核 心参数不同。

1
2
3
4
5
6
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

下面的示例代码展示了如何使用 applyToEither() 方法来描述一个 OR 汇聚关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CompletableFuture<String> f1 = 
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});

CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});

CompletableFuture<String> f3 =
f1.applyToEither(f2,s -> s);

System.out.println(f3.join());