我们已经初步了解了CompletableFuture的功能,以及在使用CompletableFuture时如何处理异常,我们前面的章节除了提到CompletableFuture可以实现多个异步计算的关联计算,我们本节来看看CompletableFuture的性能。
demo场景:双十一马上要到了,我们要买某个商品,买这个商品前,为了买到最优惠的商品,我们货比多家,我们去多个电商平台查看这个商品的价格,找到最便宜的那一个。
看代码:
/**
* 使用CompletableFuture异步编程 性能
* @author www.itzhimei.com
*/
public class FutureTest_4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Runtime.getRuntime().availableProcessors());
List<FutureShop> shops = Arrays.asList(new FutureShop("JD"),
new FutureShop("TIANMAO"),
new FutureShop("PDD"),
new FutureShop("TAOBAO"));
long start = System.currentTimeMillis();
List<String> collect = shops.stream().map(x -> x.getProductPrice()).collect(Collectors.toList());
long end = System.currentTimeMillis();
System.out.println("商品价格计算结果[" + collect.stream().collect(Collectors.joining(",")) + "]");
System.out.println("Stream商品计算时间:" + (end-start));
long start2 = System.currentTimeMillis();
List<String> collect2 = shops.stream().parallel().map(x -> x.getProductPrice()).collect(Collectors.toList());
long end2 = System.currentTimeMillis();
System.out.println("商品价格计算结果[" + collect2.stream().collect(Collectors.joining(",")) + "]");
System.out.println("Stream parallel商品计算时间:" + (end2-start2));
long start3 = System.currentTimeMillis();
List<CompletableFuture<String>> CFcollect3 = shops.stream().map(x -> CompletableFuture.supplyAsync(
() -> x.getProductPrice()
)
).collect(Collectors.toList());
List<String> collect3 = CFcollect3.stream().map(CompletableFuture::join).collect(Collectors.toList());
long end3 = System.currentTimeMillis();
System.out.println("商品价格计算结果[" + collect3.stream().collect(Collectors.joining(",")) + "]");
System.out.println("CompletableFuture商品计算时间:" + (end3-start3));
long start4 = System.currentTimeMillis();
List<String> collect4 = shops.stream().map(x -> CompletableFuture.supplyAsync(
() -> x.getProductPrice()
)
).map(CompletableFuture::join).collect(Collectors.toList());
long end4 = System.currentTimeMillis();
System.out.println("商品价格计算结果[" + collect4.stream().collect(Collectors.joining(",")) + "]");
System.out.println("CompletableFuture串行商品计算时间:" + (end4-start4));
}
public static class FutureShop {
public FutureShop(String product) {
this.product = product;
}
private String product;
@SneakyThrows
public String getProductPrice() {
//模拟计算复杂逻辑
Thread.sleep(1000);
return product+"获取商品价格:" + new Random().nextInt(1000);
}
public String getProduct() {
return product;
}
public void setProduct(String product) {
this.product = product;
}
}
}
/* 输出
商品价格计算结果[JD获取商品价格:333,TIANMAO获取商品价格:98,PDD获取商品价格:898,TAOBAO获取商品价格:789]
Stream商品计算时间:4085
商品价格计算结果[JD获取商品价格:605,TIANMAO获取商品价格:978,PDD获取商品价格:967,TAOBAO获取商品价格:952]
Stream parallel商品计算时间:1236
商品价格计算结果[JD获取商品价格:368,TIANMAO获取商品价格:586,PDD获取商品价格:809,TAOBAO获取商品价格:730]
CompletableFuture商品计算时间:2013
商品价格计算结果[JD获取商品价格:318,TIANMAO获取商品价格:939,PDD获取商品价格:403,TAOBAO获取商品价格:144]
CompletableFuture串行商品计算时间:4003
*/
代码中对比了4种写法:
第一种是stream,用时:4085
第二种是stream.parallel,也就是stream的并发模式,用时:1236
第三种是CompletableFutureb并发模式:2013
第四种是CompletableFuture的串行模式,用时:4003
首先要分析的是:为什么第三和第四的时间不一样,第三种模式是分成两个lambda表达式来计算的,第四种是一个lambda来计算的。
因为lambda的计算是流式执行,也就是第四种模式执行的时候是每一个数据都从前到后执行完,才执行下一个数据,也就是shops.stream().map-> CompletableFuture.supplyAsync->map(CompletableFuture::join),一个执行完,再执行下一个,所以就没有实现并发计算。
而第三种方式是几条数据并行获取价格,获取一个新的结果,然后新结果再计算。
其次要分析的是:为什么CompletableFuture不如Stream parallel性能好?
这个问题需要修改一下代码,我们把对比商城平台从4个增加到8个,代码:
List<FutureShop> shops = Arrays.asList(new FutureShop("JD"),
new FutureShop("TIANMAO"),
new FutureShop("PDD"),
new FutureShop("TAOBAO"),
new FutureShop("JD2"),
new FutureShop("TIANMAO2"),
new FutureShop("PDD2"),
new FutureShop("TAOBAO2"));
/* 输出
商品价格计算结果[JD获取商品价格:742,TIANMAO获取商品价格:191,PDD获取商品价格:833,TAOBAO获取商品价格:347,JD2获取商品价格:610,TIANMAO2获取商品价格:952,PDD2获取商品价格:542,TAOBAO2获取商品价格:281]
Stream商品计算时间:8064
商品价格计算结果[JD获取商品价格:66,TIANMAO获取商品价格:97,PDD获取商品价格:77,TAOBAO获取商品价格:881,JD2获取商品价格:74,TIANMAO2获取商品价格:154,PDD2获取商品价格:594,TAOBAO2获取商品价格:160]
Stream parallel商品计算时间:2227
商品价格计算结果[JD获取商品价格:20,TIANMAO获取商品价格:948,PDD获取商品价格:308,TAOBAO获取商品价格:246,JD2获取商品价格:958,TIANMAO2获取商品价格:192,PDD2获取商品价格:194,TAOBAO2获取商品价格:307]
CompletableFuture商品计算时间:3015
商品价格计算结果[JD获取商品价格:755,TIANMAO获取商品价格:868,PDD获取商品价格:437,TAOBAO获取商品价格:29,JD2获取商品价格:612,TIANMAO2获取商品价格:576,PDD2获取商品价格:395,TAOBAO2获取商品价格:902]
CompletableFuture串行商品计算时间:8002
*/
我们可以看到Stream parallel还是比CompletableFuture快一点,你多执行几次代码,二者的时间或许几乎相等,之所以是这样的结果,因为二者其实都是相同的原理,内部都是使用线程池来进行并发计算,并且使用的线程池的线程数量默认也是一样的,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值,其实就是你CPU的核数。
既然这样,那我们为什么还要使用CompletableFuture呢?因为CompletableFuture更能强大,因为CompletableFuture支持自定义线程池。
我们看源码supplyAsync方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
if (supplier == null) throw new NullPointerException();
CompletableFuture<U> f = new CompletableFuture<U>();
execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
return f;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
if (executor == null || supplier == null)
throw new NullPointerException();
CompletableFuture<U> f = new CompletableFuture<U>();
execAsync(executor, new AsyncSupply<U>(supplier, f));
return f;
}
那么对上面的代码改造一下:
Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
long start5 = System.currentTimeMillis();
List<CompletableFuture<String>> CFcollect5 = shops.stream().map(x -> CompletableFuture.supplyAsync(
() -> x.getProductPrice(), executor
)
).collect(Collectors.toList());
List<String> collect5 = CFcollect5.stream().map(CompletableFuture::join).collect(Collectors.toList());
long end5 = System.currentTimeMillis();
System.out.println("商品价格计算结果[" + collect5.stream().collect(Collectors.joining(",")) + "]");
System.out.println("CompletableFuture商品计算时间:" + (end5-start5));
/* 输出
商品价格计算结果[JD获取商品价格:979,TIANMAO获取商品价格:951,PDD获取商品价格:432,TAOBAO获取商品价格:705,JD2获取商品价格:996,TIANMAO2获取商品价格:926,PDD2获取商品价格:734,TAOBAO2获取商品价格:303]
CompletableFuture商品计算时间:1006
*/
从结果看出性能提升明显,这就是CompletableFuture的又一个强大特点,能够自定义线程池来并发计算。
那么并行计算我们是选择CompletableFuture还是Stream parallel呢?
如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream,如果除了计算还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture会更好。