DataStream API没有reduce和sum这类聚合操作的方法,因为Flink设计中,数据必须先分组才能做聚合操作。
所以一般操作是对DataStream做keyBy,得到KeyedStream,然后调用KeyedStream API上的reduce、sum等聚合操作方法。
Reduce在DataStream API中的作用是聚合操作,其聚合逻辑是将当前数据和上一条聚合结果数据一起给到用户,由用户自定义聚合方式。
方法API:
public interface ReduceFunction extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}
这种聚合方式不同于KeyBy的sum、min、max等,Reduce的逻辑和Java Lambda中的Reduce实现原理是一样的。
代码:
package org.itzhimei.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
* Reduce KeyedStream → DataStream
*
*/
public class Transform_5_Reduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.fromCollection(
Arrays.asList("hello flink",
"hello java",
"hi program",
"hello",
"java"));
// 1、分词
// 2、按每个单词作为key分组
// 3、使用reduce统计每个单词出现次数
SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] s1 = s.split(" ");
for (String s2 : s1) {
collector.collect(new Tuple2<>(s2, 1));
}
}
}).keyBy(item -> item.f0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
System.out.println("---t1.f0:"+t1.f0 + ",---t2.f0:"+t2.f0);
System.out.println("---t1.f1:"+t1.f1 + ",---t2.f1:"+t2.f1);
return new Tuple2<String, Integer>(t1.f0,t1.f1+t2.f1);
}
});
reduce.print();
env.execute();
}
}
/* 输出
(hello,1)
(flink,1)
---t1.f0:hello,---t2.f0:hello
---t1.f1:1,---t2.f1:1
(hello,2)
(java,1)
(hi,1)
(program,1)
---t1.f0:hello,---t2.f0:hello
---t1.f1:2,---t2.f1:1
(hello,3)
---t1.f0:java,---t2.f0:java
---t1.f1:1,---t2.f1:1
(java,2)
*/
基于上面的代码,核心逻辑分析:
我们来演示一下,假设无限流流通过keyBy将hello的单词,分5次数据到key为hello的分组,那么5次的逻辑是:
1(hello-1) 2(hello-1) 3(hello-1) 4(hello-1) 5(hello-1)
第一次没有之前的数据,就是null+Tuple2(hello,1)进行计算
第二次基于前一次计算的结果,就是Tuple2(hello,1)+Tuple2(hello,1)进行计算
第三次基于前一次计算的结果,就是Tuple2(hello,2)+Tuple2(hello,1)进行计算
第四次基于前一次计算的结果,就是Tuple2(hello,3)+Tuple2(hello,1)进行计算
第五次基于前一次计算的结果,就是Tuple2(hello,4)+Tuple2(hello,1)进行计算
最后输出的结果是hello-5。