前面几节内容已经演示了如何使用Flink状态进行计算,如何对状态进行保存、恢复。要实现状态的应用计算和保存恢复,我们只需要实现对应的接口,并实现接口对应的方法就可以了。(可以复习前面章节的内容)
但这些仅仅是对Flink状态的应用,是基于API编写的代码。那么Flink内部到底如何对状态进行保存的,发生故障时,如何基于保存的状态进行还原的,就是我们接下来要学习的内容,并且这些内容是涉及到日后线上运维时的数据正确性,是非常重要的内容。
状态保存
Flink内部的快照处理是基于检查点(checkpoint)对状态数据进行快照的,也就是一个持久化的操作。之所以称之为快照,就是在某个时间点对所有任务状态的一份数据拷贝。
Flink分布式快照的一个核心元素是流屏障。这些屏障被注入数据流,并与记录一起作为数据流的一部分流动。
屏障永远不会超过记录,它们严格按照顺序流动。屏障将数据流中的记录分为进入当前快照的记录集和进入下一个快照的记录集。
例如:
屏障 屏障
----------------------------------------------------------
--> 21 20 17 |*| 16 15 14 13 |*| 12 11 8 5 -->
----------------------------------------------------------
系统会间隔一定数据就插入一个屏障。
每个屏障都带有快照的ID,该快照的记录随着流被不停的向前推动。
来自不同快照的多个屏障可以同时出现在流中,这意味着各种快照可能同时发生。
Checkpoint 执行过程
Jobmanager中有一个组件checkpoint-Coordinator,负责触发Source节点的Checkpoint操作,触发Checkpoint则进行Source的State持久化。
通过在Source这里对数据注入我们前面说的“屏障”,并向下游广播这个“屏障”,下游节点的Checkpoint则是自动触发的,并且会进行对齐操作。
整体流程:
----|----------->Source----------->|-------------|
|JM | |Map等Operator|------>Sink
----|----------->Source----------->|-------------| ↓
↑ ↓
↑----<-------<--------<----------<----------<---------<--
task完成状态保存,路径回写JM
1、JM触发每个Source节点的Checkpoint,并注入barrier(屏障)事件。
2、各个task执行过程中,需要对齐Checkpoint barrier,对齐Checkpoint barrier的目的就是对齐每个Checkpoint barrier之间的数据,从而保证数据的一致性,对齐之后触发一次快照保存。
3、各个task执行完成后,Checkpoint barrier会继续向下传递,直到传递给sink。
4、sink执行完成快照保存后,会通知JM的checkpoint-Coordinator,那么一次完整的checkpoint操作就完成了。
对齐操作只有存在多数据源的时候,才会进行对齐,对齐是有一定的性能影响的,因为涉及到等待Checkpoint barrier事件。
从1.11版本开始,Flink支持Unaligned Checkpointing。实现机制是先到达的数据计算完成后先缓存在本地,等待其它Checkpoint barrier事件到达并对齐后,再进行Checkpoint。
状态恢复
当故障发生时,要对算子状态进行恢复,这有一个非常重要的前提条件:数据源支持重发,通过偏移量来控制从哪个位置来消费数据。
为什么要支持重发?如我们上面的例图,假设当前状态保存点在12之后的屏障,但是source数据源的数据可能已经发送到了15,此时发生故障,算子中的状态只持久化在12,数据恢复,也只能恢复到12,所以之后的数据状态在恢复之后是不存在的,所以此时需要从数据源中的13,重放数据,重新开始计算。
总结checkpoint-检查点的作用:
1)状态持久化
2)异步快照算法:Chandy-Lamport,来实现分布式一致性快照
3)持久化的状态放到例如HDFS
4)异常恢复
前提条件:数据源支持重发,通过偏移量来控制从哪个位置来消费数据