Flink处理乱序数据主要有两种方式:
- 增加数据的watermark。这可以指定数据的事件时间,Flink会根据watermark来排序和聚合数据。
- 使用数据的rowtime属性。Flink会自动根据rowtime来对数据进行排序和窗口聚合。
下面通过例子来说明这两种方式:
使用watermark:
DataStream<String> stream = env.socketTextStream("localhost", 9999);
// 指定5秒钟的最大延迟,超过5秒的数据会被丢弃
DataStream<Tuple2<String, Long>> streamWithTimestamp = stream.assignTimestampsAndWatermarks(
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
public long extractTimestamp(String element, long previousElementTimestamp) {
return Long.parseLong(element.split(",")[1]); // 第二个字段是时间戳
}
})
);
// 根据时间戳进行窗口聚合
streamWithTimestamp.keyBy(t -> t.f0)
.timeWindow(Time.seconds(10))
.reduce((a, b) -> a.f0 + "," + (a.f1 + b.f1));
使用rowtime:
DataStream<String> stream = env.socketTextStream("localhost", 9999);
// 定义第二个字段为事件时间戳
DataStream<Tuple2<String, Long>> streamWithTimestamp = stream.map(line -> {
String[] fields = line.split(",");
return new Tuple2<String, Long>(fields[0], Long.parseLong(fields[1]));
});
// 根据rowtime进行窗口聚合
streamWithTimestamp.keyBy(t -> t.f0)
.timeWindow(Time.seconds(10))
.reduce((a, b) -> a.f0 + "," + (a.f1 + b.f1));
这两个例子都从socket读取数据,其中第二个字段表示事件时间戳。
第一个例子使用.assignTimestampsAndWatermarks()方法指定watermark,来对数据进行乱序处理和窗口聚合。
第二个例子将第二个字段定义为rowtime,Flink会自动使用rowtime对数据进行排序和窗口聚合。
理解这两种方式,可以让我们根据业务选择恰当的方式来处理乱序数据。Watermark主要用于未指定事件时间的数据,rowtime属性更适用于原始数据中已经包含时间戳的场景。