Flink从入门到实战九[Table API&SQL]-8-基于FileSystem 创建 TABLE 通过 connector SQL DDL

Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。
需要注意的是旧版本的Flink基于connector来建表的语法如下:

tableEnvironment 
.connect(...) // Table Connector Eg:Kafka
.withFormat(...) // Format Type Eg:JSON
.withSchema(...) // Table Schem
.inAppendMode() // update mode
.createTemporaryTable(“MyTable”) // Register Table

是通过tableEnvironment的connect方法来实现的,但是Flink1.14版本后,api中没有connect方法,来看具体的1.14API的使用方法。
通过 table connector SQL DDL来创建表,其语句就和我们数据库建表语句非常相似,代码如下:

package com.itzhimei.tablesql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * CreateTableConnector From FileSystem  Using SQL DDL
 */
public class TableSQL_6_CreateTableConnectorFromFileSystem {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Using SQL DDL
        tableEnv.executeSql("CREATE TABLE TransactionTable (\n" +
                "  accountId BIGINT,\n" +
                "  timestamps BIGINT,\n" +
                "  amount DECIMAL(18,2)\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',           \n" +
                "  'path' = 'file:///D:\\com\\itzhimei\\tablesql\\5.txt',  \n" +
                "  'format' = 'csv'                     \n" +
                ")");

        Table table = tableEnv.sqlQuery("select * from TransactionTable"); // where amount >= 500
        DataStream<Row> rowDataStream2 = tableEnv.toDataStream(table);
        rowDataStream2.print("rowDataStream2");

        env.execute();
    }

}


/* 输出结果
rowDataStream2> +I[1, 1546272000000, 188.23]
rowDataStream2> +I[2, 1546272360000, 374.79]
rowDataStream2> +I[3, 1546272720000, 112.15]
rowDataStream2> +I[4, 1546273080000, 478.75]
rowDataStream2> +I[5, 1546273440000, 208.85]
 */