什么是 Savepoint
Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,…) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(相对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。
简单来说SavePoint是一种特殊的Checkpoint,区别在于Checkpoint是自动保存状态,SavePoint是我们手动触发保存状态。Checkpoint一般是用在系统发生故障宕机,用于恢复,SavePoint我们系统升级需要重新部署服务时,手动进行状态保存,便于服务部署重启后的状态恢复,SavePoint是人工干预的。
Savepoint使用方式,官方推荐是通过 uid(String) 方法手动指定算子id,然后用这个id进行恢复。
API格式:
DataStream<String> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
如何使用 Savepoint
可以使用命令行客户端来触发 Savepoint,触发 Savepoint 并取消作业,从 Savepoint 恢复,以及删除 Savepoint。
1、触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory]
2、使用 YARN 触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
3、使用 Savepoint 停止作业
$ bin/flink stop –type [native/canonical] –savepointPath [:targetDirectory] :jobId
4、从 Savepoint 恢复
$ bin/flink run -s :savepointPath [:runArgs]
5、取消Savepoint
$ ./bin/flink cancel $JOB_ID
6、删除 Savepoint
$ bin/flink savepoint -d :savepointPath
从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复。
Savepoint配置
通过 state.savepoints.dir 配置 savepoint 的默认目录。 触发 savepoint 时,将使用此目录来存储 savepoint。
# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints