Flink如何处理乱序数据?

Flink处理乱序数据主要有两种方式:

  1. 增加数据的watermark。这可以指定数据的事件时间,Flink会根据watermark来排序和聚合数据。
  2. 使用数据的rowtime属性。Flink会自动根据rowtime来对数据进行排序和窗口聚合。

下面通过例子来说明这两种方式:

使用watermark:

DataStream<String> stream = env.socketTextStream("localhost", 9999);

// 指定5秒钟的最大延迟,超过5秒的数据会被丢弃 
DataStream<Tuple2<String, Long>> streamWithTimestamp = stream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
            public long extractTimestamp(String element, long previousElementTimestamp) {
                return Long.parseLong(element.split(",")[1]);  // 第二个字段是时间戳
            }
        })
); 

// 根据时间戳进行窗口聚合 
streamWithTimestamp.keyBy(t -> t.f0)  
                     .timeWindow(Time.seconds(10))  
                     .reduce((a, b) -> a.f0 + "," + (a.f1 + b.f1)); 

使用rowtime:

DataStream<String> stream = env.socketTextStream("localhost", 9999);

// 定义第二个字段为事件时间戳
DataStream<Tuple2<String, Long>> streamWithTimestamp = stream.map(line -> {
    String[] fields = line.split(",");
    return new Tuple2<String, Long>(fields[0], Long.parseLong(fields[1]));
});

// 根据rowtime进行窗口聚合 
streamWithTimestamp.keyBy(t -> t.f0)
                     .timeWindow(Time.seconds(10))  
                     .reduce((a, b) -> a.f0 + "," + (a.f1 + b.f1)); 

这两个例子都从socket读取数据,其中第二个字段表示事件时间戳。
第一个例子使用.assignTimestampsAndWatermarks()方法指定watermark,来对数据进行乱序处理和窗口聚合。
第二个例子将第二个字段定义为rowtime,Flink会自动使用rowtime对数据进行排序和窗口聚合。

理解这两种方式,可以让我们根据业务选择恰当的方式来处理乱序数据。Watermark主要用于未指定事件时间的数据,rowtime属性更适用于原始数据中已经包含时间戳的场景。