Flink连接器(Connector)是将Flink程序与外部系统打通的机制。它可以从外部系统读取数据输入Flink,或者将Flink的计算结果输出到外部系统。
Flink提供了丰富的连接器支持,主要有:
- 消息队列连接器:Kafka、RabbitMQ等
- 存储连接器:HDFS、S3、GCS等
- 数据库连接器:MySQL、PostgreSQL、Oracle等
- 搜索引擎连接器:Elasticsearch等
- 文件连接器:本地文件、Docker日志等
使用Flink连接器的一般步骤:
- 添加连接器依赖。
- 创建连接器的配置对象,设置服务URL、连接参数等。
- 使用env.connect方法创建连接,传入连接器配置对象。
- 使用打通的连接来读取/输出数据。
下面通过Kafka连接器的例子来说明Flink连接器的使用:
添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
创建连接器配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "testGroup");
FlinkKafkaConsumer011<String> kafkaSource = new FlinkKafkaConsumer011<>(
"topic",
new SimpleStringSchema(),
props);
创建连接:
DataStream<String> stream = env.addSource(kafkaSource);
读取Kafka Topic的数据输入Flink:
DataStream<String> stream = env.addSource(kafkaSource);
stream.print();// 打印
输出到Kafka:
DataStream<String> stream = ...
FlinkKafkaProducer011<String> kafkaSink = new FlinkKafkaProducer011<>(
"topic",
new SimpleStringSchema(),
props);
stream.addSink(kafkaSink);
Flink连接器的使用为我们构建真正意义上的流式应用程序提供了重要支撑。
连接器机制使Flink程序具有强大的外部连接能力。
Flink提供了丰富的连接器支持与外部系统对接。Flink连接器机制为我们构建真实的流式应用程序提供了重要支持。