Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。
需要注意的是旧版本的Flink基于connector来建表的语法如下:
tableEnvironment
.connect(...) // Table Connector Eg:Kafka
.withFormat(...) // Format Type Eg:JSON
.withSchema(...) // Table Schema
.inAppendMode() // update mode
.createTemporaryTable("MyTable") // Register Table
旧版本是通过 tableEnvironment 的 connect 方法来实现的,但是从 Flink 1.14 版本开始,API 中已经没有 connect 方法了。下面来看 1.14 版本 API 的具体使用方法。
通过 table connector 声明来创建表
package com.itzhimei.tablesql;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
 * Example: declare a table with a {@link TableDescriptor} that uses the
 * {@code filesystem} connector and the {@code csv} format, then query it with SQL.
 *
 * <p>The table is registered as {@code TransactionTable1}, selected in full,
 * converted to a {@code DataStream<Row>} and printed under the label
 * {@code rowDataStream2}.
 */
public class TableSQL_5_CreateTableConnectorFromFileSystem {

    /**
     * Wires up the job: registers the file-backed table, selects all of its rows
     * and prints the resulting stream.
     *
     * @param args command-line arguments; not read by this example
     * @throws Exception if any call made while building or running the job throws
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnv.setParallelism(1);
        StreamTableEnvironment tables = StreamTableEnvironment.create(executionEnv);

        tables.createTable("TransactionTable1", csvFileSource());

        // The query has no WHERE clause, so every row of the table is selected.
        Table allRows = tables.sqlQuery("select * from TransactionTable1");

        DataStream<Row> rows = tables.toDataStream(allRows);
        rows.print("rowDataStream2");

        executionEnv.execute();
    }

    /**
     * Builds the descriptor of the source table: three columns
     * ({@code f0 BIGINT}, {@code f1 BIGINT}, {@code f2 DECIMAL(10, 2)}) read as CSV
     * from the hard-coded local file path.
     *
     * @return the descriptor passed to {@code createTable}
     */
    private static TableDescriptor csvFileSource() {
        Schema columns = Schema.newBuilder()
                .column("f0", DataTypes.BIGINT())
                .column("f1", DataTypes.BIGINT())
                .column("f2", DataTypes.DECIMAL(10, 2))
                .build();

        return TableDescriptor.forConnector("filesystem")
                .format("csv")
                .schema(columns)
                .option("path", "file:///D:\\com\\itzhimei\\tablesql\\5.txt")
                .build();
    }
}
/* 输出结果
rowDataStream2> +I[1, 1546272000000, 188.23]
rowDataStream2> +I[2, 1546272360000, 374.79]
rowDataStream2> +I[3, 1546272720000, 112.15]
rowDataStream2> +I[4, 1546273080000, 478.75]
rowDataStream2> +I[5, 1546273440000, 208.85]
*/