Flink中的时间处理是如何实现的?

Flink中的时间处理主要 basrd 于三种时间:

  1. 事件时间(Event Time):数据自己携带的时间戳,通常表示数据被生成或采集的时间。
  2. 处理时间(Processing Time):数据被Flink处理的时间。
  3. 注入时间(Ingestion Time):数据被Flink接收的时间。

Flink主要通过以下方式处理不同时间:

  1. 事件时间:调用.assignTimestampsAndWatermarks()方法将数据流转换为带时间戳的流,然后窗口、触发器根据时间戳进行操作。
  2. 处理时间:直接使用系统时间,窗口、触发器基于ProcessingTimeProcessFunction进行操作。
  3. 注入时间:使用TimestampAssigner的extractTimestamp()方法获取数据的接收时间,作为时间戳进行处理。

下面通过例子说明不同时间的用法:

事件时间:

// 分配事件时间戳和Watermark  
DataStream<Tuple2<String, Long>> streamWithTimestamp = 
    stream.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
            public long extractTimestamp(String element, long previousElementTimestamp) {
                return Long.parseLong(element.split(",")[1]);  // 第二个字段是事件时间戳  
            }
        })
    );
// 基于时间戳进行5秒钟窗口操作           
DataStream<String> windowedStream = streamWithTimestamp.keyBy(t -> t.f0)  
                                                       .timeWindow(Time.seconds(5))
                                                       .reduce((a, b) -> a.f0 + "," + (a.f1 + b.f1));  

处理时间:

DataStream<String> stream = env.socketTextStream("localhost", 9999);  
// 基于ProcessingTimeProcessFunction进行5秒钟滚动窗口
DataStream<String> result = stream.keyBy(x -> x)
                                  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))   
                                  .process(new ProcessingTimeProcessFunction<String, String>() { 
                                      public void processElement(String value, 
                                                               ProcessFunction<String, String>.Context ctx, 
                                                               Iterable<String> elements) {
                                          // 窗口内元素集合elements,每5秒钟处理一次
                                          ...  
                                      }
                                  });

注入时间:

DataStream<String> stream = env.socketTextStream("localhost", 9999);
// 使用TimestampAssigner获取数据接收时间作为时间戳   
SingleOutputStreamOperator<String> streamWithTimestamp = stream.assignTimestamps(new TimestampAssigner<String>() {
    public long extractTimestamp(String element, long recordTimestamp) {
        return System.currentTimeMillis();  // 数据接收时间
    }
});  
// 根据时间戳进行窗口操作  
DataStream<String> windowedStream = streamWithTimestamp.keyBy(t -> t.f0) 
                                                       .timeWindow(Time.seconds(5)) 
                                                       .reduce((a, b) -> a.f0 + "," + (a.f1 + b.f1));

综上,Flink通过assignTimestampsAndWatermarks()、ProcessingTimeProcessFunction和TimestampAssigner支持对不同时间的处理。我们可以根据业务选择合适的时间,来实现相应的窗口操作和其他时间相关功能。