创建 TableEnvironment 的第二种方法:通过 StreamExecutionEnvironment 创建一个 StreamTableEnvironment。
代码如下:
package com.itzhimei.tablesql;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

import static org.apache.flink.table.api.Expressions.$;
/**
 * Demonstrates creating a {@link StreamTableEnvironment} from an existing
 * {@link StreamExecutionEnvironment} via {@code StreamTableEnvironment.create(env)}.
 *
 * <p>The same filter ({@code amount >= 500}) is expressed twice, once with the Table API and
 * once with SQL. Both results are converted back to a {@code DataStream<Row>} and printed
 * under distinct prefixes so the two outputs can be compared.
 */
public class TableSQL_1_TableEnvironment {

    /**
     * Builds the demo pipeline and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TransactionSource is covered in an earlier chapter, which walks through its source code.
        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource())
                .name("transactions");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(dataStream);

        // Query with the Table API. The typed Expressions DSL is used here because the
        // string-based form (select("*").where("amount >= 500")) is deprecated and is no
        // longer available in recent Flink releases.
        Table result1 = table
                .select($("*"))
                .where($("amount").isGreaterOrEqual(500));

        // Query with SQL: the table must first be registered under a name the query can reference.
        tableEnv.createTemporaryView("transactions", table);
        Table result2 = tableEnv.sqlQuery("select * from transactions where amount >= 500");

        DataStream<Row> rowDataStream1 = tableEnv.toDataStream(result1);
        DataStream<Row> rowDataStream2 = tableEnv.toDataStream(result2);

        rowDataStream1.print("rowDataStream1");
        rowDataStream2.print("rowDataStream2");

        env.execute();
    }
}
/* 输出结果
rowDataStream2:8> +I[3, 1546281720000, 871.15]
rowDataStream1:2> +I[3, 1546281720000, 871.15]
rowDataStream1:3> +I[3, 1546299720000, 871.15]
rowDataStream2:1> +I[3, 1546299720000, 871.15]
rowDataStream2:2> +I[3, 1546317720000, 871.15]
rowDataStream1:4> +I[3, 1546317720000, 871.15]
*/