官方网站使用了一个欺诈检测的项目来演示Flink的功能特性,但是这个项目对于之前完全没有接触过大数据计算框架的同学来说,多少有点难度,如果你也是之前没有接触过大数据,可以先看看本套Flink学习课程的前面几节课程,分别用了三种方式演示了Flink的WordCount的计算实现方式,WordCount一直是大数据入门的HelloWorld。
官方Demo介绍
项目目标是实现一个信用卡欺诈检测功能。
检测规则是:对于一个账户,如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。
例如:
tx1 tx2 tx3 tx4 tx5 tx5 tx6
$11 $2 $0.1 $501 $0.1 $20 $500
---------- ----------------
Fraud Not Fraud
官方Demo,将这个项目分成了三个阶段,第一个阶段是基于“演练”包,生成一个基本的反欺诈项目,这个项目对于输入的每一笔数据都进行报警。
因为第一个项目还不是预期的需求目标,所以第二个阶段,按照需求进行了规则判断,规则是如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。
第三个版本对规则做了更合理的优化,因为试想,如果我一笔小于$1的交易后,然后没有任何交易了,间隔10天后,发生了一笔$500的交易,那这两笔相连的交易,显然不能判定为问题交易,因为中间间隔的时间比较久,第三个版本相当于是对规则的再次升级,使规则更加合理。
这个Demo,主要演示了基于状态+时间来进行计算。
版本2: 检测小于 $1 美元的交易后紧跟着一个大于 $500 的交易
在版本2中,当前的计算,依赖于前一次计算的数据,这用到了Flink的状态计算,本地计算用到了之前的一些结果,Flink会将这些结果保存在其本地,这就是Flink中一次计算用到的状态。
在这个项目中,状态是需要按照账户区分的,因为如果不区分,那么会产生A账户出现小于 $1 美元的交易后,紧跟着B账户一个大于 $500 的交易产生,此时不区分账户,计算引擎就判断为报警,这显然是不对的结果。
当然,我们可以使用如 Map 这样的数据结构来保存每一个账户的状态,但是常规的类成员变量是无法做到容错处理的,当任务失败重启后,之前的状态信息将会丢失。 这样的话,如果程序曾出现过失败重启的情况,将会漏掉一些欺诈报警。
为了应对这个问题,Flink 提供了一套支持容错状态的原语,这些原语几乎与常规成员变量一样易于使用。
Flink 中最基础的状态类型是 ValueState,这是一种能够为被其封装的变量添加容错能力的类型。 ValueState 是一种 keyed state,也就是说它只能被用于 keyed context 提供的 operator 中,即所有能够紧随 DataStream#keyBy 之后被调用的operator。 一个 operator 中的 keyed state 的作用域默认是属于它所属的 key 的。 这个例子中,key 就是当前正在处理的交易行为所属的信用卡账户(key 传入 keyBy() 函数调用),而 FraudDetector 维护了每个帐户的标记状态。 ValueState 需要使用 ValueStateDescriptor 来创建,ValueStateDescriptor 包含了 Flink 如何管理变量的一些元数据信息。状态在使用之前需要先被注册。 状态需要使用 open() 函数来注册状态。
代码:
package spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
* Skeleton code for the datastream walkthrough
*/
public class FraudDetectionJob2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector2())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
package spendreport;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import java.util.Objects;
/**
* Skeleton code for implementing a fraud detector.
*/
public class FraudDetector2 extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
//前一次交易小于$1的状态标识
private transient ValueState<Boolean> flagState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
}
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
//获取上一次计算的状态
Boolean value = flagState.value();
//上一次计算的状态非空,说明上一次状态<$1
if(Objects.nonNull(value)) {
//如果当前金额>$500,则报警
if(transaction.getAmount()>LARGE_AMOUNT) {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
flagState.clear();
}
//判断当前状态是否<$1,小于则标记并更新状态
if(transaction.getAmount()<SMALL_AMOUNT) {
flagState.update(true);
}
}
}
输出结果:
16:14:53,457 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - Log file environment variable 'log.file' is not set.
16:14:53,459 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
16:14:57,436 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
16:15:02,935 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
16:15:08,449 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
16:15:13,945 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
16:15:19,445 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
代码分析:
主要的核心逻辑在于FraudDetector2类的processElement()方法,processElement()方法实现了规则检查。
对于每笔交易,欺诈检测器都会检查该帐户的标记状态。 请记住,ValueState 的作用域始终限于当前的 key,即信用卡帐户。 如果标记状态不为空,则该帐户的上一笔交易是小额的,因此,如果当前这笔交易的金额很大,那么检测程序将输出报警信息。