Flink from Beginner to Practice 4 [DataStream API] - 14 - Transform Grouping Operator KeyBy

The DataStream API itself has no aggregation methods such as reduce and sum, because in Flink's design data must be grouped before it can be aggregated.
The usual pattern is therefore to call keyBy on a DataStream to obtain a KeyedStream, and then invoke aggregation methods such as reduce and sum on the KeyedStream API, as in the sketch below.
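
For instance, a minimal sketch of the reduce variant (wordAndOne is hypothetical here: a DataStream<Tuple2<String, Integer>> of (word, 1) pairs, like the one built in the full example further down):

// Hypothetical input: wordAndOne, a stream of (word, 1) pairs.
// keyBy yields a KeyedStream; reduce then folds each incoming element
// into the running aggregate of its key.
SingleOutputStreamOperator<Tuple2<String, Integer>> counts = wordAndOne
        .keyBy(item -> item.f0)
        .reduce((acc, cur) -> new Tuple2<>(acc.f0, acc.f1 + cur.f1));

This is equivalent to calling sum(1) on the same KeyedStream.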

keyBy logically splits a stream into disjoint partitions, where each partition contains the elements that share the same key; internally the partitioning is done by hashing the key.
Stream transformation: DataStream → KeyedStream. Once the stream is keyed, you can aggregate within each partition, for example (see the min/minBy sketch after this list):
sum()
min()
max()
minBy()
maxBy()
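
Note that min(n) and minBy(n) differ: min(n) only guarantees that field n holds the running minimum, while minBy(n) emits the complete element that holds the minimum (the same applies to max/maxBy). A minimal sketch of the difference, using a made-up sensor stream of (id, timestamp, temperature) Tuple3 records and assuming the same env as in the full example below:

// Hypothetical (id, timestamp, temperature) records; needs
// org.apache.flink.api.java.tuple.Tuple3.
DataStreamSource<Tuple3<String, Long, Integer>> sensors = env.fromElements(
        Tuple3.of("s1", 1L, 35),
        Tuple3.of("s1", 2L, 20),
        Tuple3.of("s1", 3L, 40));

// min(2): f2 carries the running minimum, but f1 may still hold the
// timestamp of an earlier record, e.g. (s1,1,20) after the second element.
sensors.keyBy(t -> t.f0).min(2).print("min");

// minBy(2): emits the whole record holding the minimum, e.g. (s1,2,20).
sensors.keyBy(t -> t.f0).minBy(2).print("minBy");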

As before, we take WordCount as the example and count how many times each word appears. Code:

package org.itzhimei.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * KeyBy: DataStream → KeyedStream
 *
 */
public class Transform_4_KeyBy {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromCollection(
                Arrays.asList("hello flink",
                        "hello java",
                        "hi program",
                        "hello"));

        // 1. Tokenize each line into words
        // 2. Group by each word as the key
        // 3. Count the occurrences of each word
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCounts = dataStreamSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Emit a (word, 1) pair for every word in the line
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(item -> item.f0) // group by the word itself (field f0)
                .sum(1);                // sum the counts in field f1

        wordCounts.print();

        env.execute();
    }
}

/* Output
6> (hi,1)
16> (program,1)
5> (hello,1)
5> (hello,2)
13> (flink,1)
5> (hello,3)
3> (java,1)
 */
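
The number before each ">" is the index of the parallel subtask that emitted the record. Because keyBy routes each key to a fixed partition by hash, all "hello" records land on the same subtask (5 here), whose running count grows from 1 to 3.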