Connect的支持将两个类型不同的流合并,输出一个类型为ConnectedStream的流。
ConnectedStream流的转换使用对应的CoMap和CoFlatMap进行转换,并输出DataStream流。
Connect:DataStream,DataStream → ConnectedStream
CoMap, CoFlatMap:ConnectedStream → DataStream
Connect用法:
DataStream someStream = //…
DataStream otherStream = //…
ConnectedStreams connectedStreams = someStream.connect(otherStream);
CoMap, CoFlatMap用法:
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
代码:
package org.itzhimei.transform;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import java.util.Arrays;
/**
* Connect: DataStream,DataStream → ConnectedStream
* CoMap, CoFlatMap: ConnectedStream → DataStream
*
*/
public class Transform_6_Connect {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource1 = env.fromCollection(
Arrays.asList("hello flink",
"hello java",
"hi program",
"hello",
"java"));
DataStreamSource<Integer> dataStreamSource2 = env.fromCollection(
Arrays.asList(1,2,3));
ConnectedStreams<String, Integer> connect = dataStreamSource1.connect(dataStreamSource2);
SingleOutputStreamOperator<Tuple2<String, Integer>> map = connect.map(new CoMapFunction<String, Integer, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map1(String value) throws Exception {
//设置默认值
return new Tuple2<>(value, 0);
}
@Override
public Tuple2<String, Integer> map2(Integer value) throws Exception {
//设置默认值
return new Tuple2<>("word", value);
}
});
map.print();
env.execute();
}
}
/* 输出
(hello flink,0)
(word,1)
(hello java,0)
(word,2)
(hi program,0)
(word,3)
(hello,0)
(java,0)
*/