ProcessWindowFunction是一个全量计算函数,也就是窗口内数据到达后不计算,而是等到窗口触发时,将所有数据进行计算,那么这就有一个问题,就是当一个窗口数据量非常大的时候,等待窗口触发计算可能会很慢,比较耗时。前面数据到达了不计算,白白浪费了时间。
ProcessWindowFunction也支持增量计算,可以将ReduceFunction或AggregateFunction函数和ProcessWindowFunction组合使用,这就实现了窗口既能增量计算,又能获取到额外的信息。
我们来看带AggregateFunction的增量窗口聚合demo,AggregateFunction的特点是输入、计算和输出的类型可以是不同的类型。
demo需求目标是:每5秒统计每个用户消费总交易笔数,总交易金额;最终输出:Window,账号ID,总交易笔数,总交易金额。
代码:
package org.itzhimei.window;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
 * Incremental window aggregation demo: an {@link AggregateFunction} combined with a
 * {@link ProcessWindowFunction}.
 *
 * <p>Every 5 seconds (tumbling processing-time window) the job counts the transactions of each
 * account and sums their amounts.
 *
 * <p>Output type: {@code Tuple4<String, Long, Integer, Double>} holding, in order, the window,
 * the account ID, the transaction count and the total transaction amount.
 */
public class Window_7_ProcessWindowFunctionWithAggregateFunction {

    /**
     * Builds the pipeline (source, keyBy account ID, 5-second tumbling window, aggregate, print)
     * and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Transaction> dataStream =
                env.addSource(new TransactionSource()).name("Transaction");

        SingleOutputStreamOperator<Tuple4<String, Long, Integer, Double>> process =
                dataStream
                        .keyBy(Transaction::getAccountId)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .aggregate(new My2AggregateFunction(), new My2ProcessWindowFunction());

        process.print();
        env.execute();
    }

    /**
     * Incrementally folds transactions into an accumulator of
     * (account ID, transaction count, total amount).
     */
    private static class My2AggregateFunction
            implements AggregateFunction<
                    Transaction, Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>> {

        /**
         * Creates an empty accumulator: placeholder ID {@code 0L}, zero transactions, zero amount.
         */
        @Override
        public Tuple3<Long, Integer, Double> createAccumulator() {
            return new Tuple3<>(0L, 0, 0D);
        }

        /**
         * Adds one transaction: records its account ID, increments the count and adds its amount.
         *
         * @param value the incoming transaction
         * @param accumulator the aggregate built so far
         * @return a new accumulator that includes {@code value}
         */
        @Override
        public Tuple3<Long, Integer, Double> add(
                Transaction value, Tuple3<Long, Integer, Double> accumulator) {
            return new Tuple3<>(
                    value.getAccountId(),
                    accumulator.f1 + 1,
                    accumulator.f2 + value.getAmount());
        }

        /** Returns the accumulator unchanged; accumulator and result share the same type. */
        @Override
        public Tuple3<Long, Integer, Double> getResult(Tuple3<Long, Integer, Double> accumulator) {
            return accumulator;
        }

        /**
         * Merges two accumulators by summing their counts and amounts.
         *
         * @param a first accumulator
         * @param b second accumulator
         * @return a new accumulator holding the combined count and amount
         */
        @Override
        public Tuple3<Long, Integer, Double> merge(
                Tuple3<Long, Integer, Double> a, Tuple3<Long, Integer, Double> b) {
            // An accumulator that has not seen any element still carries the 0L placeholder ID,
            // so take the ID from the side that has actually counted a transaction.
            Long accountId = a.f1 > 0 ? a.f0 : b.f0;
            return new Tuple3<>(accountId, a.f1 + b.f1, a.f2 + b.f2);
        }
    }

    /**
     * Enriches the pre-aggregated result with window metadata, emitting
     * (window, account ID, transaction count, total amount).
     */
    private static class My2ProcessWindowFunction
            extends ProcessWindowFunction<
                    Tuple3<Long, Integer, Double>,
                    Tuple4<String, Long, Integer, Double>,
                    Long,
                    TimeWindow> {

        /**
         * Emits one record for the given key and window.
         *
         * @param accountId the window key (the stream is keyed by account ID)
         * @param context gives access to the window being evaluated
         * @param elements the aggregate produced by {@link My2AggregateFunction}; when combined
         *     with an incremental aggregation the iterable carries that single result
         * @param out collector for the output record
         * @throws Exception declared by the overridden method
         */
        @Override
        public void process(
                Long accountId,
                Context context,
                Iterable<Tuple3<Long, Integer, Double>> elements,
                Collector<Tuple4<String, Long, Integer, Double>> out)
                throws Exception {
            Tuple3<Long, Integer, Double> aggregate = elements.iterator().next();
            String window = context.window().toString();
            // Emit the key handed in by Flink rather than the ID stored in the accumulator:
            // the key is authoritative and does not depend on accumulator bookkeeping.
            out.collect(new Tuple4<>(window, accountId, aggregate.f1, aggregate.f2));
        }
    }
}
/* 分别计算输出:Window,账号ID,总交易笔数,总交易金额
16> (TimeWindow{start=1646992805000, end=1646992810000},2,6,1994.6900000000003)
15> (TimeWindow{start=1646992805000, end=1646992810000},3,6,1305.839)
11> (TimeWindow{start=1646992805000, end=1646992810000},1,6,2114.09)
1> (TimeWindow{start=1646992805000, end=1646992810000},4,6,1407.2900000000002)
16> (TimeWindow{start=1646992805000, end=1646992810000},5,6,1451.54)
11> (TimeWindow{start=1646992810000, end=1646992815000},1,9,2546.19)
16> (TimeWindow{start=1646992810000, end=1646992815000},2,9,3257.2499999999995)
15> (TimeWindow{start=1646992810000, end=1646992815000},3,9,1504.909)
16> (TimeWindow{start=1646992810000, end=1646992815000},5,9,2352.3900000000003)
1> (TimeWindow{start=1646992810000, end=1646992815000},4,9,2410.14)
11> (TimeWindow{start=1646992815000, end=1646992820000},1,9,2654.66)
16> (TimeWindow{start=1646992815000, end=1646992820000},2,9,3255.29)
15> (TimeWindow{start=1646992815000, end=1646992820000},3,9,2375.259)
16> (TimeWindow{start=1646992815000, end=1646992820000},5,9,2304.27)
1> (TimeWindow{start=1646992815000, end=1646992820000},4,9,2123.44)
*/