通过实现 ListCheckpointed 接口来使用 operator state。
ListCheckpointed 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:
1)Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发度为 1,包含两个元素 element1 和 element2,当并发读增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。
2)Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.
demo代码:
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* 计基于Flink算子状态:计算交易总比数
* 基于ListCheckpointed实现
*/
public class State_5_OperatorState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Transaction> dataStream = env
.addSource(new TransactionSource()) //TransactionSource可以查看前面章节,有源码分析讲解
.name("transactions");
dataStream.print();
dataStream.map(new State_5_Map())
.print();
env.execute();
}
public static class State_5_Map implements MapFunction<Transaction, Long>, ListCheckpointed<Long> {
private Long txCount = 0L;
/**
* 计算交易总比数
* @param transaction
* @return
* @throws Exception
*/
@Override
public Long map(Transaction transaction) throws Exception {
txCount++;
return txCount;
}
/**
* 备份
* @param checkpointId
* @param timestamp
* @return
* @throws Exception
*/
@Override
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(txCount);
}
/**
* 恢复
* @param state
* @throws Exception
*/
@Override
public void restoreState(List<Long> state) throws Exception {
for(Long s:state) {
txCount+=s;
}
}
}
}
/*
1> 1
1> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
2> 1
2> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
1> 2
1> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
2> 2
2> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
1> 3
1> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
2> 3
2> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
1> 4
1> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
2> 4
2> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
1> 5
1> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
2> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
2> 5
1> 6
1> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
*/