Window Trigger
触发器(Trigger)决定了何时启动Window Function来处理窗口中的数据以及何时将窗口内的数据清理。
Flink自带的Trigger在大多数场景下已经能够满足需求;如果不能满足,则可以自定义触发器来控制窗口何时触发计算。
Flink 内置 Window Trigger
ProcessingTimeTrigger 触发频率:一次触发,基于ProcessingTime 触发,当机器时间大于窗口结束时间时触发
EventTimeTrigger 触发频率:一次触发,基于EventTime,当Watermark 大于窗口结束时间触发
ContinuousProcessingTimeTrigger 触发频率:多次触发,基于 ProcessingTime 的固定时间间隔触发
ContinuousEventTimeTrigger 触发频率:多次触发,基于 EventTime 的固定时间间隔触发
CountTrigger 触发频率:多次触发,基于 Element 的固定条数触发
DeltaTrigger 触发频率:多次触发,基于本次 Element和上次触发 Trigger 的 Element 做Delta 计算,超过指定 Threshold 后触发
PurgingTrigger 对 Trigger 的封装实现,用于 Trigger 触发后额外清理中间状态数据
自定义Trigger
自定义的Trigger需要继承Trigger抽象类,按需实现以下方法:
.trigger(new Trigger<Transaction, TimeWindow>() {
    /**
     * Callback for an element that belongs to {@code window}.
     *
     * <p>Skeleton default: take no action. Return {@link TriggerResult#FIRE},
     * {@link TriggerResult#PURGE} or {@link TriggerResult#FIRE_AND_PURGE} to act on the window.
     */
    @Override
    public TriggerResult onElement(Transaction element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Never return null from a trigger callback; CONTINUE is the explicit "do nothing" result.
        return TriggerResult.CONTINUE;
    }

    /**
     * Callback for a processing-time timer.
     *
     * <p>Skeleton default: take no action.
     */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Callback for an event-time timer.
     *
     * <p>Skeleton default: take no action.
     */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Cleanup hook for {@code window}. Delete any timers or state this trigger registered;
     * the skeleton registers nothing, so there is nothing to release.
     */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    }
})
TriggerResult中包含四个枚举值:
CONTINUE:表示对窗口不执行任何操作。
FIRE:表示对窗口中的数据按照窗口函数中的逻辑进行计算,并将结果输出。注意计算完成后,窗口中的数据并不会被清除,将会被保留。
PURGE:表示清除窗口中的数据,但不触发计算,也不输出结果。
FIRE_AND_PURGE:表示先对窗口中的数据进行计算并输出结果,然后将窗口中的数据清除。
代码:
package com.itzhimei.window;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import java.time.Duration;
/**
 * Demonstrates a custom window trigger.
 *
 * <p>The trigger fires the window computation for every incoming element. Both the raw source
 * stream and the windowed result are printed, so the two outputs can be compared line by line
 * to check the result.
 */
public class Window_10_Trigger {

    /**
     * Builds and runs the job: transactions are keyed by account id, grouped into 15-second
     * tumbling event-time windows and summed on the {@code amount} field, with the custom
     * trigger deciding when each window emits.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final DataStream<Transaction> transactions =
                env.addSource(new TransactionSource()).name("transactions");
        transactions.print("source");

        // Event time is taken from the transaction itself; up to 20 seconds of
        // out-of-orderness is tolerated when generating watermarks.
        final WatermarkStrategy<Transaction> watermarks =
                WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        final SingleOutputStreamOperator<Transaction> summedPerAccount =
                transactions
                        .assignTimestampsAndWatermarks(watermarks)
                        .keyBy(Transaction::getAccountId)
                        .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                        .trigger(new FireOnEveryElementTrigger())
                        .sum("amount");
        summedPerAccount.print("amount");

        env.execute();
    }

    /**
     * Trigger that evaluates the window each time an element arrives and ignores both
     * processing-time and event-time timers.
     */
    private static final class FireOnEveryElementTrigger extends Trigger<Transaction, TimeWindow> {

        /** Fires the window for every element. */
        @Override
        public TriggerResult onElement(Transaction element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE;
        }

        /** Processing-time timers never cause any action. */
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        /** Event-time timers never cause any action. */
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        /** Nothing to clean up: this trigger registers no timers and keeps no state. */
        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        }
    }
}
/* 每一条数据都触发窗口执行
source:2> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
amount:11> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
source:3> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
amount:16> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
source:4> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
amount:15> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
source:5> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
amount:1> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
source:6> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
amount:16> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
source:7> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
amount:11> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
source:8> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
amount:16> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
source:9> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
amount:15> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
*/
看EventTimeTrigger的源码:
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * A {@link Trigger} for {@link TimeWindow}s that fires when the watermark reaches the maximum
 * timestamp of the window.
 *
 * <p>Instances are obtained through {@link #create()}; the constructor is private.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 */
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>NOTE(review): the upstream Javadoc states that all elements are discarded once the
     * trigger fires. This class only ever returns {@code FIRE} or {@code CONTINUE}, so any
     * discarding is not done here; confirm against the window operator.
     *
     * @return a new trigger instance
     */
    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }

    /**
     * Fires immediately if the watermark has already reached the window's maximum timestamp;
     * otherwise registers an event-time timer for that timestamp and continues.
     */
    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        final long windowEnd = window.maxTimestamp();
        if (windowEnd <= ctx.getCurrentWatermark()) {
            // The watermark is already past the window, so there is no point waiting for a timer.
            return TriggerResult.FIRE;
        }
        ctx.registerEventTimeTimer(windowEnd);
        return TriggerResult.CONTINUE;
    }

    /** Fires only when the timer time equals the window's maximum timestamp. */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    /** Processing-time timers are ignored by this trigger. */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    /** Deletes the event-time timer set for the window's maximum timestamp. */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    /** This trigger supports merging. */
    @Override
    public boolean canMerge() {
        return true;
    }

    /**
     * Registers a timer for the merged window, but only while the watermark is still before
     * the window's maximum timestamp.
     */
    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        final long mergedEnd = window.maxTimestamp();
        if (mergedEnd <= ctx.getCurrentWatermark()) {
            // Mirrors onElement(): once the watermark is past the window, onElement() fires
            // directly, and a timer registered here would fire the window a second time.
            return;
        }
        ctx.registerEventTimeTimer(mergedEnd);
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }
}