ReduceFunction 用于指定如何将两个输入元素合并为一个输出元素，两个输入元素与输出元素的类型必须相同。
ReduceFunction 是一种增量聚合的窗口函数：每当有新元素进入窗口，就立即与当前的聚合结果合并，而不是等到窗口触发时再统一计算。
代码示例：
// Illustrative snippet: "..." and the <key selector> / <window assigner> placeholders
// stand for real code and must be filled in before this compiles.
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
// Merges two tuples into one of the same type: keeps v1's f0 and sums the f1 (Long) fields.
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
下面仍以用户交易数据为例进行演示：
package org.itzhimei.window;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
 * Window ReduceFunction example.
 *
 * <p>Every 5 seconds (tumbling processing-time windows) this job sums the transaction
 * amounts of each account and prints one merged {@link Transaction} per account and window.
 */
public class Window_3_ReduceFunction {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Transaction events from the walkthrough source, with operator name "Transaction".
        final DataStream<Transaction> transactions =
                environment.addSource(new TransactionSource()).name("Transaction");

        // Merges two transactions into one: the account id and timestamp are taken from the
        // first argument, and the amount is the sum of both amounts.
        final ReduceFunction<Transaction> sumAmounts = new ReduceFunction<Transaction>() {
            @Override
            public Transaction reduce(Transaction first, Transaction second) throws Exception {
                return new Transaction(
                        first.getAccountId(),
                        first.getTimestamp(),
                        first.getAmount() + second.getAmount());
            }
        };

        final SingleOutputStreamOperator<Transaction> totalsPerAccount =
                transactions
                        .keyBy(Transaction::getAccountId)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .reduce(sumAmounts);

        totalsPerAccount.print();
        environment.execute();
    }
}
/*
11> Transaction{accountId=1, timestamp=1546272000000, amount=567.87}
16> Transaction{accountId=2, timestamp=1546272360000, amount=726.23}
15> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
16> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
1> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
15> Transaction{accountId=3, timestamp=1546274520000, amount=2263.909}
1> Transaction{accountId=4, timestamp=1546274880000, amount=1995.5800000000002}
16> Transaction{accountId=5, timestamp=1546275240000, amount=2222.97}
11> Transaction{accountId=1, timestamp=1546275600000, amount=2650.4599999999996}
16> Transaction{accountId=2, timestamp=1546275960000, amount=3134.0299999999997}
11> Transaction{accountId=1, timestamp=1546291800000, amount=2841.87}
1> Transaction{accountId=4, timestamp=1546291080000, amount=2390.69}
15> Transaction{accountId=3, timestamp=1546290720000, amount=1921.8090000000002}
16> Transaction{accountId=5, timestamp=1546291440000, amount=2139.3799999999997}
16> Transaction{accountId=2, timestamp=1546292160000, amount=3110.68}
*/