基于前一节我们已经对Broadcast State和其API有了一定了解,本节我们一起来编写代码,实现一个demo。
因为广播流是一个低吞吐的流,所以广播流的使用场景适合广播全局都要使用的规则信息。例如我们要计算用户交易数据,此时计算规则,就可以用广播流广播到各个并发算子中,每个算子获取的规则都是相同的。
使用广播流的API整体流程:
1、先定义一个业务流
2、定义广播流和广播流的状态信息
3、将两个流进行合并,合并顺序:业务流.connect(广播流)
4、继承BroadcastProcessFunction或KeyedBroadcastProcessFunction类,实现其中的processElement()和processBroadcastElement()方法
代码:
/**
* 用户交易计算规则从广播流获取
*/
public class State_7_BroadcastState {
//广播流的状态
private static MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<String>() {}));
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//业务流的数据从交易模拟器中输入
DataStream<Transaction> dataStream = env
.addSource(new TransactionSource()) //TransactionSource可以查看前面章节,有源码分析讲解
.name("transactions");
KeyedStream<Transaction, Long> keyStream = dataStream.keyBy(item -> item.getAccountId());
//广播流的规则,从Socket输入
DataStream<String> ruleStream = env.socketTextStream("localhost", 7777);
BroadcastStream<String> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
//connect() 方法需要由非广播流来进行调用,BroadcastStream 作为参数传入
keyStream.connect(ruleBroadcastStream).process(new State_7_BroadcastState_Inner());
env.execute();
}
public static class State_7_BroadcastState_Inner extends KeyedBroadcastProcessFunction<Long, Transaction, String, Void> {
@Override
public void processElement(Transaction value, ReadOnlyContext ctx, Collector<Void> out) throws Exception {
//这里处理的非广播流的业务逻辑
//模拟基于用户交易匹配匹配广播的规则,然后进行计算
StringBuffer sb = new StringBuffer("");
for (Map.Entry<String, String> entry :
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final String rule = entry.getValue();
sb.append(ruleName).append(":").append(rule).append(";");
}
System.out.println("当前用户:"+value.getAccountId()+" 匹配到的规则:" + sb.toString());
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<Void> out) throws Exception {
//这里处理的广播流的业务逻辑
//当广播流有规则输入,则将规则放入state
ctx.getBroadcastState(ruleStateDescriptor).put(value, value);
}
}
}
/*
Socket一次输入:
rule1
rule2
rule3
观察日志输出。
当Socket没有输入规则时,控制台的输出:
当前用户:1 匹配到的规则:
当前用户:2 匹配到的规则:
当前用户:3 匹配到的规则:
当前用户:4 匹配到的规则:
当前用户:5 匹配到的规则:
当前用户:1 匹配到的规则:
当前用户:2 匹配到的规则:
当前用户:3 匹配到的规则:
当前用户:4 匹配到的规则:
===========================================================
当Socket输入rule1规则时,控制台的输出:
当前用户:2 匹配到的规则:rule1:rule1;
当前用户:3 匹配到的规则:rule1:rule1;
当前用户:4 匹配到的规则:rule1:rule1;
当前用户:5 匹配到的规则:rule1:rule1;
当前用户:1 匹配到的规则:rule1:rule1;
当前用户:2 匹配到的规则:rule1:rule1;
当前用户:3 匹配到的规则:rule1:rule1;
当前用户:4 匹配到的规则:rule1:rule1;
当前用户:5 匹配到的规则:rule1:rule1;
当前用户:1 匹配到的规则:rule1:rule1;
当前用户:2 匹配到的规则:rule1:rule1;
当前用户:3 匹配到的规则:rule1:rule1;
===========================================================
当Socket输入rule2规则时,控制台的输出:
当前用户:5 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:1 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:2 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:3 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:4 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:5 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:1 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:2 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:3 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:4 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:5 匹配到的规则:rule1:rule1;rule2:rule2;
当前用户:1 匹配到的规则:rule1:rule1;rule2:rule2;
*/