A connector describes the external system that stores a table's data. Storage systems such as Apache Kafka or an ordinary file system are declared to Flink this way.
Note that older versions of Flink created connector-backed tables with the following syntax:
tableEnvironment
    .connect(...)                      // table connector, e.g. Kafka
    .withFormat(...)                   // format type, e.g. JSON
    .withSchema(...)                   // table schema
    .inAppendMode()                    // update mode
    .createTemporaryTable("MyTable")   // register the table
That is, tables were registered through tableEnvironment's connect method. Since Flink 1.14, however, the API no longer has a connect method, so let's look at the concrete 1.14 usage.
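One replacement, the programmatic counterpart of connect(), is a TableDescriptor passed to createTemporaryTable (TableDescriptor, Schema, and DataTypes all live in org.apache.flink.table.api). A minimal sketch, reusing the schema and file of the full example below; the forward-slash path form is an assumption, adjust it for your file:

tableEnv.createTemporaryTable("TransactionTable", TableDescriptor
        .forConnector("filesystem")
        .schema(Schema.newBuilder()
                .column("accountId", DataTypes.BIGINT())
                .column("timestamps", DataTypes.BIGINT())
                .column("amount", DataTypes.DECIMAL(18, 2))
                .build())
        .option("path", "file:///D:/com/itzhimei/tablesql/5.txt") // assumed path form
        .format("csv")
        .build());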
The other route, used in the full example below, is a table connector SQL DDL statement, which looks very much like an ordinary database CREATE TABLE statement. The code:
package com.itzhimei.tablesql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Create a table connector from the file system using SQL DDL.
 */
public class TableSQL_6_CreateTableConnectorFromFileSystem {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Using SQL DDL: the WITH clause declares the connector, path and format
        tableEnv.executeSql("CREATE TABLE TransactionTable (\n" +
                "  accountId BIGINT,\n" +
                "  timestamps BIGINT,\n" +
                "  amount DECIMAL(18,2)\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem', \n" +
                "  'path' = 'file:///D:\\com\\itzhimei\\tablesql\\5.txt', \n" +
                "  'format' = 'csv' \n" +
                ")");

        Table table = tableEnv.sqlQuery("select * from TransactionTable"); // where amount >= 500
        DataStream<Row> rowDataStream2 = tableEnv.toDataStream(table);
        rowDataStream2.print("rowDataStream2");

        env.execute();
    }
}
/* Output:
rowDataStream2> +I[1, 1546272000000, 188.23]
rowDataStream2> +I[2, 1546272360000, 374.79]
rowDataStream2> +I[3, 1546272720000, 112.15]
rowDataStream2> +I[4, 1546273080000, 478.75]
rowDataStream2> +I[5, 1546273440000, 208.85]
*/
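The same DDL pattern covers the other connectors mentioned at the start, such as Apache Kafka; only the WITH options change. A hedged sketch, assuming the same schema, a transactions topic, a local broker, and the flink-connector-kafka dependency on the classpath (none of these come from the example above):

tableEnv.executeSql("CREATE TABLE KafkaTransactionTable (\n" +
        "  accountId BIGINT,\n" +
        "  timestamps BIGINT,\n" +
        "  amount DECIMAL(18,2)\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka', \n" +
        "  'topic' = 'transactions', \n" +                          // assumed topic name
        "  'properties.bootstrap.servers' = 'localhost:9092', \n" + // assumed broker address
        "  'scan.startup.mode' = 'earliest-offset', \n" +
        "  'format' = 'json' \n" +
        ")");

Querying it with sqlQuery and toDataStream then works exactly as in the file system example.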