Flink从入门到实战一[快速上手]-8-Flink官方反欺诈项目版本3

官方网站使用了一个欺诈检测的项目来演示Flink的功能特性,但是这个项目对于之前完全没有接触过大数据计算框架的同学来说,多少有点难度,如果你也是之前没有接触过大数据,可以先看看本套Flink学习课程的前面几节课程,分别用了三种方式演示了Flink的WordCount的计算实现方式,WordCount一直是大数据入门的HelloWorld。

官方Demo介绍
项目目标是实现一个信用卡欺诈检测功能。
检测规则是:对于一个账户,如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。
例如:

tx1   tx2   tx3   tx4   tx5   tx5   tx6
$11   $2    $0.1  $501  $0.1  $20   $500
            ----------  ----------------
	    Fraud       Not Fraud
	    间隔20s     间隔5min

官方Demo,将这个项目分成了三个阶段,第一个阶段是基于“演练”包,生成一个基本的反欺诈项目,这个项目对于输入的每一笔数据都进行报警。
因为第一个项目还不是预期的需求目标,所以第二个阶段,按照需求进行了规则判断,规则是如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。
第三个版本对规则做了更合理的优化,因为试想,如果我一笔小于$1的交易后,然后没有任何交易了,间隔10天后,发生了一笔$500的交易,那这两笔相连的交易,显然不能判定为问题交易,因为中间间隔的时间比较久,第三个版本相当于是对规则的再次升级,使规则更加合理。
这个Demo,主要演示了基于状态+时间来进行计算。

版本3: 一笔小于$1的交易后,间隔1分钟以上,发生了一笔$500的交易,则不报警,只有两笔交易间隔在1分钟以内才进行报警
版本3相比之前的版本2,除了使用了状态计算,还需要有一个定时器,来判断第二笔交易是否超过时限。

代码:

package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Skeleton code for the datastream walkthrough
 */
public class FraudDetectionJob3 {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Transaction> transactions = env
			.addSource(new TransactionSource())
			.name("transactions");

		DataStream<Alert> alerts = transactions
			.keyBy(Transaction::getAccountId)
			.process(new FraudDetector3())
			.name("fraud-detector");

		alerts
			.addSink(new AlertSink())
			.name("send-alerts");

		env.execute("Fraud Detection");
	}
}
package spendreport;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

import java.util.Objects;

/**
 * Skeleton code for implementing a fraud detector.
 */
public class FraudDetector3 extends KeyedProcessFunction<Long, Transaction, Alert> {

	private static final long serialVersionUID = 1L;

	private static final double SMALL_AMOUNT = 1.00;
	private static final double LARGE_AMOUNT = 500.00;
	private static final long ONE_MINUTE = 60 * 1000;

	//前一次交易小于$1的状态标识
	private transient ValueState<Boolean> flagState;

	//前一次交易小于$1时,注册定时器
	private transient ValueState<Long> timerState;

	@Override
	public void open(Configuration parameters) throws Exception {
		ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
		flagState = getRuntimeContext().getState(flagDescriptor);

		ValueStateDescriptor<Long> timeDescriptor = new ValueStateDescriptor<>("time-state", Types.LONG);
		timerState = getRuntimeContext().getState(timeDescriptor);
	}

	@Override
	public void processElement(
			Transaction transaction,
			Context context,
			Collector<Alert> collector) throws Exception {

		//获取上一次计算的状态
		Boolean value = flagState.value();
		//上一次计算的状态非空,说明上一次状态<$1
		if(Objects.nonNull(value)) {
			//如果当前金额>$500,则报警
			if(transaction.getAmount()>LARGE_AMOUNT) {
				Alert alert = new Alert();
				alert.setId(transaction.getAccountId());
				collector.collect(alert);
			}
			//flagState.clear();
			cleanUp(context);
		}

		//判断当前状态是否<$1,小于则标记并更新状态
		if(transaction.getAmount()<SMALL_AMOUNT) {
			flagState.update(true);

			long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
			context.timerService().registerProcessingTimeTimer(timer);

			timerState.update(timer);
		}

	}

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
		timerState.clear();
		flagState.clear();
	}

	private void cleanUp(Context ctx) throws Exception {
		// delete timer
		Long timer = timerState.value();
		ctx.timerService().deleteProcessingTimeTimer(timer);

		// clean up all state
		timerState.clear();
		flagState.clear();
	}
}

输出结果:

16:58:04,212 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils          [] - Log file environment variable 'log.file' is not set.
16:58:04,214 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils          [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
16:58:08,176 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=3}
16:58:13,670 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=3}
16:58:19,166 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=3}
16:58:24,668 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=3}
16:58:30,171 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=3}

代码分析:
if(transaction.getAmount()<SMALL_AMOUNT)判断当前状态<$1,设置一个当前时间一分钟后触发的定时器,同时,将触发时间保存到 timerState 状态中。处理时间是本地时钟时间,这是由运行任务的服务器的系统时间来决定的。
当定时器触发时,将会调用 KeyedProcessFunction#onTimer 方法。 通过重写这个方法来实现一个你自己的重置状态的回调逻辑。