CompletableFuture异步编程的并行计算?一听到这个你或许会比较疑惑,CompletableFuture本来不就是异步并行计算吗?是的没错,但我们这里所说的并行,是指多个CompletableFuture计算结果有关联关系,并且可以同时进行的场景。
我们上一节讲解的CompletableFuture串联计算,是多个CompletableFuture计算时后一个依赖于前一个计算的结果,方法:thenApply和thenCompose。
而本节所说的CompletableFuture并行计算,则是多个CompletableFuture计算任务同时进行,等多个任务都计算结束后,然后把结果组合到一起返回,方法:thenCombine。
我们本节demo,对上一节演示thenApply和thenCompose方法的demo进行改进,我们先回顾一下上一节demo的业务逻辑。
我们要实现一个电商比价系统,从多个电商平台获取商品价格和折扣信息,依次进行了以下三个步骤的计算:
1、从远程电商获取商品价格
2、基于获取的价格创建新的VO,目的是构建符合折扣获取接口的入参对象
3、远程获取折扣,兵基于商品折扣计算最终价格
而我们本节,就进行一些小小的改造,第一步和第三步的商品价格获取和商品折扣获取其实就可以并行,各自获取到数据后,然后进行第二步的最终结果计算。
看代码:
/**
* 使用CompletableFuture异步编程 并行计算
* @author www.itzhimei.com
*/
public class FutureTest_6 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Runtime.getRuntime().availableProcessors());
List<FutureShop> shops = Arrays.asList(new FutureShop("JD"),
new FutureShop("TIANMAO"),
new FutureShop("PDD"),
new FutureShop("TAOBAO"),
new FutureShop("JD2"),
new FutureShop("TIANMAO2"),
new FutureShop("PDD2"),
new FutureShop("TAOBAO2"));
Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
long start5 = System.currentTimeMillis();
List<CompletableFuture<String>> CFcollect6 = shops.stream()
.map(x -> CompletableFuture.supplyAsync(() -> x.getProductPrice(), executor)
.thenCombine(CompletableFuture.supplyAsync(() -> FutureDiscountService.getDiscount(), executor), (price, dis) -> {
String[] split = price.split(":");
return split[0] + "获取商品价格:" + (Integer.parseInt(split[1]) * dis);
})
).collect(Collectors.toList());
List<String> collect5 = CFcollect6.stream().map(CompletableFuture::join).collect(Collectors.toList());
long end5 = System.currentTimeMillis();
System.out.println("商品价格计算结果[" + collect5.stream().collect(Collectors.joining(",")) + "]");
System.out.println("CompletableFuture商品计算时间:" + (end5-start5));
}
/**
* 从远程商店获取价格
*/
public static class FutureShop {
public FutureShop(String product) {
this.product = product;
}
private String product;
@SneakyThrows
public String getProductPrice() {
//模拟计算复杂逻辑
Thread.sleep(1000);
return product+"获取商品价格:" + new Random().nextInt(1000);
}
public String getProduct() {
return product;
}
public void setProduct(String product) {
this.product = product;
}
}
/**
* 按照远程接口要求定义接口请求VO
*/
public static class FutureDiscountPriceVO {
private String shopName;
private Integer price;
public FutureDiscountPriceVO(String shopName, Integer price) {
this.shopName = shopName;
this.price = price;
}
public static FutureDiscountPriceVO getRemoteVO(String price) {
String[] split = price.split(":");
return new FutureDiscountPriceVO(split[0], Integer.parseInt(split[1]));
}
public Integer getPrice() {
return price;
}
public void setPrice(Integer price) {
this.price = price;
}
public String getShopName() {
return shopName;
}
public void setShopName(String shopName) {
this.shopName = shopName;
}
}
/**
* 远程商店折扣获取服务
*/
public static class FutureDiscountService {
private static double[] disArr = {0.9,0.8,0.7,0.6,0.5};
@SneakyThrows
public static String getDiscountPrice(FutureDiscountPriceVO vo) {
//模拟远程请求耗时
Thread.sleep(1000);
int index = new Random().nextInt(4);
Double price = vo.getPrice() * disArr[index];
return vo.getShopName() +"获取商品价格:" +price.toString();
}
@SneakyThrows
public static Double getDiscount() {
//模拟远程请求耗时
Thread.sleep(1000);
int index = new Random().nextInt(4);
return disArr[index];
}
}
}
/* 输出
商品价格计算结果[
JD获取商品价格获取商品价格:413.4,
TIANMAO获取商品价格获取商品价格:409.2,
PDD获取商品价格获取商品价格:357.0,
TAOBAO获取商品价格获取商品价格:721.8000000000001,
JD2获取商品价格获取商品价格:588.6,
TIANMAO2获取商品价格获取商品价格:189.0,
PDD2获取商品价格获取商品价格:452.4,
TAOBAO2获取商品价格获取商品价格:287.7
]
CompletableFuture商品计算时间:2131
*/
核心代码:
List<CompletableFuture<String>> CFcollect6 = shops.stream()
.map(x -> CompletableFuture.supplyAsync(() -> x.getProductPrice(), executor)
.thenCombine(CompletableFuture.supplyAsync(() -> FutureDiscountService.getDiscount(), executor), (price, dis) -> {
String[] split = price.split(":");
return split[0] + "获取商品价格:" + (Integer.parseInt(split[1]) * dis);
})
).collect(Collectors.toList());
这里使用了thenCombine方法,将价格获取和折扣获取进行了组合,组合方法是thenCombine方法的第二个参数,我们用价格*折扣=最终价格。
这种使用方式也可以叫做CompletableFuture的组合计算,实现效果就是并行计算多个。