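The official site uses a fraud detection project to demonstrate Flink's features, but that project can be hard going for readers who have never worked with a big-data compute framework. If that describes you, start with the earlier lessons in this Flink course, which implement WordCount in three different ways. WordCount has long been the "Hello World" of big data.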
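About the official demo
The goal of the project is to implement credit card fraud detection.
The detection rule: for a given account, if a transaction of less than $1 is immediately followed by a transaction of more than $500, emit an alert.
For example: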
tx1    tx2    tx3    tx4    tx5    tx6    tx7
$11    $2     $0.1   $501   $0.1   $20    $500
              -----------   ------------------
                 Fraud           Not Fraud
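The rule can be sketched in plain Java, outside Flink, for the amounts of a single account in arrival order. This is only an illustration: the class and method names (`FraudRuleSketch`, `findFraudIndexes`) are hypothetical and do not come from the official walkthrough, and the two thresholds are taken from the rule stated above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the detection rule; not code from the Flink walkthrough.
final class FraudRuleSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    // Returns the 0-based positions of large transactions that directly follow a small one.
    static List<Integer> findFraudIndexes(double[] amounts) {
        List<Integer> hits = new ArrayList<>();
        boolean previousWasSmall = false;
        for (int i = 0; i < amounts.length; i++) {
            if (previousWasSmall && amounts[i] > LARGE_AMOUNT) {
                hits.add(i);
            }
            // Remember only whether the transaction just seen was small.
            previousWasSmall = amounts[i] < SMALL_AMOUNT;
        }
        return hits;
    }

    public static void main(String[] args) {
        // The amounts from the example above.
        double[] amounts = {11, 2, 0.1, 501, 0.1, 20, 500};
        System.out.println(findFraudIndexes(amounts)); // prints [3]
    }
}
```

Only the $501 transaction (position 3) is flagged. The second $0.1 transaction is followed by $20, which resets the flag, and $500 is not strictly greater than the threshold anyway.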
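The official demo splits the project into three stages. The first stage uses the walkthrough archetype to generate a basic fraud detection project that raises an alert for every incoming transaction.
Because that first version does not yet meet the requirement, the second stage applies the rule itself: if a transaction of less than $1 is immediately followed by one of more than $500, emit an alert.
The third version refines the rule. Suppose an account makes a transaction of less than $1, then nothing at all for 10 days, and then a transaction of over $500. The two transactions are adjacent, but with such a long gap between them they clearly should not be flagged. The third version therefore tightens the rule again by taking elapsed time into account.
Overall, this demo shows how to compute with state plus time.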
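The "state plus time" idea in the third version can be sketched in plain Java as well. The sketch keeps one flag (the state) and the timestamp of the small transaction (the time), and only flags a large transaction that arrives within a window. The class name `TimedFraudRuleSketch` is hypothetical, and the one-minute window is an assumption borrowed from the `ONE_MINUTE` constant in the generated skeleton. It compares timestamps directly and makes no claim about how the official version expires its state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the time-bounded rule; not code from the Flink walkthrough.
final class TimedFraudRuleSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60 * 1000; // assumed window, in milliseconds

    // timestamps[i] is the time of amounts[i] in milliseconds, for a single account.
    static List<Integer> findFraudIndexes(long[] timestamps, double[] amounts) {
        List<Integer> hits = new ArrayList<>();
        boolean smallSeen = false; // the "state"
        long smallSeenAt = 0L;     // the "time"
        for (int i = 0; i < amounts.length; i++) {
            boolean withinWindow = smallSeen && timestamps[i] - smallSeenAt <= ONE_MINUTE;
            if (withinWindow && amounts[i] > LARGE_AMOUNT) {
                hits.add(i);
            }
            smallSeen = amounts[i] < SMALL_AMOUNT;
            smallSeenAt = timestamps[i];
        }
        return hits;
    }

    public static void main(String[] args) {
        long tenDays = 10L * 24 * 60 * 60 * 1000;
        // Large transaction 30 seconds after the small one: flagged.
        System.out.println(findFraudIndexes(new long[]{0L, 30_000L}, new double[]{0.5, 600.0})); // prints [1]
        // Large transaction 10 days after the small one: not flagged.
        System.out.println(findFraudIndexes(new long[]{0L, tenDays}, new double[]{0.5, 600.0})); // prints []
    }
}
```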
Version 1: generate a base project
1. Use Maven with the provided Flink Maven Archetype to quickly generate a project skeleton
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-java -DarchetypeVersion=1.14.3 -DgroupId=frauddetection -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport -DinteractiveMode=false
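2. Import the generated project into your IDE
3. Build and run
Because this is the first-stage version, the project raises an alert for every incoming transaction.
Output: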
15:13:29,622 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - Log file environment variable 'log.file' is not set.
15:13:29,624 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
15:13:30,598 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=1}
15:13:30,706 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=2}
15:13:30,816 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
15:13:30,923 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=4}
15:13:31,034 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=5}
15:13:31,143 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=1}
15:13:31,253 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=2}
15:13:31,364 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
15:13:31,472 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=4}
15:13:31,582 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=5}
15:13:31,692 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=1}
15:13:31,801 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=2}
15:13:31,911 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
15:13:32,020 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=4}
15:13:32,131 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=5}
15:13:32,241 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=1}
Code:
// FraudDetectionJob.java
package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Skeleton code for the datastream walkthrough
 */
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}

// FraudDetector.java
package spendreport;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

/**
 * Skeleton code for implementing a fraud detector.
 */
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());
        collector.collect(alert);
    }
}
Code walkthrough:
1. Create a stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2. Create the data source
DataStream<Transaction> transactions = env
    .addSource(new TransactionSource())
    .name("transactions");
3. Detect fraud
DataStream<Alert> alerts = transactions
    .keyBy(Transaction::getAccountId)
    .process(new FraudDetector())
    .name("fraud-detector");
4. Output the results
alerts.addSink(new AlertSink())
    .name("send-alerts");
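To see why step 3 keys the stream by account ID before the detector runs, here is a plain-Java sketch of per-account bookkeeping for the detection rule. It does not use Flink at all: a `HashMap` stands in for the per-key state that a keyed operator would maintain, and the class name `KeyedStateSketch` and its `process` method are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of per-account state; not code from the Flink walkthrough.
final class KeyedStateSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    // One "previous transaction was small" flag per account ID.
    private final Map<Long, Boolean> lastWasSmall = new HashMap<>();

    // Returns true when this transaction should raise an alert for its account.
    boolean process(long accountId, double amount) {
        boolean alert = lastWasSmall.getOrDefault(accountId, false) && amount > LARGE_AMOUNT;
        lastWasSmall.put(accountId, amount < SMALL_AMOUNT);
        return alert;
    }

    public static void main(String[] args) {
        KeyedStateSketch detector = new KeyedStateSketch();
        long[] accounts = {1L, 2L, 1L};
        double[] amounts = {0.5, 900.0, 700.0};
        for (int i = 0; i < accounts.length; i++) {
            System.out.println(accounts[i] + " -> " + detector.process(accounts[i], amounts[i]));
        }
        // prints:
        // 1 -> false
        // 2 -> false
        // 1 -> true
    }
}
```

The large transaction on account 2 arrives right after the small one on account 1, yet it raises no alert because each account has its own flag. Without the grouping by account, transactions from different accounts would be mixed together and the rule would misfire.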