TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:
• 在内部的 catalog 中注册 Table
• 注册外部的 catalog
• 加载可插拔模块
• 执行 SQL 查询
• 注册自定义函数 (scalar、table 或 aggregation)
• DataStream 和 Table 之间的转换(面向 StreamTableEnvironment )
创建 TableEnvironment有两种方法,一种方法用到了EnvironmentSettings,我们可以手动指定是使用Flink planner,还是Blink planner,还可以指定其他设置。
例如通过EnvironmentSettings.newInstance()方法获取一个实例,然后基于建造者模式,完善EnvironmentSettings实例。
代码如下:
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build();
不过,通过useOldPlanner()方法,虽然可以手动指定使用旧版本的Flink planner,但是看源码你会发现,从Flink1.14开始这个方法已经不能使用了,使用这个方法,代码在运行时会直接报错。
@Deprecated
public EnvironmentSettings.Builder useOldPlanner() {
throw new TableException("The old planner has been removed in Flink 1.14. Please upgrade your table program to use the default planner (previously called the 'blink' planner).");
}
还可以通过方法inStreamingMode()和inBatchMode()来指定执行的模式。
创建 TableEnvironment的第二种方法,可以通过StreamExecutionEnvironment 创建一个StreamTableEnvironment。
代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
注意:Table 总是与特定的 TableEnvironment 绑定。 不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。 TableEnvironment 可以通过静态方法 TableEnvironment.create() 创建。
下一节我们来演示一下如何使用这两种方法创建TableEnvironment。