Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(StreamSourceFromCollection.java:26)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:496)
at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:193)
at org.apache.flink.streaming.api.datastream.KeyedStream.<init>(KeyedStream.java:116)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:293)
at org.itzhimei.source.StreamSourceFromCollection.main(StreamSourceFromCollection.java:33)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:612)
at org.itzhimei.source.StreamSourceFromCollection.main(StreamSourceFromCollection.java:26)
Code:
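import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Stream Source From Collection
 */
public class StreamSourceFromCollection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList(
                "hello flink",
                "hello java",
                "hello world",
                "test",
                "source",
                "collection"));
        // This lambda triggers the InvalidTypesException: the type argument of Collector is erased.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(
                (String s, Collector<Tuple2<String, Integer>> collector) -> {
                    String[] words = s.split(" ");
                    //Arrays.stream(words).forEach((String sp) -> collector.collect(new Tuple2<String, Integer>(sp, 1)));
                    for (String word : words) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                }).keyBy(item -> item.f0)
                .sum(1);
        sum.print();
        env.execute();
    }
}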
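Cause: the lambda expression does not carry enough type information for Flink to infer the correct return type. Java erases generic type arguments at compile time, so the Tuple2<String, Integer> inside Collector<Tuple2<String, Integer>> is no longer available when Flink inspects the lambda.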
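The difference between a lambda and an anonymous class can be seen in plain Java, without Flink. The sketch below is only an illustration of the principle: ErasureDemo and its methods are hypothetical names, java.util.function.Function stands in for FlatMapFunction, and Flink's own type extraction is more involved than this single reflection call.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {

    /** Returns true if the object's class still records the type arguments of its first interface. */
    static boolean keepsTypeArguments(Object function) {
        Type[] interfaces = function.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    /** A lambda: its generated class implements the raw Function interface. */
    static Function<String, Integer> asLambda() {
        return s -> s.length();
    }

    /** An anonymous class: its class file records Function<String, Integer>. */
    static Function<String, Integer> asAnonymousClass() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("lambda keeps type arguments: " + keepsTypeArguments(asLambda()));
        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(asAnonymousClass()));
    }
}
```

The anonymous class keeps its generic signature in the class file, which is why the fixes that follow give Flink enough information to determine the output type.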
Fixed code:
1. The first approach is to use an anonymous class
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Stream Source From Collection
 */
public class StreamSourceFromCollection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList(
                "hello flink",
                "hello java",
                "hello world",
                "test",
                "source",
                "collection"));
        /*SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(
                (String s, Collector<Tuple2<String, Integer>> collector) -> {
                    String[] words = s.split(" ");
                    //Arrays.stream(words).forEach((String sp) -> collector.collect(new Tuple2<String, Integer>(sp, 1)));
                    for(String word:words) {
                        collector.collect(new Tuple2<>(word,1));
                    }
                }).keyBy(item -> item.f0)
                .sum(1);*/
        // The anonymous class keeps its generic signature, so Flink can extract the output type.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                Arrays.stream(words).forEach((String sp) -> collector.collect(new Tuple2<String, Integer>(sp, 1)));
                /*for(String word:words) {
                    collector.collect(new Tuple2<>(word,1));
                }*/
            }
        }).keyBy(item -> item.f0)
                .sum(1);
        sum.print();
        env.execute();
    }
}
Output:
5> (hello,1)
5> (hello,2)
11> (source,1)
5> (hello,3)
9> (world,1)
9> (test,1)
2> (collection,1)
13> (flink,1)
3> (java,1)
2. The second approach is to define a class that implements the FlatMapFunction interface and overrides the flatMap method
private static class MyStreamFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        String[] words = s.split(" ");
        for (String word : words) {
            collector.collect(new Tuple2<>(word, 1));
        }
    }
}
Usage:
SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource.flatMap(new MyStreamFlatMap())
        .keyBy(item -> item.f0)
        .sum(1);
result.print();
env.execute();
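The exception message itself also suggests another option: keep the lambda and give Flink an explicit type hint with returns(...). The following is a minimal sketch of that approach, not something taken from the code above. It assumes the Flink DataStream API is on the classpath, and the class name StreamSourceFromCollectionWithReturns is made up for illustration.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

// Hypothetical class name; requires the Flink streaming dependency.
public class StreamSourceFromCollectionWithReturns {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList(
                "hello flink",
                "hello java",
                "hello world"));
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource
                .flatMap((String s, Collector<Tuple2<String, Integer>> collector) -> {
                    for (String word : s.split(" ")) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                })
                // Explicit hint that replaces the type information lost to erasure.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(item -> item.f0)
                .sum(1);
        sum.print();
        env.execute();
    }
}
```

The hint has to be placed directly after flatMap and before keyBy, because keyBy is the call that asks for the output type of the preceding transformation, as the stack trace at the top shows.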