要了解Watermark,要先了解Flink中的时间语义,Flink中的时间分为三种:
Event Time 事件时间:业务数据中的数据产生的时间
Ingestion Time 摄取时间:数据到达Flink中的时间
Processing Time 处理时间:Flink处理数据的时间
Watermarks是什么
我们使用Flink处理实时数据时,数据不会完全按照发生的顺序进入到Flink中,当一个计算窗口触发计算,属于这个窗口的数据,可能迟迟没有进入到Flink中参与计算。
Watermark就是用来解决乱序事件和迟到时间的一个机制,能够控制接入的数据何时触发计算,让程序可以等待一定时间,来获取迟到数据。
Watermark使用的是数据中的事件时间来计算更新的,并且必须单调递增,以确保任务的事件时间在向前推进,而不是在后退。
理解Watermark
理想情况下,数据按照时间发生的时间依次进入到Flink中,被分配到对应的窗口进行计算,如下:
35 32 || 28 25 23 || 19 17 13 || 7 5 1 →→ Flink
每个数字的值都代表一个时间发生的时间,值越大代表时间发生之间越靠后,数字从右到左,是按照从小到大的顺序到达Flink程序的。
实际情况则是数据没有按照实际发生的事件时间的先后顺序到达Flink参与计算,如下:
35 32 || 25 19 23 || 28 17 5 || 13 7 1 →→ Flink
Watermarks的产生
Source生成watermaker,经过map转换,带到下游的算子中,算子则根据来到的数据中附带的Watermark,更新算子自身的Watermark。
Watermarks的计算方式
watermaker = max EventTime – LateTime,即:watermaker = 最大事件时间-允许的最大延时时间。
watermaker根据数据的时间计算生成最新的watermaker,从而来判断是否有窗口触发了执行条件。
注意:LateTime越大,延时越高。
窗口触发计算的条件
窗口触发条件 Current watermaker > Window EndTime
Watermarks生成方式
Periodic Watermarks(周期生成,常用) :Based on Event Time
Punctuated Watermarks (标记生成):Based on something in the event stream
Watermarks API
早期版本写法(1.11之前):
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner)
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner)
代码例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Transaction> stream = env.readFile("123.txt");
SingleOutputStreamOperator<Transaction> amount1 = dataStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.milliseconds(1000)) {
@Override
public long extractTimestamp(Transaction element) {
return element.getTimestamp();
}
})
.keyBy(Transaction::getAccountId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("amount");
这里直接新建了一个匿名类,继承自抽象类BoundedOutOfOrdernessTimestampExtractor,BoundedOutOfOrdernessTimestampExtractor又实现了AssignerWithPeriodicWatermarks。
BoundedOutOfOrdernessTimestampExtractor有一个抽象方法需要实现,就是指定对象的事件时间
public long extractTimestamp(Transaction element) {
return element.getTimestamp();
}
新版本的写法(例如我使用的1.14.3):
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)
代码例:
WatermarkStrategy<Transaction> strategy = WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());