In Flink's design, data must be grouped by key before it can be aggregated, so the DataStream API deliberately has no aggregation methods such as reduce or sum.
The usual pattern is therefore to call keyBy on a DataStream to obtain a KeyedStream, and then invoke reduce, sum, and the other aggregation methods on the KeyedStream API.
keyBy logically splits a stream into disjoint partitions, where each partition contains all elements that share the same key; internally the partitioning is done by hashing the key.
Stream transformation: DataStream → KeyedStream. Once the stream is partitioned, the data in each partition can be aggregated with methods such as the following (see the min/minBy sketch right after the list):
sum()
min()
max()
minBy()
maxBy()
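A common pitfall with these operators: min()/max() only guarantee the minimum/maximum of the aggregated field, and the remaining fields may still come from an earlier record, whereas minBy()/maxBy() emit the complete record that holds the extreme value. Below is a minimal, hypothetical sketch of the difference (the class name and sample data are made up for illustration):
package org.itzhimei.transform;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinVsMinBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical records: (key, tag, value)
        env.fromElements(
                Tuple3.of("a", "x", 3),
                Tuple3.of("a", "y", 1))
                .keyBy(t -> t.f0)
                // min(2) tracks the running minimum of field 2, but the "tag"
                // field may still come from an earlier record: the second
                // output here is (a,x,1). minBy(2) would instead emit the
                // whole record holding the minimum, (a,y,1).
                .min(2)
                .print();
        env.execute();
    }
}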
Sticking with WordCount, the example below counts how many times each word appears:
package org.itzhimei.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * KeyBy: DataStream → KeyedStream
 * sum: KeyedStream → DataStream
 */
public class Transform_4_KeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromCollection(
                Arrays.asList("hello flink",
                        "hello java",
                        "hi program",
                        "hello"));
        // 1. Tokenize each line into words
        // 2. Group by each word as the key
        // 3. Count how many times each word appears
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCounts = dataStreamSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        // Emit a (word, 1) tuple for every word in the line
                        String[] words = line.split(" ");
                        for (String word : words) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                // Key the stream by the word (field f0): DataStream → KeyedStream
                .keyBy(item -> item.f0)
                // Rolling sum over the count field (position 1)
                .sum(1);
        wordCounts.print();
        env.execute();
    }
}
/* Output
6> (hi,1)
16> (program,1)
5> (hello,1)
5> (hello,2)
13> (flink,1)
5> (hello,3)
3> (java,1)
*/
The number before ">" is the index of the parallel subtask that printed the record. Because keyBy hashes on the word, every "hello" record lands on the same subtask (5 here). sum(1) is a rolling aggregation: it emits an updated result for each incoming record, which is why hello appears three times with counts 1, 2 and 3.
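The text above also mentions reduce: the same rolling word count can be expressed with reduce() instead of sum(1). A minimal sketch assuming the same setup (the class name and sample data are illustrative):
package org.itzhimei.transform;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_KeyBy_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                Tuple2.of("hello", 1),
                Tuple2.of("flink", 1),
                Tuple2.of("hello", 1))
                .keyBy(t -> t.f0)
                // reduce() combines the previous aggregate with each new record;
                // adding the counts reproduces what sum(1) does above.
                .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1))
                .print();
        env.execute();
    }
}
Since sum, min, max and friends are essentially pre-built ReduceFunctions, reduce() is the fallback whenever the aggregation logic does not fit one of the built-in methods.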