Flink从入门到实战四[DataStream API]-17-Transform连接流Connect

Connect支持将两个类型不同的流连接在一起,输出一个类型为ConnectedStreams的流。
ConnectedStreams需要使用对应的CoMap和CoFlatMap进行转换,转换后输出DataStream流。
Connect:DataStream,DataStream → ConnectedStream
CoMap, CoFlatMap:ConnectedStream → DataStream

Connect用法:
// Two input streams with different element types; the element types match the
// CoMapFunction<Integer, String, ...> / CoFlatMapFunction<Integer, String, ...> used below.
DataStream<Integer> someStream = //…
DataStream<String> otherStream = //…
// connect() keeps both element types: first type parameter is someStream's, second is otherStream's.
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap用法:

// CoMap: each input gets its own mapping method, and both produce the same output type (Boolean).
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    // Called for elements of the first (Integer) input; always yields true.
    @Override
    public Boolean map1(Integer number) {
        return Boolean.TRUE;
    }

    // Called for elements of the second (String) input; always yields false.
    @Override
    public Boolean map2(String text) {
        return Boolean.FALSE;
    }
});
// CoFlatMap: each input gets its own flat-map method; both emit String elements through the collector.
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

    // First (Integer) input: emit the number's string form as a single element.
    @Override
    public void flatMap1(Integer number, Collector<String> collector) {
        String text = number.toString();
        collector.collect(text);
    }

    // Second (String) input: split on single spaces and emit every piece.
    @Override
    public void flatMap2(String line, Collector<String> collector) {
        String[] pieces = line.split(" ");
        for (int i = 0; i < pieces.length; i++) {
            collector.collect(pieces[i]);
        }
    }
});

代码:

package org.itzhimei.transform;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;

/**
 * Example job for the Connect transformation.
 *
 * <p>Connect: DataStream,DataStream → ConnectedStream<br>
 * CoMap, CoFlatMap: ConnectedStream → DataStream
 *
 * <p>A stream of strings is connected with a stream of integers, and a
 * {@link CoMapFunction} turns elements of both inputs into
 * {@code Tuple2<String, Integer>} values that are printed.
 */
public class Transform_6_Connect {

    /**
     * Builds the two in-memory sources, connects them, maps both inputs to tuples,
     * prints the result and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // First input: text lines.
        DataStreamSource<String> textSource = env.fromCollection(
                Arrays.asList(
                        "hello flink",
                        "hello java",
                        "hi program",
                        "hello",
                        "java"));

        // Second input: integers.
        DataStreamSource<Integer> numberSource = env.fromCollection(Arrays.asList(1, 2, 3));

        ConnectedStreams<String, Integer> connected = textSource.connect(numberSource);
        SingleOutputStreamOperator<Tuple2<String, Integer>> tuples =
                connected.map(new DefaultingCoMap());

        tuples.print();

        env.execute();
    }

    /**
     * Maps both inputs to a common tuple type, filling the field that the
     * respective input cannot provide with a default value.
     */
    private static final class DefaultingCoMap
            implements CoMapFunction<String, Integer, Tuple2<String, Integer>> {

        /** String input: keep the text and use 0 as the default number. */
        @Override
        public Tuple2<String, Integer> map1(String value) throws Exception {
            return new Tuple2<>(value, 0);
        }

        /** Integer input: keep the number and use "word" as the default text. */
        @Override
        public Tuple2<String, Integer> map2(Integer value) throws Exception {
            return new Tuple2<>("word", value);
        }
    }
}

/* 输出
(hello flink,0)
(word,1)
(hello java,0)
(word,2)
(hi program,0)
(word,3)
(hello,0)
(java,0)
 */