Lambda从入门到精通之三十二 CompletableFuture异步编程 消费计算结果

CompletableFuture除了能够将多个任务组合、串联、异步计算,还可以对结果进行处理,也就是消费计算结果,我们前面章节的demo,都是自己写的消费代码来消费计算结果。
本节还是使用上一节的demo来演示,我们看一下用CompletableFuture提供的消费方法如何来消费计算结果。

我们先回顾一下demo的业务逻辑,我们要实现一个电商比价系统,从多个电商平台获取商品价格和折扣信息,商品价格获取和折扣获取是两个接口,依次进行了以下三个步骤的计算:
1、从远程电商获取商品价格
2、基于获取的价格创建新的VO,目的是构建符合折扣获取接口的入参对象
3、远程获取折扣,兵基于商品折扣计算最终价格

/**
 * 使用CompletableFuture异步编程 消费计算结果
 * @author www.itzhimei.com
 */
public class FutureTest_7 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Runtime.getRuntime().availableProcessors());

        List<FutureShop> shops = Arrays.asList(new FutureShop("JD"),
                new FutureShop("TIANMAO"),
                new FutureShop("PDD"),
                new FutureShop("TAOBAO"),
                new FutureShop("JD2"),
                new FutureShop("TIANMAO2"),
                new FutureShop("PDD2"),
                new FutureShop("TAOBAO2"));


        Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
                        new ThreadFactory() {
                            public Thread newThread(Runnable r) {
                                Thread t = new Thread(r);
                                t.setDaemon(true);
                                return t;
                            }
                        });
        long start5 = System.currentTimeMillis();

        List<CompletableFuture<String>> CFcollect6 = shops.stream()
                .map(x -> CompletableFuture.supplyAsync(() -> x.getProductPrice(), executor)
                        .thenCombine(CompletableFuture.supplyAsync(() -> FutureDiscountService.getDiscount(), executor), (price, dis) -> {
                            String[] split = price.split(":");
                            return split[0] + "获取商品价格:" + (Integer.parseInt(split[1]) * dis);
                        })
                ).collect(Collectors.toList());

        //List<String> collect5 = CFcollect6.stream().map(CompletableFuture::join).collect(Collectors.toList());
        //System.out.println("商品价格计算结果[" + collect5.stream().collect(Collectors.joining(",")) + "]");

        CompletableFuture[] completableFutures = CFcollect6.stream().map(x -> x.thenAccept(System.out::println))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(completableFutures).join();
        System.out.println("all task completed");


        long end5 = System.currentTimeMillis();
        System.out.println("CompletableFuture商品计算时间:" + (end5-start5));


    }

    /**
     * 从远程商店获取价格
     */
    public static class FutureShop {

        public FutureShop(String product) {
            this.product = product;
        }

        private String product;

        @SneakyThrows
        public String getProductPrice() {
            //模拟计算复杂逻辑
            Thread.sleep(1000);
            return product+"获取商品价格:"  + new Random().nextInt(1000);
        }

        public String getProduct() {
            return product;
        }

        public void setProduct(String product) {
            this.product = product;
        }
    }

    /**
     * 按照远程接口要求定义接口请求VO
     */
    public static class FutureDiscountPriceVO {

        private String shopName;
        private Integer price;

        public FutureDiscountPriceVO(String shopName, Integer price) {
            this.shopName = shopName;
            this.price = price;
        }

        public static FutureDiscountPriceVO getRemoteVO(String price) {
            String[] split = price.split(":");
            return new FutureDiscountPriceVO(split[0], Integer.parseInt(split[1]));
        }

        public Integer getPrice() {
            return price;
        }

        public void setPrice(Integer price) {
            this.price = price;
        }

        public String getShopName() {
            return shopName;
        }

        public void setShopName(String shopName) {
            this.shopName = shopName;
        }
    }


    /**
     * 远程商店折扣获取服务
     */
    public static class FutureDiscountService {

        private static double[] disArr = {0.9,0.8,0.7,0.6,0.5};

        @SneakyThrows
        public static String getDiscountPrice(FutureDiscountPriceVO vo) {
            //模拟远程请求耗时
            Thread.sleep(1000);

            int index = new Random().nextInt(4);
            Double price = vo.getPrice() * disArr[index];
            return  vo.getShopName() +"获取商品价格:"  +price.toString();
        }

        @SneakyThrows
        public static Double getDiscount() {
            //模拟远程请求耗时
            Thread.sleep(1000);

            int index = new Random().nextInt(4);
            return  disArr[index];
        }
    }

}
/* 输出
JD获取商品价格获取商品价格:179.2
PDD获取商品价格获取商品价格:495.59999999999997
TIANMAO获取商品价格获取商品价格:175.79999999999998
TAOBAO获取商品价格获取商品价格:323.1
JD2获取商品价格获取商品价格:118.8
TIANMAO2获取商品价格获取商品价格:82.4
PDD2获取商品价格获取商品价格:24.0
TAOBAO2获取商品价格获取商品价格:98.69999999999999
all task completed
CompletableFuture商品计算时间:2157
 */

我们重点看这里:

//List<String> collect5 = CFcollect6.stream().map(CompletableFuture::join).collect(Collectors.toList());
//System.out.println("商品价格计算结果[" + collect5.stream().collect(Collectors.joining(",")) + "]");

CompletableFuture[] completableFutures = CFcollect6.stream().map(x -> x.thenAccept(System.out::println))
		.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(completableFutures).join();
System.out.println("all task completed"); 

这里列出了上一节的计算结果消费方式(注释掉的代码)和本节使用CompletableFuture提供的方法的消费方法。
CompletableFuture提供的消费方法:thenAccept,这个方法等待前一个任务计算结果,然后用作自身的计算参数,返回类型是CompletableFuture。
但是不要觉得这是没有返回类型,这是有返回类型的只不过是CompletableFuture包装的Void类型。
我们的代码使用了thenAccept方法消费结果,依然将所有的计算任务join到了当前的主线程中,让主线程等待所有任务计算完成,输出一个日志”all task completed”。

一共提供了3种消费方法,并且每种方法都提供了对应的异步方法:
thenAccept():对单个结果进行消费,使用前一个计算结果作为参数
thenAcceptBoth():对两个结果进行消费,,使用前一个计算结果和第二个定义的计算任务,作为参数,定义一个BiConsumer作为结果处理逻辑并返回结果
thenRun():不关心结果,只要前一个任务执行完成,则执行Action,不需要前一个计算结果作为参数
可以根据业务场景进行选择对应的方法。