Flink从入门到实战五[Window]-6-ProcessWindowFunction基于ReduceFunction增量计算

ProcessWindowFunction是一个全量计算函数,也就是窗口内数据到达后不立即计算,而是等到窗口触发时,才对窗口内的所有数据进行计算。这就带来一个问题:当一个窗口的数据量非常大时,等到窗口触发才开始计算可能会很慢、比较耗时;前面的数据到达后没有计算,白白浪费了时间。

ProcessWindowFunction也支持增量计算,可以将ReduceFunction或AggregateFunction函数和ProcessWindowFunction组合使用,这就实现了窗口既能增量计算,又能获取到额外的信息。

我们来看带ReduceFunction的增量窗口聚合demo,ReduceFunction的特点是输入和输出的类型必须是一致的。
demo需求目标是:每5秒统计每个用户消费总交易金额;最终输出:Window,账号ID,总交易金额。

代码:

package org.itzhimei.window;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Window ProcessWindowFunction
 * 每5秒统计每个用户消费总交易金额
 * 输出类型:Tuple3<String, Long, Double>
 * 分别计算输出:Window,账号ID,总交易金额
 */
public class Window_6_ProcessWindowFunctionWithReductFunction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env.addSource(new TransactionSource()).name("Transaction");

        SingleOutputStreamOperator<Tuple3<String, Long, Double>> process = dataStream.keyBy(Transaction::getAccountId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new My1ReduceFunction(), new My1ProcessWindowFunction());

        process.print();

        env.execute();
    }

    private static class My1ReduceFunction implements ReduceFunction<Transaction> {
        @Override
        public Transaction reduce(Transaction value1, Transaction value2) throws Exception {
            return new Transaction(value1.getAccountId(), value1.getTimestamp(), value1.getAmount() + value2.getAmount());
        }
    }


    private static class My1ProcessWindowFunction extends ProcessWindowFunction<Transaction, Tuple3<String, Long, Double>, Long, TimeWindow> {
        @Override
        public void process(Long aLong, Context context, Iterable<Transaction> elements, Collector<Tuple3<String, Long, Double>> out) throws Exception {
            Transaction next = elements.iterator().next();
            String s = context.window().toString();
            out.collect(new Tuple3<>(s,next.getAccountId(), next.getAmount()));
        }
    }
}

/* 分别计算输出:Window,账号ID,总交易金额
11> (TimeWindow{start=1646991685000, end=1646991690000},1,1630.18)
15> (TimeWindow{start=1646991685000, end=1646991690000},3,434.68899999999996)
16> (TimeWindow{start=1646991685000, end=1646991690000},2,1766.4700000000003)
16> (TimeWindow{start=1646991685000, end=1646991690000},5,1244.56)
1> (TimeWindow{start=1646991685000, end=1646991690000},4,992.2100000000002)

11> (TimeWindow{start=1646991690000, end=1646991695000},1,2654.66)
1> (TimeWindow{start=1646991690000, end=1646991695000},4,2452.23)
15> (TimeWindow{start=1646991690000, end=1646991695000},3,2375.259)
16> (TimeWindow{start=1646991690000, end=1646991695000},5,2054.2799999999997)
16> (TimeWindow{start=1646991690000, end=1646991695000},2,3255.29)

11> (TimeWindow{start=1646991695000, end=1646991700000},1,2610.48)
16> (TimeWindow{start=1646991695000, end=1646991700000},5,2047.09)
15> (TimeWindow{start=1646991695000, end=1646991700000},3,2375.289)
1> (TimeWindow{start=1646991695000, end=1646991700000},4,2242.39)
16> (TimeWindow{start=1646991695000, end=1646991700000},2,3072.5600000000004)
 */