CompletableFuture除了能够将多个任务组合、串联、异步计算,还可以对结果进行处理,也就是消费计算结果,我们前面章节的demo,都是自己写的消费代码来消费计算结果。
本节还是使用上一节的demo来演示,我们看一下用CompletableFuture提供的消费方法如何来消费计算结果。
我们先回顾一下demo的业务逻辑,我们要实现一个电商比价系统,从多个电商平台获取商品价格和折扣信息,商品价格获取和折扣获取是两个接口,依次进行了以下三个步骤的计算:
1、从远程电商获取商品价格
2、基于获取的价格创建新的VO,目的是构建符合折扣获取接口的入参对象
3、远程获取折扣,兵基于商品折扣计算最终价格
/**
* 使用CompletableFuture异步编程 消费计算结果
* @author www.itzhimei.com
*/
public class FutureTest_7 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Runtime.getRuntime().availableProcessors());
List<FutureShop> shops = Arrays.asList(new FutureShop("JD"),
new FutureShop("TIANMAO"),
new FutureShop("PDD"),
new FutureShop("TAOBAO"),
new FutureShop("JD2"),
new FutureShop("TIANMAO2"),
new FutureShop("PDD2"),
new FutureShop("TAOBAO2"));
Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
long start5 = System.currentTimeMillis();
List<CompletableFuture<String>> CFcollect6 = shops.stream()
.map(x -> CompletableFuture.supplyAsync(() -> x.getProductPrice(), executor)
.thenCombine(CompletableFuture.supplyAsync(() -> FutureDiscountService.getDiscount(), executor), (price, dis) -> {
String[] split = price.split(":");
return split[0] + "获取商品价格:" + (Integer.parseInt(split[1]) * dis);
})
).collect(Collectors.toList());
//List<String> collect5 = CFcollect6.stream().map(CompletableFuture::join).collect(Collectors.toList());
//System.out.println("商品价格计算结果[" + collect5.stream().collect(Collectors.joining(",")) + "]");
CompletableFuture[] completableFutures = CFcollect6.stream().map(x -> x.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(completableFutures).join();
System.out.println("all task completed");
long end5 = System.currentTimeMillis();
System.out.println("CompletableFuture商品计算时间:" + (end5-start5));
}
/**
* 从远程商店获取价格
*/
public static class FutureShop {
public FutureShop(String product) {
this.product = product;
}
private String product;
@SneakyThrows
public String getProductPrice() {
//模拟计算复杂逻辑
Thread.sleep(1000);
return product+"获取商品价格:" + new Random().nextInt(1000);
}
public String getProduct() {
return product;
}
public void setProduct(String product) {
this.product = product;
}
}
/**
* 按照远程接口要求定义接口请求VO
*/
public static class FutureDiscountPriceVO {
private String shopName;
private Integer price;
public FutureDiscountPriceVO(String shopName, Integer price) {
this.shopName = shopName;
this.price = price;
}
public static FutureDiscountPriceVO getRemoteVO(String price) {
String[] split = price.split(":");
return new FutureDiscountPriceVO(split[0], Integer.parseInt(split[1]));
}
public Integer getPrice() {
return price;
}
public void setPrice(Integer price) {
this.price = price;
}
public String getShopName() {
return shopName;
}
public void setShopName(String shopName) {
this.shopName = shopName;
}
}
/**
* 远程商店折扣获取服务
*/
public static class FutureDiscountService {
private static double[] disArr = {0.9,0.8,0.7,0.6,0.5};
@SneakyThrows
public static String getDiscountPrice(FutureDiscountPriceVO vo) {
//模拟远程请求耗时
Thread.sleep(1000);
int index = new Random().nextInt(4);
Double price = vo.getPrice() * disArr[index];
return vo.getShopName() +"获取商品价格:" +price.toString();
}
@SneakyThrows
public static Double getDiscount() {
//模拟远程请求耗时
Thread.sleep(1000);
int index = new Random().nextInt(4);
return disArr[index];
}
}
}
/* 输出
JD获取商品价格获取商品价格:179.2
PDD获取商品价格获取商品价格:495.59999999999997
TIANMAO获取商品价格获取商品价格:175.79999999999998
TAOBAO获取商品价格获取商品价格:323.1
JD2获取商品价格获取商品价格:118.8
TIANMAO2获取商品价格获取商品价格:82.4
PDD2获取商品价格获取商品价格:24.0
TAOBAO2获取商品价格获取商品价格:98.69999999999999
all task completed
CompletableFuture商品计算时间:2157
*/
我们重点看这里:
//List<String> collect5 = CFcollect6.stream().map(CompletableFuture::join).collect(Collectors.toList());
//System.out.println("商品价格计算结果[" + collect5.stream().collect(Collectors.joining(",")) + "]");
CompletableFuture[] completableFutures = CFcollect6.stream().map(x -> x.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(completableFutures).join();
System.out.println("all task completed");
这里列出了上一节的计算结果消费方式(注释掉的代码)和本节使用CompletableFuture提供的方法的消费方法。
CompletableFuture提供的消费方法:thenAccept,这个方法等待前一个任务计算结果,然后用作自身的计算参数,返回类型是CompletableFuture。
但是不要觉得这是没有返回类型,这是有返回类型的只不过是CompletableFuture包装的Void类型。
我们的代码使用了thenAccept方法消费结果,依然将所有的计算任务join到了当前的主线程中,让主线程等待所有任务计算完成,输出一个日志”all task completed”。
一共提供了3种消费方法,并且每种方法都提供了对应的异步方法:
thenAccept():对单个结果进行消费,使用前一个计算结果作为参数
thenAcceptBoth():对两个结果进行消费,,使用前一个计算结果和第二个定义的计算任务,作为参数,定义一个BiConsumer作为结果处理逻辑并返回结果
thenRun():不关心结果,只要前一个任务执行完成,则执行Action,不需要前一个计算结果作为参数
可以根据业务场景进行选择对应的方法。