上一节我们了解了TableEnvironment的作用,也知道了创建TableEnvironment有两种方法,本节我们用代码演示一下具体如何使用。
第一种方法用到了EnvironmentSettings,我们可以手动指定是使用Flink planner,还是Blink planner,还可以指定其他设置。
例如通过EnvironmentSettings.newInstance()方法获取一个构建器(Builder),然后基于建造者模式逐项指定配置,最后调用build()方法得到EnvironmentSettings实例。
代码如下:
package com.itzhimei.tablesql;
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
 * Creates a {@link StreamTableEnvironment} from explicit {@link EnvironmentSettings} and runs
 * the same filter once through the Table API and once through SQL.
 */
public class TableSQL_2_TableEnvironment {

    /**
     * Builds and executes a streaming job that reads from {@link TransactionSource}, keeps the
     * rows whose {@code amount} is at least 500 (via the Table API and via SQL), converts both
     * results back to data streams and prints them.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        // Streaming mode is chosen explicitly here; the builder's inBatchMode() would be the
        // alternative for batch execution.
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TransactionSource comes from the Flink walkthrough module (covered in earlier chapters).
        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource())
                .name("transactions");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings);
        Table table = tableEnv.fromDataStream(dataStream);

        // Query with the Table API. The type-safe expression DSL replaces the deprecated
        // string-based form select("*").where("amount >= 500").
        Table result1 = table
                .select($("*"))
                .where($("amount").isGreaterOrEqual(500));

        // Query with SQL: the table must first be registered under a name the SQL can reference.
        tableEnv.createTemporaryView("transactions", table);
        Table result2 = tableEnv.sqlQuery("select * from transactions where amount >= 500");

        DataStream<Row> rowDataStream1 = tableEnv.toDataStream(result1);
        DataStream<Row> rowDataStream2 = tableEnv.toDataStream(result2);

        // The print identifiers distinguish the two result streams in the console output.
        rowDataStream1.print("rowDataStream1");
        rowDataStream2.print("rowDataStream2");

        env.execute();
    }
}
/* 输出结果
rowDataStream2:8> +I[3, 1546281720000, 871.15]
rowDataStream1:1> +I[3, 1546281720000, 871.15]
rowDataStream2:1> +I[3, 1546299720000, 871.15]
rowDataStream1:2> +I[3, 1546299720000, 871.15]
rowDataStream1:3> +I[3, 1546317720000, 871.15]
rowDataStream2:2> +I[3, 1546317720000, 871.15]
rowDataStream1:4> +I[3, 1546335720000, 871.15]
rowDataStream2:3> +I[3, 1546335720000, 871.15]
rowDataStream1:5> +I[3, 1546353720000, 871.15]
rowDataStream2:4> +I[3, 1546353720000, 871.15]
*/