Flink自带的Watermark生成器类中,最主要的就是BoundedOutOfOrdernessWatermarks。
BoundedOutOfOrdernessWatermarks实现了WatermarkGenerator接口,实现了两个方法:onEvent和onPeriodicEmit。
BoundedOutOfOrdernessWatermarks的源码中最大无序时间是由构造器传入的,和我们自定义的写法基本相同。
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** Bound, in milliseconds, on how far behind the highest seen timestamp the watermark trails. */
    private final long outOfOrdernessMillis;

    /** Highest event timestamp observed up to now. */
    private long maxTimestamp;

    /**
     * Builds a generator whose watermark lags the highest observed event timestamp by the given
     * bound.
     *
     * @param maxOutOfOrderness The tolerated out-of-orderness of event timestamps; it is passed
     *     through a null check and a non-negativity check before use.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        final long boundMillis = maxOutOfOrderness.toMillis();
        this.outOfOrdernessMillis = boundMillis;

        // Seed the maximum so that a watermark computed before any event is exactly Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // ------------------------------------------------------------------------

    /** Records the event's timestamp if it exceeds the highest one seen so far. */
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        }
    }

    /** Emits a watermark equal to the highest seen timestamp minus the bound minus one. */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        final long watermarkTimestamp = maxTimestamp - outOfOrdernessMillis - 1;
        output.emitWatermark(new Watermark(watermarkTimestamp));
    }
}
我们基于前面章节和本章节上面的内容,自定义一个Watermark生成器,并将其应用到作业中,代码如下:
package com.itzhimei.watermark;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
 * Demonstrates a hand-written watermark generator.
 *
 * <p>Sums the {@code amount} field per account over one-hour tumbling event-time windows, using
 * watermarks that trail the highest seen event timestamp by one hour.
 */
public class Watermark_2_Define {

    /**
     * Builds and runs the job: reads transactions from {@link TransactionSource}, prints them,
     * assigns timestamps and watermarks, and prints the per-account windowed sums.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource())
                .name("transactions");
        dataStream.print("source");

        // Event time is taken from the transaction itself; watermarks come from the custom generator.
        WatermarkStrategy<Transaction> strategy = WatermarkStrategy
                .forGenerator((ctx) -> new MyWatermarkGenerator())
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        SingleOutputStreamOperator<Transaction> amount = dataStream.assignTimestampsAndWatermarks(strategy)
                .keyBy(Transaction::getAccountId)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .sum("amount");
        amount.print("amount");

        env.execute();
    }

    /**
     * Periodic watermark generator that trails the highest seen event timestamp by a fixed
     * one-hour bound.
     */
    private static class MyWatermarkGenerator implements WatermarkGenerator<Transaction> {

        /** Fixed out-of-orderness bound: one hour, in milliseconds. */
        private static final long MAX_OUT_OF_ORDERNESS_MILLIS = 60L * 60L * 1000L;

        /**
         * Highest event timestamp seen so far.
         *
         * <p>Seeded so that a watermark computed before any event is exactly
         * {@code Long.MIN_VALUE}. Starting at plain {@code Long.MIN_VALUE} would make
         * {@code currentMaxTimestamp - bound - 1} underflow and wrap around to a value close to
         * {@code Long.MAX_VALUE} whenever {@link #onPeriodicEmit} runs before the first event.
         */
        private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MILLIS + 1;

        /** Logs the event's timestamps and raises the tracked maximum if this event is newer. */
        @Override
        public void onEvent(Transaction event, long eventTimestamp, WatermarkOutput output) {
            System.out.println("Transaction Timestamp:" + event.getTimestamp() + " eventTimestamp:" + eventTimestamp);
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }

        /** Emits a watermark equal to the tracked maximum minus the bound minus one. */
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MILLIS - 1));
        }
    }
}
/* 输出
Transaction Timestamp:1546272000000 eventTimestamp:1546272000000
Transaction Timestamp:1546272360000 eventTimestamp:1546272360000
source:13> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
Transaction Timestamp:1546272720000 eventTimestamp:1546272720000
source:14> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
Transaction Timestamp:1546273080000 eventTimestamp:1546273080000
source:15> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
source:16> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
Transaction Timestamp:1546273440000 eventTimestamp:1546273440000
source:1> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
Transaction Timestamp:1546273800000 eventTimestamp:1546273800000
Transaction Timestamp:1546274160000 eventTimestamp:1546274160000
source:2> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
source:3> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
Transaction Timestamp:1546274520000 eventTimestamp:1546274520000
Transaction Timestamp:1546274880000 eventTimestamp:1546274880000
source:4> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
source:5> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
Transaction Timestamp:1546275240000 eventTimestamp:1546275240000
Transaction Timestamp:1546275600000 eventTimestamp:1546275600000
source:6> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
Transaction Timestamp:1546275960000 eventTimestamp:1546275960000
source:7> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
Transaction Timestamp:1546276320000 eventTimestamp:1546276320000
source:8> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
source:9> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
Transaction Timestamp:1546276680000 eventTimestamp:1546276680000
Transaction Timestamp:1546277040000 eventTimestamp:1546277040000
source:10> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
Transaction Timestamp:1546277400000 eventTimestamp:1546277400000
source:11> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
Transaction Timestamp:1546277760000 eventTimestamp:1546277760000
source:12> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
Transaction Timestamp:1546278120000 eventTimestamp:1546278120000
source:13> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
source:14> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
source:15> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
Transaction Timestamp:1546278480000 eventTimestamp:1546278480000
Transaction Timestamp:1546278840000 eventTimestamp:1546278840000
Transaction Timestamp:1546279200000 eventTimestamp:1546279200000
source:16> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
Transaction Timestamp:1546279560000 eventTimestamp:1546279560000
source:1> Transaction{accountId=1, timestamp=1546279200000, amount=375.44}
source:2> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
*/