Creating an OutputTag for a Flink side output fails with the following error:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Could not determine TypeInformation for the OutputTag type. The most common reason is forgetting to make the OutputTag an anonymous inner class. It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.
at org.apache.flink.util.OutputTag.<init>(OutputTag.java:68)
at com.itzhimei.process.ProcessFunction_4_Output.main(ProcessFunction_4_Output.java:21)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.util.OutputTag could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1384)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:811)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:787)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:774)
at org.apache.flink.util.OutputTag.<init>(OutputTag.java:66)
... 1 more
The cause is the way the OutputTag is defined in the code:
OutputTag outputTag = new OutputTag("OutputTag");
This creates a plain OutputTag object directly from the class.
Look at the OutputTag source:
public class OutputTag<T> implements Serializable {
    ......
    /**
     * Creates a new named {@code OutputTag} with the given id.
     *
     * @param id The id of the created {@code OutputTag}.
     */
    public OutputTag(String id) {
        Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
        this.id = id;
        try {
            this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
        } catch (InvalidTypesException e) {
            throw new InvalidTypesException(
                    "Could not determine TypeInformation for the OutputTag type. "
                            + "The most common reason is forgetting to make the OutputTag an anonymous inner class. "
                            + "It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.",
                    e);
        }
    }
    ......
}
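The OutputTag class is declared with a type parameter T, but the constructor receives no type information. Java erases generic type arguments at runtime, so the constructor can only recover T by inspecting the class returned by getClass(). When you call new OutputTag directly, that class is OutputTag itself, which records no concrete type for T.
TypeExtractor.createTypeInfo therefore cannot infer the type, and the program fails with InvalidTypesException.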
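The fix is simple: create the OutputTag as an anonymous inner class with an explicit type argument, for example:
OutputTag<String> outputTag = new OutputTag<String>("OutputTag"){};
Pay close attention to the trailing {}. The type argument matters as well: an anonymous subclass of the raw OutputTag type still carries no type for T and triggers the same exception. String is only an example here; use the element type your side output actually emits.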
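To see why the trailing {} makes a difference, the following minimal sketch reproduces the mechanism in plain Java, without Flink. The Tag class and its capturedType method are hypothetical stand-ins for OutputTag and are not Flink's actual implementation; they only illustrate the assumption that the type argument is read from the generic superclass of getClass().

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical demo class; Tag<T> is a simplified stand-in for OutputTag<T>.
class TypeCaptureDemo {

    static class Tag<T> {
        final String id;

        Tag(String id) {
            this.id = id;
        }

        // Returns the type argument that a direct subclass bound to T,
        // or null when the runtime class records no such argument.
        Type capturedType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
            return null;
        }
    }

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) {
        // Direct instantiation: the runtime class is Tag itself, so T is erased.
        Tag<String> plain = new Tag<String>("plain");

        // Anonymous subclass of the raw type: still no type argument recorded.
        Tag raw = new Tag("raw") {};

        // Anonymous subclass with an explicit type argument: the class file
        // records Tag<String> as the generic superclass.
        Tag<String> typed = new Tag<String>("typed") {};

        System.out.println("plain: " + plain.capturedType());
        System.out.println("raw:   " + raw.capturedType());
        System.out.println("typed: " + typed.capturedType());
    }
}
```

Only the third variant returns a concrete type, which mirrors the case where Flink can build TypeInformation for the tag.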