Window Evictor
An evictor can remove elements from a window after the trigger fires, either before or after the window function is applied (or both).
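The firing sequence just described can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink code. EvictionPipelineSketch, Firing, dropBelow and noop are hypothetical names, elements are bare long timestamps, and the window function is whatever Function you pass in. It follows the order we understand Flink's evicting window operator to use: evictBefore, then the window function on the survivors, then evictAfter. Whatever survives stays buffered, which matters when the trigger fires without purging.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified model of one window firing with an evictor.
// Not Flink's API: elements are plain timestamps held in a mutable list.
class EvictionPipelineSketch {

    // Result of one firing: what the window function returned and what stays buffered.
    static final class Firing {
        final int functionResult;
        final List<Long> remaining;

        Firing(int functionResult, List<Long> remaining) {
            this.functionResult = functionResult;
            this.remaining = remaining;
        }
    }

    static Firing fire(List<Long> buffer,
                       Consumer<Iterator<Long>> evictBefore,
                       Function<List<Long>, Integer> windowFunction,
                       Consumer<Iterator<Long>> evictAfter) {
        List<Long> elements = new ArrayList<>(buffer);  // copy, so the input is not mutated
        evictBefore.accept(elements.iterator());        // 1. evict before the function
        int result = windowFunction.apply(elements);    // 2. function sees only the survivors
        evictAfter.accept(elements.iterator());         // 3. evict after the function
        return new Firing(result, elements);            // 4. survivors remain buffered
    }

    // Example evictor body: remove every timestamp strictly below the cutoff.
    static Consumer<Iterator<Long>> dropBelow(long cutoff) {
        return it -> {
            while (it.hasNext()) {
                if (it.next() < cutoff) {
                    it.remove();
                }
            }
        };
    }

    // Evictor body that removes nothing.
    static Consumer<Iterator<Long>> noop() {
        return it -> { };
    }
}
```

For example, firing the buffer [1, 5, 9] with dropBelow(5) as evictBefore and List::size as the function yields 2, while using dropBelow(5) as evictAfter instead yields 3. In both cases [5, 9] remain buffered.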
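Flink ships with three built-in evictors. By default each one evicts before the window function runs, and each has an of(..., boolean doEvictAfter) overload to evict afterwards instead:
CountEvictor: keeps at most a given number of elements. Surplus elements are removed starting from the beginning of the window buffer.
TimeEvictor: keeps only the elements within a given time span, measured back from the largest timestamp in the window. Older elements are removed.
DeltaEvictor: when the window is evaluated, a DeltaFunction computes the delta between each element and the last element. Elements whose delta is greater than or equal to the threshold are evicted, so only those with a delta below the threshold are kept.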
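To make the three rules concrete, here is a plain-Java model of them on a list of (timestamp, value) elements. It is a sketch based on the descriptions above and our reading of Flink's built-in evictors, not Flink code. The class BuiltInEvictorRules and its nested Element type are hypothetical, and the window contents are assumed to be in insertion order.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

class BuiltInEvictorRules {

    // Hypothetical stand-in for a buffered element: event timestamp plus an int payload.
    static final class Element {
        final long timestamp;
        final int value;

        Element(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // CountEvictor-style: keep at most maxCount elements, removing from the front.
    static List<Element> count(List<Element> in, int maxCount) {
        List<Element> out = new ArrayList<>(in);
        int toEvict = out.size() - maxCount;
        Iterator<Element> it = out.iterator();
        while (toEvict > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toEvict--;
        }
        return out;
    }

    // TimeEvictor-style: remove elements at or before (largest timestamp - windowSize).
    static List<Element> time(List<Element> in, long windowSize) {
        List<Element> out = new ArrayList<>(in);
        if (out.isEmpty()) {
            return out;
        }
        long max = Long.MIN_VALUE;
        for (Element e : out) {
            max = Math.max(max, e.timestamp);
        }
        long cutoff = max - windowSize;
        out.removeIf(e -> e.timestamp <= cutoff);
        return out;
    }

    // DeltaEvictor-style: remove elements whose delta to the last element reaches the threshold.
    // deltaFn receives (element, lastElement), i.e. (oldDataPoint, newDataPoint).
    static List<Element> delta(List<Element> in, double threshold,
                               ToDoubleBiFunction<Element, Element> deltaFn) {
        List<Element> out = new ArrayList<>(in);
        if (out.isEmpty()) {
            return out;
        }
        Element last = out.get(out.size() - 1);
        out.removeIf(e -> deltaFn.applyAsDouble(e, last) >= threshold);
        return out;
    }
}
```

With the window [(1000, 1), (2000, 4), (3000, 9), (4000, 10)], count(w, 2) and time(w, 2000) both keep the last two elements. delta(w, 5.0, (o, n) -> n.value - o.value) keeps the values 9 and 10, because their deltas to the last element (1 and 0) are below 5.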
Usage:
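// windowStream is a WindowedStream, e.g. stream.keyBy(...).window(...)
// Keep the 5 most recent elements of each window
windowStream.evictor(CountEvictor.of(5))
// Keep only elements within 5 seconds of the newest element
windowStream.evictor(TimeEvictor.of(Time.of(5, TimeUnit.SECONDS)))
// threshold is a double; evictAfter is a boolean (true = evict after the window function)
windowStream.evictor(DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
    @Override
    public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
        return newDataPoint.f1 - oldDataPoint.f1;
    }
}, evictAfter))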
Full example:
package com.itzhimei.window;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
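/**
 * Uses TimeEvictor to drop data before the window computation.
 * Goal: in each window, evict elements more than 12 minutes older than the newest element, then sum the rest.
 */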
public class Window_11_Evictor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TransactionSource (flink-walkthrough-common) emits sample Transaction records
        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource())
                .name("transactions");
        // Event time, tolerating up to 15 minutes of out-of-orderness
        WatermarkStrategy<Transaction> strategy = WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofMinutes(15))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
        SingleOutputStreamOperator<Transaction> amount = dataStream.assignTimestampsAndWatermarks(strategy)
                .keyBy(Transaction::getAccountId)
                // 1-hour tumbling event-time windows per account
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                // Keep only the last 12 minutes of each window, measured back from its newest element
                .evictor(TimeEvictor.of(Time.of(12, TimeUnit.MINUTES)))
                .sum("amount");
        amount.print("amount");
        env.execute();
    }
}
/*
With the line .evictor(TimeEvictor.of(Time.of(12, TimeUnit.MINUTES))) commented out (no eviction), the output is:
amount:15> Transaction{accountId=3, timestamp=1546272720000, amount=432.9}
amount:11> Transaction{accountId=1, timestamp=1546272000000, amount=567.87}
amount:1> Transaction{accountId=4, timestamp=1546273080000, amount=738.1700000000001}
amount:16> Transaction{accountId=2, timestamp=1546272360000, amount=726.23}
amount:16> Transaction{accountId=5, timestamp=1546273440000, amount=482.28999999999996}
amount:1> Transaction{accountId=4, timestamp=1546276680000, amount=254.04}
amount:15> Transaction{accountId=3, timestamp=1546276320000, amount=0.989}
amount:16> Transaction{accountId=2, timestamp=1546275960000, amount=810.06}
amount:16> Transaction{accountId=5, timestamp=1546277040000, amount=762.27}
amount:11> Transaction{accountId=1, timestamp=1546275600000, amount=686.87}
amount:16> Transaction{accountId=2, timestamp=1546279560000, amount=458.4}
amount:15> Transaction{accountId=3, timestamp=1546279920000, amount=871.9499999999999}
amount:11> Transaction{accountId=1, timestamp=1546279200000, amount=859.35}
amount:1> Transaction{accountId=4, timestamp=1546280280000, amount=415.08}
amount:16> Transaction{accountId=5, timestamp=1546280640000, amount=206.98000000000002}
amount:16> Transaction{accountId=2, timestamp=1546283160000, amount=730.02}
With eviction enabled, the output is:
amount:15> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
amount:16> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
amount:11> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
amount:1> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
amount:16> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
amount:1> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
amount:11> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
amount:16> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
amount:15> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
amount:16> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
amount:1> Transaction{accountId=4, timestamp=1546282080000, amount=64.19}
amount:11> Transaction{accountId=1, timestamp=1546281000000, amount=483.91}
amount:15> Transaction{accountId=3, timestamp=1546281720000, amount=871.15}
amount:16> Transaction{accountId=2, timestamp=1546281360000, amount=228.22}
amount:16> Transaction{accountId=5, timestamp=1546282440000, amount=79.43}
amount:16> Transaction{accountId=2, timestamp=1546284960000, amount=473.54}
amount:1> Transaction{accountId=4, timestamp=1546285680000, amount=323.59}
amount:15> Transaction{accountId=3, timestamp=1546285320000, amount=119.92}
amount:11> Transaction{accountId=1, timestamp=1546284600000, amount=274.73}
amount:16> Transaction{accountId=5, timestamp=1546286040000, amount=353.16}
amount:16> Transaction{accountId=2, timestamp=1546288560000, amount=479.83}
amount:16> Transaction{accountId=5, timestamp=1546289640000, amount=292.44}
amount:11> Transaction{accountId=1, timestamp=1546288200000, amount=373.26}
amount:1> Transaction{accountId=4, timestamp=1546289280000, amount=83.64}
amount:15> Transaction{accountId=3, timestamp=1546288920000, amount=454.25}
*/
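One way to read the two outputs: with an evictor, the window keeps every element in state and applies sum("amount") only to the elements that survive eviction. The printed non-aggregated fields (accountId, timestamp) appear to come from the first remaining element. The sketch below models this for a single one-hour window. The class SumAfterEvictionSketch, its Tx type, and the sample timestamps and amounts are hypothetical, chosen for illustration and not taken from TransactionSource.

```java
import java.util.ArrayList;
import java.util.List;

class SumAfterEvictionSketch {

    // Hypothetical stand-in for the walkthrough's Transaction POJO.
    static final class Tx {
        final long accountId;
        final long timestamp;
        final double amount;

        Tx(long accountId, long timestamp, double amount) {
            this.accountId = accountId;
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    // TimeEvictor-style rule: drop elements at or before (max timestamp - keepMillis).
    static List<Tx> evictOlderThan(List<Tx> window, long keepMillis) {
        long max = Long.MIN_VALUE;
        for (Tx t : window) {
            max = Math.max(max, t.timestamp);
        }
        long cutoff = max - keepMillis;
        List<Tx> kept = new ArrayList<>();
        for (Tx t : window) {
            if (t.timestamp > cutoff) {
                kept.add(t);
            }
        }
        return kept;
    }

    // sum("amount")-style fold over a non-empty window:
    // amounts are added up, the other fields come from the first element.
    static Tx sumAmount(List<Tx> elements) {
        Tx first = elements.get(0);
        double total = 0;
        for (Tx t : elements) {
            total += t.amount;
        }
        return new Tx(first.accountId, first.timestamp, total);
    }
}
```

With two hypothetical elements at minute 0 (amount 100.0) and minute 30 (amount 50.0), summing everything reports timestamp 0 and amount 150.0. A 12-minute eviction first leaves only the minute-30 element, so the result carries its timestamp and amount 50.0. This is consistent with the outputs above, where for the same account the evicted run reports timestamps 30 minutes later than the run without eviction.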
You can also define a custom evictor, for example:
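// Requires: org.apache.flink.streaming.api.windowing.evictors.Evictor,
// org.apache.flink.streaming.api.windowing.windows.TimeWindow,
// org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
.evictor(new Evictor<Transaction, TimeWindow>() {
    @Override
    public void evictBefore(Iterable<TimestampedValue<Transaction>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
        // Called before the window function; evict by calling remove() on elements.iterator()
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Transaction>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
        // Called after the window function
    }
})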
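A custom evictor is simply an implementation of the Evictor interface, which requires two methods: evictBefore and evictAfter. Elements are evicted by removing them through the iterator of the elements argument. The interface is defined in Flink as follows: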
public interface Evictor<T, W extends Window> extends Serializable {
    /**
     * Optionally evicts elements. Called before windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictBefore(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);
    /**
     * Optionally evicts elements. Called after windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictAfter(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);
    /** A context object that is given to {@link Evictor} methods. */
    interface EvictorContext {
        /** Returns the current processing time. */
        long getCurrentProcessingTime();
        /**
         * Returns the metric group for this {@link Evictor}. This is the same metric group that
         * would be returned from {@link RuntimeContext#getMetricGroup()} in a user function.
         *
         * <p>You must not call methods that create metric objects (such as {@link
         * MetricGroup#counter(int)} multiple times but instead call once and store the metric
         * object in a field.
         */
        MetricGroup getMetricGroup();
        /** Returns the current watermark time. */
        long getCurrentWatermark();
    }
}
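As the interface shows, an evictor receives an Iterable and evicts by removing entries through its iterator. The following self-contained sketch mirrors that shape with local stand-in types so it runs without Flink. Stamped, MiniEvictor and MinAmountEvictor are hypothetical names, not Flink classes, and the window and context parameters are omitted. In a real job you would implement Evictor<Transaction, TimeWindow> and iterate TimestampedValue<Transaction> instead.

```java
import java.util.Iterator;

// Hypothetical local stand-in for Flink's TimestampedValue<T>.
final class Stamped<T> {
    final T value;
    final long timestamp;

    Stamped(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical local mirror of the two Evictor callbacks (window and context omitted).
interface MiniEvictor<T> {
    void evictBefore(Iterable<Stamped<T>> elements, int size);

    void evictAfter(Iterable<Stamped<T>> elements, int size);
}

// Custom evictor: before the window function runs, drop amounts below a minimum.
class MinAmountEvictor implements MiniEvictor<Double> {
    private final double minAmount;

    MinAmountEvictor(double minAmount) {
        this.minAmount = minAmount;
    }

    @Override
    public void evictBefore(Iterable<Stamped<Double>> elements, int size) {
        for (Iterator<Stamped<Double>> it = elements.iterator(); it.hasNext(); ) {
            if (it.next().value < minAmount) {
                it.remove(); // eviction = removal through the iterator
            }
        }
    }

    @Override
    public void evictAfter(Iterable<Stamped<Double>> elements, int size) {
        // Nothing to evict after the window function in this example.
    }
}
```

Applied to buffered amounts 5.0, 500.0 and 0.77 with a minimum of 10.0, evictBefore leaves only 500.0 for the window function.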