从JDK 8开始,在Concurrent包中提供了一个强大的异步编程工具CompletableFuture
。在JDK8之前,异步编程可以通过线程池和Future
来实现,但功能还不够强大。
示例代码:
package com.rubin.concurrent.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture();
new Thread() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 另一个线程执行任务,将结果赋值给future
future.complete("hello rubin");
}
}.start();
System.out.println("任务已经提交");
// 阻塞的方法
String result = future.get();
System.out.println(result);
}
}
CompletableFuture
实现了Future
接口,所以它也具有Future
的特性:调用get()
方法会阻塞在那,直到结果返回。
另外1个线程调用complete
方法完成该Future
,则所有阻塞在get()
方法的线程都将获得返回结果。
runAsync与supplyAsync
上面的例子是一个空的任务,下面尝试提交一个真的任务,然后等待结果返回:
例1:runAsync(Runnable)
package com.rubin.concurrent.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class Demo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 通过异步的方式给future指派任务,future没有返回值
CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务执行完毕");
}
});
Object o = future.get();
System.out.println(o);
}
}
CompletableFuture.runAsync(…)
传入的是一个Runnable
接口。
例2:supplyAsync(Supplier)
package com.rubin.concurrent.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
public class Demo3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 指定future要执行的任务,同时future会有返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello rubin";
}
});
String result = future.get();
System.out.println(result);
}
}
例2和例1的区别在于,例2的任务有返回值。没有返回值的任务,提交的是Runnable
,返回的是CompletableFuture
;有返回值的任务,提交的是Supplier
,返回的是CompletableFuture
。Supplier
和前面的Callable
很相似。
通过上面两个例子可以看出,在基本的用法上,CompletableFuture
和Future
很相似,都可以提交两类任务:一类是无返回值的,另一类是有返回值的。
thenRun、thenAccept和thenApply
对于Future
,在提交任务之后,只能调用get()
等结果返回;但对于CompletableFuture
,可以在结果上面再加一个callback
,当得到结果之后,再接着执行callback
。
例1:thenRun(Runnable)
package com.rubin.concurrent.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class Demo4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("hello rubin");
}
});
CompletableFuture<Void> voidCompletableFuture = future.thenRun(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务执行结束后的代码执行");
}
});
voidCompletableFuture.get();
System.out.println("任务执行结束");
}
}
该案例最后不能获取到结果,只会得到一个null。
例2:thenAccept(Consumer)
package com.rubin.concurrent.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class Demo5 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello rubin";
}
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
// 可以获取上个任务的执行结果,接着进行处理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s.length());
}
});
Void aVoid = future.get();
System.out.println(aVoid);
}
}
上述代码在thenAccept
中可以获取任务的执行结果,接着进行处理。
例3:thenApply(Function)
package com.rubin.concurrent.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
public class Demo6 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello rubin";
}
}).thenApply(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
// 接收上个任务的返回值,接着处理,同时将处理结果返回给future
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.length();
}
});
Integer integer = future.get();
System.out.println(integer);
}
}
三个例子都是在任务执行完成之后,接着执行回调,只是回调的形式不同:
thenRun
后面跟的是一个无参数、无返回值的方法,即Runnable
,所以最终的返回值是CompletableFuture<Void>
类型thenAccept
后面跟的是一个有参数、无返回值的方法,称为Consumer
,返回值也是CompletableFuture<Void>
类型。顾名思义,只进不出,所以称为Consumer
;前面的Supplier
,是无参数,有返回值,只出不进,和Consumer
刚好相反thenApply
后面跟的是一个有参数、有返回值的方法,称为Function
。返回值是CompletableFutur<String>
类型
而参数接收的是前一个任务,即supplyAsync(…)
这个任务的返回值。因此这里只能用supplyAsync
,不能用runAsync
。因为runAsync
没有返回值,不能为下一个链式方法传入参数。
thenCompose与thenCombine
例1:thenCompose
在上面的例子中,thenApply
接收的是一个Function
,但是这个Function
的返回值是一个通常的基本数据类型或一个对象,而不是另外一个CompletableFuture
。如果Function
的返回值也是一个CompletableFuture
,就会出现嵌套的CompletableFuture
。考虑下面的例子:
如果希望返回值是一个非嵌套的CompletableFuture
,可以使用thenCompose
:
package com.rubin.concurrent.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
public class Demo7 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello rubin";
}
}).thenCompose(new Function<String, CompletableFuture<Integer>>() {
@Override
public CompletableFuture<Integer> apply(String s) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return s.length();
}
});
}
});
System.out.println(future.get());
}
}
下面是thenCompose
方法的接口定义:
CompletableFuture
中的实现:
从该方法的定义可以看出,它传入的参数是一个Function
类型,并且Function
的返回值必须是CompletionStage
的子类,也就是CompletableFuture
类型。
例2:thenCombine
thenCombine
方法的接口定义如下,从传入的参数可以看出,它不同于thenCompose
。
第1个参数是一个CompletableFuture
类型,第2个参数是一个方法,并且是一个BiFunction
,也就是该方法有2个输入参数,1个返回值。
从该接口的定义可以大致推测,它是要在2个CompletableFuture
完成之后,把2个CompletableFuture
的返回值传进去,再额外做一些事情。实例如下:
package com.rubin.concurrent.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public class Demo8 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello";
}
}).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "rubin";
}
}), new BiFunction<String, String, Integer>() {
@Override
public Integer apply(String s, String s2) {
System.out.println("s = " + s);
System.out.println("s2 = " + s2);
return s.length() + s2.length();
}
});
System.out.println(future.get());
}
}
任意个CompletableFuture的组合
上面的thenCompose
和thenCombine
只能组合2个CompletableFuture
,而接下来的allOf
和anyOf
可以组合任意多个CompletableFuture
。方法接口定义如下所示:
首先,这两个方法都是静态方法,参数是变长的CompletableFuture
的集合。其次,allOf
和anyOf
的区别,前者是“与”,后者是“或”。
allOf
的返回值是CompletableFuture
类型,这是因为每个传入的CompletableFuture
的返回值都可能不同,所以组合的结果是无法用某种类型来表示的,索性返回Void
类型。
anyOf
的含义是只要有任意一个CompletableFuture
结束,就可以做接下来的事情,而无须像AllOf
那样,等待所有的CompletableFuture
结束。但由于每个CompletableFuture
的返回值类型都可能不同,任意一个,意味着无法判断是什么类型,所以anyOf
的返回值是CompletableFuture<Object>
。
package com.rubin.concurrent.completablefuture;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class Demo9 {
private static final Random RANDOM = new Random();
private static AtomicInteger result = new AtomicInteger(0);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture[] futures = new CompletableFuture[10];
for (int i = 0; i < 10; i++) {
CompletableFuture<Void> myFuture = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000 + RANDOM.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
result.incrementAndGet();
}
});
futures[i] = myFuture;
}
// for (int i = 0; i < 10; i++) {
// futures[i].get();
// System.out.println(result.get());
// }
// CompletableFuture<Void> future = CompletableFuture.allOf(futures).thenRun(new Runnable() {
// @Override
// public void run() {
// System.out.println("计算完成");
// }
// });
//
// future.get();
// System.out.println(result);
CompletableFuture myfuture = CompletableFuture.anyOf(futures).thenRun(new Runnable() {
@Override
public void run() {
System.out.println(result.get());
}
});
myfuture.get();
// 也可以调用CompletableFuture.anyOf(futures).join();或者CompletableFuture.allOf(futures).join();来阻塞
}
}
四种任务原型
通过上面的例子可以总结出,提交给CompletableFuture
执行的任务有四种类型:Runnable
、Consumer
、Supplier
、Function
。下面是这四种任务原型的对比。
四种任务原型 | 无参数 | 有参数 |
无返回值 | Runnable 接口对应的提交方法: runAsync ,thenRun | Consumer 接口对应的提交方法: thenAccept |
有返回值 | Supplier 接口:对应的提交方法: supplierAsync | Function 接口对应的提交方法: thenApply |
runAsync
与supplierAsync
是CompletableFuture
的静态方法;而thenAccept
、thenAsync
、thenApply
是CompletableFutre
的成员方法。
因为初始的时候没有CompletableFuture
对象,也没有参数可传,所以提交的只能是Runnable
或者Supplier
,只能是静态方法。
通过静态方法生成CompletableFuture
对象之后,便可以链式地提交其他任务了,这个时候就可以提交Runnable
、Consumer
、Function
,且都是成员方法。
CompletionStage接口
CompletableFuture
不仅实现了Future
接口,还实现了CompletableStage
接口。
CompletionStage
接口定义的正是前面的各种链式方法、组合方法,如下所示:
package java.util.concurrent;
public interface CompletionStage<T> {
//
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn);
public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
// ...
}
关于CompletionStage
接口,有几个关键点要说明:
- 所有方法的返回值都是
CompletionStage
类型,也就是它自己。正因为如此,才能实现如下的链式调用:future1.thenApply(…).thenApply(…).thenCompose(…).thenRun(…)
thenApply
接收的是一个有输入参数、返回值的Function
。这个Function
的输入参数,必须是?Super T
类型,也就是T
或者T
的父类型,而T
必须是调用thenApplycompletableFuture
对象的类型;返回值则必须是?Extends U
类型,也就是U
或者U
的子类型,而U
恰好是thenApply
的返回值的CompletionStage
对应的类型
其他方法,诸如thenCompose
、thenCombine
也是类似的原理。
CompletableFuture内部原理
CompletableFuture的构造:ForkJoinPool
CompletableFuture
中任务的执行依靠ForkJoinPool
:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
// Supplier转换为ForkJoinTask
e.execute(new AsyncSupply<U>(d, f));
return d;
}
// ...
}
通过上面的代码可以看到,asyncPool
是一个static
类型,supplierAsync
、asyncSupplyStage
也都是static
方法。static
方法会返回一个CompletableFuture
类型对象,之后就可以链式调用,CompletionStage
里面的各个方法。
任务类型的适配
ForkJoinPool
接受的任务是ForkJoinTask
类型,而我们向CompletableFuture
提交的任务是Runnable/Supplier/Consumer/Function
。因此,肯定需要一个适配机制,把这四种类型的任务转换成ForkJoinTask
,然后提交给ForkJoinPool
,如下图所示:
为了完成这种转换,在CompletableFuture
内部定义了一系列的内部类,下图是CompletableFuture
的各种内部类的继承体系。
在supplyAsync(…)
方法内部,会把一个Supplier
转换成一个AsyncSupply
,然后提交给ForkJoinPool
执行。
在runAsync(…)
方法内部,会把一个Runnable
转换成一个AsyncRun
,然后提交给ForkJoinPool
执行。
在thenRun/thenAccept/thenApply
内部,会分别把Runnable/Consumer/Function
转换成UniRun/UniAccept/UniApply
对象,然后提交给ForkJoinPool
执行
除此之外,还有两种CompletableFuture
组合的情况,分为“与”和“或”,所以有对应的Bi
和Or
类型的Completion
类型。
下面的代码分别为UniRun
、UniApply
、UniAccept
的定义,可以看到,其内部分别封装了Runnable
、Function
、Consumer
。
任务的链式执行过程分析
下面以CompletableFuture.supplyAsync(…).thenApply(…).thenRun(…)
链式代码为例,分析整个执行过程。
第1步:CompletableFuture future1=CompletableFuture.supplyAsync(…)
在上面的代码中,关键是构造了一个AsyncSupply
对象,该对象有三个关键点:
- 它继承自
ForkJoinTask
,所以能够提交ForkJoinPool
来执行 - 它封装了
Supplier f
,即它所执行任务的具体内容 - 该任务的返回值,即
CompletableFuture d
,也被封装在里面
ForkJoinPool
执行一个ForkJoinTask
类型的任务,即AsyncSupply
。该任务的输入就是Supply
,输出结果存放在CompletableFuture
中。
第2步:CompletableFuture future2=future1.thenApply(…)
第1步的返回值,也就是上面代码中的CompletableFuture d
,紧接着调用其成员方法thenApply
:
我们知道,必须等第1步的任务执行完毕,第2步的任务才可以执行。因此,这里提交的任务不可能立即执行,在此处构建了一个UniApply
对象,也就是一个ForkJoinTask
类型的任务,这个任务放入了第1个任务的栈当中。
每一个CompletableFuture
对象内部都有一个栈,存储着是后续依赖它的任务,如下面代码所示。这个栈也就是Treiber Stack
,这里的stack
存储的就是栈顶指针。
上面的UniApply
对象类似于第1步里面的AsyncSupply
,它的构造方法传入了4个参数:
- 第1个参数是执行它的
ForkJoinPool
- 第2个参数是输出一个
CompletableFuture
对象。这个参数,也是thenApply
方法的返回值,用来链式执行下一个任务 - 第3个参数是其依赖的前置任务,也就是第1步里面提交的任务
- 第4个参数是输入(也就是一个
Function
对象)
UniApply
对象被放入了第1步的CompletableFuture
的栈中,在第1步的任务执行完成之后,就会从栈中弹出并执行。如下代码:
ForkJoinPool
执行上面的AsyncSupply
对象的run()
方法,实质就是执行Supplier
的get()
方法。执行结果被塞入了 CompletableFuture d
当中,也就是赋值给了CompletableFuture
内部的Object result
变量。
调用d.postComplete()
,也正是在这个方法里面,把第2步压入的UniApply
对象弹出来执行,代码如下所示:
第3步:CompletableFuture future3=future2.thenRun()
第3步和第2步的过程类似,构建了一个UniRun
对象,这个对象被压入第2步的CompletableFuture
所在的栈中。第2步的任务,当执行完成时,从自己的栈中弹出UniRun
对象并执行。
综上所述:
通过supplyAsync/thenApply/thenRun
,分别提交了3个任务,每1个任务都有1个返回值对象,也就是1个CompletableFuture
。这3个任务通过2个CompletableFuture
完成串联。后1个任务,被放入了前1个任务的CompletableFuture
里面,前1个任务在执行完成时,会从自己的栈中,弹出下1个任务执行。如此向后传递,完成任务的链式执行。
thenApply与thenApplyAsync的区别
在上面的代码中,我们分析了thenApply
,还有一个与之对应的方法是thenApplyAsync
。这两个方法调用的是同一个方法,只不过传入的参数不同。
对于上一个任务已经得出结果的情况:
如果e != null
表示是thenApplyAsync
,需要调用ForkJoinPool
的execute
方法,该方法:
通过上面的代码可以看到:
- 如果前置任务没有完成,即
a.result=null
,thenApply
和thenApplyAsync
都会将当前任务的下一个任务入栈;然后再出栈执行 - 只有在当前任务已经完成的情况下,
thenApply
才会立即执行,不会入栈,再出栈,不会交给ForkJoinPool
;thenApplyAsync
还是将下一个任务封装为ForkJoinTask
,入栈,之后出栈再执行
同理,thenRun
与thenRunAsync
、thenAccept
与thenAcceptAsync
的区别与此类似。
任务的网状执行:有向无环图
如果任务只是链式执行,便不需要在每个CompletableFuture
里面设1个栈了,用1个指针使所有任务组成链表即可。
但实际上,任务不只是链式执行,而是网状执行,组成1张图。如下图所示,所有任务组成一个有向无环图:
这样一个有向无环图,用什么样的数据结构表达呢?AND
和OR
的关系又如何表达呢?
有几个关键点:
- 在每个任务的返回值里面,存储了依赖它的接下来要执行的任务。所以在上图中,任务一的
CompletableFuture
的栈中存储了任务二、任务三;任务二的CompletableFuutre
中存储了任务四、任务五;任务三的CompletableFuture
中存储了任务五、任务六。即每个任务的CompletableFuture
对象的栈里面,其实存储了该节点的出边对应的任务集合 - 任务二、任务三的
CompletableFuture
里面,都存储了任务五,那么任务五是不是会被触发两次,执行两次呢?任务五的确会被触发二次,但它会判断任务二、任务三的结果是不是都完成,如果只完成其中一个,它就不会执行 - 任务七存在于任务四、任务五、任务六的
CompletableFuture
的栈里面,因此会被触发三次。但它只会执行一次,只要其中1个任务执行完成,就可以执行任务七了 - 正因为有
AND
和OR
两种不同的关系,因此对应BiApply
和OrApply
两个对象,这两个对象的构造方法几乎一样,只是在内部执行的时候,一个是AND
的逻辑,一个是OR
的逻辑
BiApply
和OrApply
都是二元操作符,也就是说,只能传入二个被依赖的任务。但上面的任务七同时依赖于任务四、任务五、任务六,这怎么处理呢?
任何一个多元操作,都能被转换为多个二元操作的叠加。如上图所示,假如任务一AND
任务二AND
任务三 ==> 任务四,那么它可以被转换为右边的形式。新建了一个AND
任务,这个AND
任务和任务三再作为参数,构造任务四。OR的关系,与此类似。
此时,thenCombine
的内部实现原理也就可以解释了。thenCombine
用于任务一、任务二执行完成,再执行任务三。
allOf内部的计算图分析
下面以allOf
方法为例,看一下有向无环计算图的内部运作过程:
上面的方法是一个递归方法,输入是一个CompletableFuture
对象的列表,输出是一个具有AND关系的复合CompletableFuture
对象。
最关键的代码如上面加注释部分所示,因为d
要等a,b
都执行完成之后才能执行,因此d
会被分别压入a,b
所在的栈中。
下图为allOf
内部的运作过程。假设allOf
的参数传入了future1
、future2
、future3
、future4
,则对应四个原始任务。
生成BiRelay1
、BiRelay2
任务,分别压入future1/future2
、future3/future4
的栈中。无论future1
或future2
完成,都会触发BiRelay1
;无论future3
或future4
完成,都会触发BiRelay2
。
生成BiRelay3
任务,压入future5/future6
的栈中,无论future5
或future6
完成,都会触发BiRelay3
任务。
BiRelay
只是一个中转任务,它本身没有任务代码,只是参照输入的两个future
是否完成。如果完成,就从自己的栈中弹出依赖它的BiRelay
任务,然后执行。
以上就是本文的全部内容了。欢迎小伙伴们积极留言交流~~~
文章评论