Flink从入门到实战九[Table API&SQL]-4-使用StreamExecutionEnvironment创建 TableEnvironment

创建 TableEnvironment的第二种方法,可以通过StreamExecutionEnvironment 创建一个StreamTableEnvironment。
代码如下:

package com.itzhimei.tablesql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * 使用StreamTableEnvironment.create(env)创建TableEnvironment
 */
public class TableSQL_1_TableEnvironment {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource()) //TransactionSource可以查看前面章节,有源码分析讲解
                .name("transactions");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Table table = tableEnv.fromDataStream(dataStream);

        //使用table api查询数据
        Table result1 = table.select("*").where("amount >= 500");

        //使用sql查询数据
        tableEnv.createTemporaryView("transactions", table);
        Table result2 = tableEnv.sqlQuery("select * from transactions where amount >= 500");

        DataStream<Row> rowDataStream1 = tableEnv.toDataStream(result1);
        DataStream<Row> rowDataStream2 = tableEnv.toDataStream(result2);

        rowDataStream1.print("rowDataStream1");
        rowDataStream2.print("rowDataStream2");
        env.execute();
    }
}


/* 输出结果
rowDataStream2:8> +I[3, 1546281720000, 871.15]
rowDataStream1:2> +I[3, 1546281720000, 871.15]
rowDataStream1:3> +I[3, 1546299720000, 871.15]
rowDataStream2:1> +I[3, 1546299720000, 871.15]
rowDataStream2:2> +I[3, 1546317720000, 871.15]
rowDataStream1:4> +I[3, 1546317720000, 871.15]
 */