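WatermarkStrategy 可以在 Flink 应用程序中的两个位置使用：第一种是直接在数据源上使用（例如 `env.fromSource(source, watermarkStrategy, name)`），第二种是在非数据源的操作之后通过 `assignTimestampsAndWatermarks(...)` 使用。我们平时开发基本都使用第一种；Flink 官方也更推荐第一种，因为数据源可以利用分片/分区/切片（shard/partition/split）的信息来生成更准确的 Watermark。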
以下示例演示第二种用法，即在非数据源的操作（此处为 filter）之后调用 assignTimestampsAndWatermarks：
// Illustrative snippet (not compilable as-is): "<watermark strategy>" and "..." are placeholders.
// Obtain the execution environment for the job.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Source: continuously monitor myFilePath with myFormat, re-scanning the path periodically
// (interval argument 100), using the default path filter and the given type information.
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
// Timestamps/watermarks are assigned AFTER a non-source operation (the filter),
// i.e. the second way of using a WatermarkStrategy.
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(<watermark strategy>);
// Downstream event-time processing driven by those watermarks:
// key by group, 10-second tumbling event-time windows, pairwise reduce, then a sink.
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
下面使用 Flink 内置的 Watermark 策略（`WatermarkStrategy.forBoundedOutOfOrderness`，允许 15 分钟乱序）进行窗口计算，代码如下：
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import java.time.Duration;
/**
 * Watermark demo: totals each account's transaction amounts over 1-hour tumbling
 * event-time windows.
 *
 * <p>Event time is read from {@link Transaction#getTimestamp()}. Watermarks come from
 * {@link WatermarkStrategy#forBoundedOutOfOrderness}, so events may arrive up to
 * 15 minutes out of order.
 */
public class Watermark {

    /** How far behind the largest seen timestamp the watermark trails. */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofMinutes(15);

    /** Length of each tumbling event-time window. */
    private static final Time WINDOW_SIZE = Time.hours(1);

    /**
     * Builds and runs the job. It prints every raw transaction ("source") and every
     * per-account windowed total ("amount").
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final DataStream<Transaction> transactions =
                env.addSource(new TransactionSource()).name("transactions");
        // Echo the raw events so they can be compared with the windowed totals.
        transactions.print("source");

        final SingleOutputStreamOperator<Transaction> windowTotals =
                transactions
                        .assignTimestampsAndWatermarks(transactionWatermarks())
                        .keyBy(Transaction::getAccountId)
                        .window(TumblingEventTimeWindows.of(WINDOW_SIZE))
                        // Only the "amount" field is summed. The other fields of each
                        // result are carried over from an element of the window.
                        .sum("amount");
        windowTotals.print("amount");

        env.execute();
    }

    /**
     * Creates the bounded-out-of-orderness strategy that assigns each transaction's own
     * timestamp as its event time.
     *
     * @return the watermark strategy for {@link Transaction} events
     */
    private static WatermarkStrategy<Transaction> transactionWatermarks() {
        // Assigning to a typed local lets Java infer T = Transaction without an explicit witness.
        final WatermarkStrategy<Transaction> bounded =
                WatermarkStrategy.forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS);
        return bounded.withTimestampAssigner((txn, previousTimestamp) -> txn.getTimestamp());
    }
}
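/* 输出（source 为原始交易数据，amount 为按账户聚合的窗口结果）
注意：下面每条 amount 结果都累加了 2 小时内的 4 笔交易，例如 accountId=1：
188.23 + 379.64 + 267.25 + 419.62 = 1254.74（时间戳 1546272000000 ~ 1546277400000）。
这与代码中的 1 小时窗口（Time.hours(1)）不一致，看起来是使用 2 小时窗口运行得到的输出。
若按 1 小时窗口 [1546272000000, 1546275600000) 计算，accountId=1 的第一个结果应为 188.23 + 379.64 = 567.87。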
source:14> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
source:15> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
source:16> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
source:1> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
source:2> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
source:3> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
source:4> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
source:5> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
source:6> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
source:7> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
source:8> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
source:9> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
source:10> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
source:11> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
source:12> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
source:13> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
source:14> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
source:15> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
source:16> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
source:1> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
source:2> Transaction{accountId=1, timestamp=1546279200000, amount=375.44}
source:3> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
source:4> Transaction{accountId=3, timestamp=1546279920000, amount=0.8}
source:5> Transaction{accountId=4, timestamp=1546280280000, amount=350.89}
amount:11> Transaction{accountId=1, timestamp=1546272000000, amount=1254.74}
amount:1> Transaction{accountId=4, timestamp=1546273080000, amount=992.2100000000002}
amount:15> Transaction{accountId=3, timestamp=1546272720000, amount=433.88899999999995}
amount:16> Transaction{accountId=2, timestamp=1546272360000, amount=1536.2900000000002}
amount:16> Transaction{accountId=5, timestamp=1546273440000, amount=1244.56}
source:6> Transaction{accountId=5, timestamp=1546280640000, amount=127.55}
source:7> Transaction{accountId=1, timestamp=1546281000000, amount=483.91}
source:8> Transaction{accountId=2, timestamp=1546281360000, amount=228.22}
source:9> Transaction{accountId=3, timestamp=1546281720000, amount=871.15}
source:10> Transaction{accountId=4, timestamp=1546282080000, amount=64.19}
source:11> Transaction{accountId=5, timestamp=1546282440000, amount=79.43}
source:12> Transaction{accountId=1, timestamp=1546282800000, amount=56.12}
source:13> Transaction{accountId=2, timestamp=1546283160000, amount=256.48}
source:14> Transaction{accountId=3, timestamp=1546283520000, amount=148.16}
source:15> Transaction{accountId=4, timestamp=1546283880000, amount=199.95}
*/