Flink中的时间处理主要 basrd 于三种时间:
- 事件时间(Event Time):数据自己携带的时间戳,通常表示数据被生成或采集的时间。
- 处理时间(Processing Time):数据被Flink处理的时间。
- 注入时间(Ingestion Time):数据被Flink接收的时间。
Flink主要通过以下方式处理不同时间:
- 事件时间:调用.assignTimestampsAndWatermarks()方法将数据流转换为带时间戳的流,然后窗口、触发器根据时间戳进行操作。
- 处理时间:直接使用系统时间,窗口、触发器基于ProcessingTimeProcessFunction进行操作。
- 注入时间:使用TimestampAssigner的extractTimestamp()方法获取数据的接收时间,作为时间戳进行处理。
下面通过例子说明不同时间的用法:
事件时间:
// 分配事件时间戳和Watermark
DataStream<Tuple2<String, Long>> streamWithTimestamp =
stream.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
public long extractTimestamp(String element, long previousElementTimestamp) {
return Long.parseLong(element.split(",")[1]); // 第二个字段是事件时间戳
}
})
);
// 基于时间戳进行5秒钟窗口操作
DataStream<String> windowedStream = streamWithTimestamp.keyBy(t -> t.f0)
.timeWindow(Time.seconds(5))
.reduce((a, b) -> a.f0 + "," + (a.f1 + b.f1));
处理时间:
DataStream<String> stream = env.socketTextStream("localhost", 9999);
// 基于ProcessingTimeProcessFunction进行5秒钟滚动窗口
DataStream<String> result = stream.keyBy(x -> x)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new ProcessingTimeProcessFunction<String, String>() {
public void processElement(String value,
ProcessFunction<String, String>.Context ctx,
Iterable<String> elements) {
// 窗口内元素集合elements,每5秒钟处理一次
...
}
});
注入时间:
DataStream<String> stream = env.socketTextStream("localhost", 9999);
// 使用TimestampAssigner获取数据接收时间作为时间戳
SingleOutputStreamOperator<String> streamWithTimestamp = stream.assignTimestamps(new TimestampAssigner<String>() {
public long extractTimestamp(String element, long recordTimestamp) {
return System.currentTimeMillis(); // 数据接收时间
}
});
// 根据时间戳进行窗口操作
DataStream<String> windowedStream = streamWithTimestamp.keyBy(t -> t.f0)
.timeWindow(Time.seconds(5))
.reduce((a, b) -> a.f0 + "," + (a.f1 + b.f1));
综上,Flink通过assignTimestampsAndWatermarks()、ProcessingTimeProcessFunction和TimestampAssigner支持对不同时间的处理。我们可以根据业务选择合适的时间,来实现相应的窗口操作和其他时间相关功能。