水印(Watermark)是Flink用于处理乱序事件的机制。它用于表示数据流中事件时间的进展。
当数据源并非严格的时间顺序到达时,Flink会根据Watermark来推进窗口的执行和触发。简而言之,Watermark是衡量事件时间的 bookmark。
在Flink中,我们可以通过以下方式使用Watermark:
- 将数据流转换为时间戳和Watermark流,使用TimestampAssigner分配事件时间戳。
- 指定Watermark的延迟时间,Flink会根据Watermark推进窗口执行。
- 窗口根据Watermark触发执行,产生结果。
- 水印允许指定一定程度的乱序(Out-of-Orderness),超过此范围的迟到数据会被丢弃。
下面通过一个例子来说明Watermark的使用:
DataStream<String> stream = env.readTextFile("input");
// 分配事件时间戳和Watermark
DataStream<Tuple2<String, Long>> streamWithTimestamp =
stream.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
public long extractTimestamp(String element, long previousElementTimestamp) {
return Long.parseLong(element.split(",")[1]); // 第二个字段是事件时间戳
}
})
);
// 5秒钟窗口,根据Watermark触发
DataStream<String> windowedStream = streamWithTimestamp.keyBy(t -> t.f0)
.timeWindow(Time.seconds(5))
.reduce((a, b) -> a.f0 + "," + (a.f1 + b.f1));
windowedStream.print();
env.execute();
我们来分析一下这个例子:
- 使用.assignTimestampsAndWatermarks()方法将数据流转换为带时间戳和Watermark的流。Watermark的延迟时间设置为5秒。
- 5秒钟的时间窗口会根据Watermark触发执行和结果输出。
- Watermark允许5秒的乱序数据,超过5秒的迟到数据会被丢弃。
- 窗口函数会根据Watermark来触发,产生最终结果。
理解Watermark的原理和用法,是我们实现事件驱动流处理的基础。通过Watermark可以推进窗口的触发,产生结果,同时允许一定程度的乱序处理。
Watermark是Flink作为事件驱动引擎的一个重要特征。掌握Watermark的工作机制,可以让我们理解Flink是如何进行事件驱动和乱序处理的。