通过实现 CheckpointedFunction 接口来使用 operator state。
CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:
1)Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发度为 1,包含两个元素 element1 和 element2,当并发读增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。
2)Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.
demo代码:
public class State_4_OperatorState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Transaction> dataStream = env
.addSource(new TransactionSource()) //TransactionSource可以查看前面章节,有源码分析讲解
.name("transactions");
dataStream.print();
dataStream.map(new State_4_Map())
.print();
env.execute();
}
public static class State_4_Map implements MapFunction<Transaction, Long>, CheckpointedFunction {
private ListState<Long> countPerPartition;
private long localCount;
/**
* 计算交易总比数
* @param transaction
* @return
* @throws Exception
*/
@Override
public Long map(Transaction transaction) throws Exception {
// update the states
localCount++;
return localCount;
}
/**
* 备份
* @param context
* @throws Exception
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// the keyed state is always up to date anyways
// just bring the per-partition state in shape
countPerPartition.clear();
countPerPartition.add(localCount);
}
/**
* 恢复
* @param context
* @throws Exception
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// get the state data structure for the per-partition state
countPerPartition = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("perPartitionCount", Long.class));
// initialize the "local count variable" based on the operator state
for (Long l : countPerPartition.get()) {
localCount += l;
}
}
}
}
/* 输出
Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
1
Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
2
Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
3
Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
4
Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
5
Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
6
Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
7
Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8
Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
9
Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
10
Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
11
*/
调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如 getUnionListState(descriptor) 会使用 union redistribution 算法, 而 getListState(descriptor) 则简单的使用 even-split redistribution 算法。
当初始化好状态对象后,我们通过 isRestored() 方法判断是否从之前的故障中恢复回来,如果该方法返回 true 则表示从故障中进行恢复,会执行接下来的恢复逻辑。