官方网站使用了一个欺诈检测的项目来演示Flink的功能特性,但是这个项目对于之前完全没有接触过大数据计算框架的同学来说,多少有点难度,如果你也是之前没有接触过大数据,可以先看看本套Flink学习课程的前面几节课程,分别用了三种方式演示了Flink的WordCount的计算实现方式,WordCount一直是大数据入门的HelloWorld。
官方Demo介绍
项目目标是实现一个信用卡欺诈检测功能。
检测规则是:对于一个账户,如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。
例如:
tx1 tx2 tx3 tx4 tx5 tx5 tx6
$11 $2 $0.1 $501 $0.1 $20 $500
---------- ----------------
Fraud Not Fraud
间隔20s 间隔5min
官方Demo,将这个项目分成了三个阶段,第一个阶段是基于“演练”包,生成一个基本的反欺诈项目,这个项目对于输入的每一笔数据都进行报警。
因为第一个项目还不是预期的需求目标,所以第二个阶段,按照需求进行了规则判断,规则是如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。
第三个版本对规则做了更合理的优化,因为试想,如果我一笔小于$1的交易后,然后没有任何交易了,间隔10天后,发生了一笔$500的交易,那这两笔相连的交易,显然不能判定为问题交易,因为中间间隔的时间比较久,第三个版本相当于是对规则的再次升级,使规则更加合理。
这个Demo,主要演示了基于状态+时间来进行计算。
版本3: 一笔小于$1的交易后,间隔1分钟以上,发生了一笔$500的交易,则不报警,只有两笔交易间隔在1分钟以内才进行报警
版本3相比之前的版本2,除了使用了状态计算,还需要有一个定时器,来判断第二笔交易是否超过时限。
代码:
package spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
* Skeleton code for the datastream walkthrough
*/
public class FraudDetectionJob3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector3())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
package spendreport;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import java.util.Objects;
/**
* Skeleton code for implementing a fraud detector.
*/
public class FraudDetector3 extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
//前一次交易小于$1的状态标识
private transient ValueState<Boolean> flagState;
//前一次交易小于$1时,注册定时器
private transient ValueState<Long> timerState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timeDescriptor = new ValueStateDescriptor<>("time-state", Types.LONG);
timerState = getRuntimeContext().getState(timeDescriptor);
}
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
//获取上一次计算的状态
Boolean value = flagState.value();
//上一次计算的状态非空,说明上一次状态<$1
if(Objects.nonNull(value)) {
//如果当前金额>$500,则报警
if(transaction.getAmount()>LARGE_AMOUNT) {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
//flagState.clear();
cleanUp(context);
}
//判断当前状态是否<$1,小于则标记并更新状态
if(transaction.getAmount()<SMALL_AMOUNT) {
flagState.update(true);
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
timerState.clear();
flagState.clear();
}
private void cleanUp(Context ctx) throws Exception {
// delete timer
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
// clean up all state
timerState.clear();
flagState.clear();
}
}
输出结果:
16:58:04,212 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - Log file environment variable 'log.file' is not set.
16:58:04,214 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
16:58:08,176 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
16:58:13,670 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
16:58:19,166 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
16:58:24,668 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
16:58:30,171 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
代码分析:
if(transaction.getAmount()<SMALL_AMOUNT)判断当前状态<$1,设置一个当前时间一分钟后触发的定时器,同时,将触发时间保存到 timerState 状态中。处理时间是本地时钟时间,这是由运行任务的服务器的系统时间来决定的。
当定时器触发时,将会调用 KeyedProcessFunction#onTimer 方法。 通过重写这个方法来实现一个你自己的重置状态的回调逻辑。