CompletableFuture的强大之处就在于相比Future,其具备基于多个计算步骤结果串联计算,比如依赖计算,and计算、or计算等,也就是后一个计算可以基于前一个计算结果来计算,也可以和前一个计算结果组合,CompletableFuture提供了丰富的方法,我们通过demo先来理解CompletableFuture的强大功能。
这次的demo相比前一章节的demo稍微复杂一点,我们加入了一个折扣获取的逻辑,商品折扣也是从远程服务获取的,总结下来demo一共做了3件事情:
1、从远程电商获取商品价格-FutureShop
2、基于获取的价格创建新的VO,目的是构建符合折扣获取接口的入参对象-FutureDiscountPriceVO
3、远程获取折扣,兵基于商品折扣计算最终价格-FutureDiscountService
了解了demo的实现逻辑,我们来看代码:
/**
* 使用CompletableFuture异步编程 串联计算
* @author www.itzhimei.com
*/
public class FutureTest_5 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Runtime.getRuntime().availableProcessors());
List<FutureShop> shops = Arrays.asList(new FutureShop("JD"),
new FutureShop("TIANMAO"),
new FutureShop("PDD"),
new FutureShop("TAOBAO"),
new FutureShop("JD2"),
new FutureShop("TIANMAO2"),
new FutureShop("PDD2"),
new FutureShop("TAOBAO2"));
Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
long start5 = System.currentTimeMillis();
List<CompletableFuture<String>> CFcollect5 = shops.stream()
.map(x -> CompletableFuture.supplyAsync(() -> x.getProductPrice(), executor))
.map(y -> y.thenApply(FutureDiscountPriceVO::getRemoteVO))
.map(z -> z.thenCompose(dp -> CompletableFuture.supplyAsync(() -> FutureDiscountService.getDiscountPrice(dp), executor)))
.collect(Collectors.toList());
List<String> collect5 = CFcollect5.stream().map(CompletableFuture::join).collect(Collectors.toList());
long end5 = System.currentTimeMillis();
System.out.println("商品价格计算结果[" + collect5.stream().collect(Collectors.joining(",")) + "]");
System.out.println("CompletableFuture商品计算时间:" + (end5-start5));
}
/**
* 从远程商店获取价格
*/
public static class FutureShop {
public FutureShop(String product) {
this.product = product;
}
private String product;
@SneakyThrows
public String getProductPrice() {
//模拟计算复杂逻辑
Thread.sleep(1000);
return product+"获取商品价格:" + new Random().nextInt(1000);
}
public String getProduct() {
return product;
}
public void setProduct(String product) {
this.product = product;
}
}
/**
* 按照远程接口要求定义接口请求VO
*/
public static class FutureDiscountPriceVO {
private String shopName;
private Integer price;
public FutureDiscountPriceVO(String shopName, Integer price) {
this.shopName = shopName;
this.price = price;
}
public static FutureDiscountPriceVO getRemoteVO(String price) {
String[] split = price.split(":");
return new FutureDiscountPriceVO(split[0], Integer.parseInt(split[1]));
}
public Integer getPrice() {
return price;
}
public void setPrice(Integer price) {
this.price = price;
}
public String getShopName() {
return shopName;
}
public void setShopName(String shopName) {
this.shopName = shopName;
}
}
/**
* 远程商店折扣获取服务
*/
public static class FutureDiscountService {
private static double[] disArr = {0.9,0.8,0.7,0.6,0.5};
@SneakyThrows
public static String getDiscountPrice(FutureDiscountPriceVO vo) {
//模拟远程请求耗时
Thread.sleep(1000);
int index = new Random().nextInt(4);
Double price = vo.getPrice() * disArr[index];
return vo.getShopName() +"获取商品价格:" +price.toString();
}
}
}
/* 输出
商品价格计算结果
[
JD获取商品价格获取商品价格:794.7,
TIANMAO获取商品价格获取商品价格:577.1999999999999,
PDD获取商品价格获取商品价格:304.2,
TAOBAO获取商品价格获取商品价格:126.4,
JD2获取商品价格获取商品价格:621.6,
TIANMAO2获取商品价格获取商品价格:553.8,
PDD2获取商品价格获取商品价格:473.6,
TAOBAO2获取商品价格获取商品价格:25.9
]
CompletableFuture商品计算时间:2177
*/
上面demo演示的核心代码是:
List<CompletableFuture<String>> CFcollect5 = shops.stream()
.map(x -> CompletableFuture.supplyAsync(() -> x.getProductPrice(), executor))
.map(y -> y.thenApply(FutureDiscountPriceVO::getRemoteVO))
.map(z -> z.thenCompose(dp -> CompletableFuture.supplyAsync(() -> FutureDiscountService.getDiscountPrice(dp), executor)))
.collect(Collectors.toList());
这里你首次看到了的CompletableFuture的thenApply和thenCompose方法,supplyAsync方法我们前一章节已经演示过了。
第一步通过map(x -> CompletableFuture.supplyAsync(() -> x.getProductPrice(), executor))已经从远程获取到了商品的标准价格,并且这一步是异步且是多线程并发获取的结果。
本地有了价格以后,第二步map(y -> y.thenApply(FutureDiscountPriceVO::getRemoteVO))则是解析价格,创建新的VO对象,thenApply()的用就是把前面任务的执行结果,交给后面的Function计算。
组装好了下一步的参数,那么就是请求远程接口并计算最终价格了,map(z -> z.thenCompose(dp -> CompletableFuture.supplyAsync(() -> FutureDiscountService.getDiscountPrice(dp), executor)))。
第三步这里使用了thenCompose,这个的作用是连接两个有依赖关系的任务,thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作,结果由第二个任务返回,我们可以看到thenCompose方法的参数又是CompletableFuture类型,也就是可以继续使用一个异步线程池进行复杂计算。
thenApply和thenCompose方法都是有依赖关系的计算方法,其计算用的数据都是基于前一步数据来的。