Flink从入门到实战四[DataStream API]-16-Transform连接流Union

Union的作用是将两个或多个流合并,创建出一个新流;要求参与合并的各个流的数据类型一致,合并后得到的新流的类型也与这些数据源流一致。
Union DataStream* → DataStream
用法:dataStream.union(otherStream1, otherStream2, …);

package org.itzhimei.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Union: DataStream* → DataStream.
 *
 * <p>Builds two in-memory string sources holding the same lines, merges them
 * with {@code union}, and runs a running word count over the merged stream,
 * printing every intermediate count.
 */
public class Transform_5_Union {

    /** Sample lines fed into each of the two sources. */
    private static final String[] SAMPLE_LINES = {
            "hello flink",
            "hello java",
            "hi program",
            "hello",
            "java"
    };

    /**
     * Splits each incoming line on single spaces and emits one
     * {@code (word, 1)} pair per resulting token.
     */
    private static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : line.split(" ")) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    }

    /**
     * Assembles the union + word-count job and executes it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> firstSource = env.fromCollection(Arrays.asList(SAMPLE_LINES));

        // The second source repeats the very same lines, so every word ends up
        // with twice the count a single source would produce.
        DataStreamSource<String> secondSource = env.fromCollection(Arrays.asList(SAMPLE_LINES));

        // Both inputs share the element type String, and so does the merged stream.
        DataStream<String> merged = firstSource.union(secondSource);

        // Step 1: tokenize each line into (word, 1) pairs.
        SingleOutputStreamOperator<Tuple2<String, Integer>> tokens = merged.flatMap(new Tokenizer());

        // Step 2: group by the word (tuple field f0).
        // Step 3: sum tuple field 1 to get a running count per word.
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCounts = tokens
                .keyBy(pair -> pair.f0)
                .sum(1);

        wordCounts.print();

        env.execute();
    }
}

/* 输出
(hello,1)
(flink,1)
(hello,2)
(java,1)
(hi,1)
(program,1)
(hello,3)
(java,2)
(hello,4)
(flink,2)
(hello,5)
(java,3)
(hi,2)
(program,2)
(hello,6)
(java,4)
 */