Flink从入门到实战九[Table API&SQL]-7-基于FileSystem 创建 TABLE 通过 connector 声明

Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。
需要注意的是旧版本的Flink基于connector来建表的语法如下:

tableEnvironment 
.connect(...) // Table Connector Eg:Kafka
.withFormat(...) // Format Type Eg:JSON
.withSchema(...) // Table Schema
.inAppendMode() // update mode
.createTemporaryTable("MyTable") // Register Table

是通过tableEnvironment的connect方法来实现的。但是从Flink 1.14版本起,API中已经移除了connect方法,下面来看Flink 1.14 API的具体使用方法。

通过 table connector 声明来创建表

package com.itzhimei.tablesql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * CreateTableConnector From FileSystem Using TABLE Descriptors
 */
/**
 * Demonstrates creating a table backed by the local filesystem using the
 * {@link TableDescriptor} API introduced for Flink 1.14+, which replaces the
 * removed {@code tableEnvironment.connect(...)} fluent API.
 */
public class TableSQL_5_CreateTableConnectorFromFileSystem {

    public static void main(String[] args) throws Exception {
        // Single parallel task so the printed rows come out in file order.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Three CSV columns; named f0/f1/f2 here, so SQL filters must use those
        // names (e.g. "where f2 >= 500" to filter on the amount column).
        Schema schema = Schema.newBuilder()
                .column("f0", DataTypes.BIGINT())
                .column("f1", DataTypes.BIGINT())
                .column("f2", DataTypes.DECIMAL(10,2))
                .build();

        // Filesystem connector + CSV format reading a local Windows path.
        TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
                .format("csv")
                .schema(schema)
                .option("path", "file:///D:\\com\\itzhimei\\tablesql\\5.txt")
                .build();

        // Register the descriptor under a name usable from SQL.
        tableEnv.createTable("TransactionTable1", descriptor);

        // Select everything, bridge back to a DataStream, and print each row.
        Table result = tableEnv.sqlQuery("select * from TransactionTable1");
        DataStream<Row> rows = tableEnv.toDataStream(result);
        rows.print("rowDataStream2");

        env.execute();
    }

}


/* 输出结果
rowDataStream2> +I[1, 1546272000000, 188.23]
rowDataStream2> +I[2, 1546272360000, 374.79]
rowDataStream2> +I[3, 1546272720000, 112.15]
rowDataStream2> +I[4, 1546273080000, 478.75]
rowDataStream2> +I[5, 1546273440000, 208.85]
 */