Flink从入门到实战八[State]-15-Flink状态 SavePoint使用

什么是 Savepoint
Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,…) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(相对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。


简单来说SavePoint是一种特殊的Checkpoint,区别在于Checkpoint是自动保存状态,SavePoint是我们手动触发保存状态。Checkpoint一般是用在系统发生故障宕机,用于恢复,SavePoint我们系统升级需要重新部署服务时,手动进行状态保存,便于服务部署重启后的状态恢复,SavePoint是人工干预的。

Savepoint使用方式,官方推荐是通过 uid(String) 方法手动指定算子id,然后用这个id进行恢复。
API格式:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

如何使用 Savepoint
可以使用命令行客户端来触发 Savepoint,触发 Savepoint 并取消作业,从 Savepoint 恢复,以及删除 Savepoint。

1、触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory]

2、使用 YARN 触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

3、使用 Savepoint 停止作业
$ bin/flink stop –type [native/canonical] –savepointPath [:targetDirectory] :jobId

4、从 Savepoint 恢复
$ bin/flink run -s :savepointPath [:runArgs]

5、取消Savepoint
$ ./bin/flink cancel $JOB_ID

6、删除 Savepoint
$ bin/flink savepoint -d :savepointPath

从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复。

Savepoint配置
通过 state.savepoints.dir 配置 savepoint 的默认目录。 触发 savepoint 时,将使用此目录来存储 savepoint。

# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints