Checkpoint的作用是将状态持久化,State Backend(状态后端)则是真正管理状态、存储状态、基于checkpoint将状态持久化的模块。
State Backend决定CheckPoint 时如何持久化以及持久化在哪里。
Flink 提供了三种 state backends,它用于指定状态的存储方式和位置。
状态可以位于内存,也可以放在磁盘,取决于你状态的大小和可靠性的考虑。
API用法:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);
State Backend分为三种:
1、MemoryStateBackend
内存
2、FsStateBackend
文件系统hdfs
3、RocksDBStateBackend
本地RocksDB
三种State Backend的特点
1、MemoryStateBackend
• 构造方法
env.setStateBackend(new MemoryStateBackend( “file://” + baseCheckpointPath, null).configure(conf, classLoader))
• 数据存储
• State 数据存储在 TaskManager 内存中
• Checkpoint 数据数据存储在 JobManager 内存
• 容量限制
• 单词 State maxStateSize 默认为5M
• maxStateSize <= akka.framesize 默认10M
• 总大小不能超过 JobMananger 的内存
• 默认后端状态管理器
• 推荐场景: 本地测试、状态比较少的作业
• 不推荐生产环境中使用
2、FsStateBackend
• 构造方法:
env.setStateBackend( new FsStateBackend(tmpPath))
• 数据存储:
• 状态数据:TaskManager 内存
• Checkpoint:外部文件系统(本地或 HDFS)
• 容量限制:
• 单个 TaskManager上State 总量不能超过TM内存
• 总数据大小不超过文件系统容量
• 推荐场景:
• 常规状态作业
• 窗口时间比较长,如分钟级别窗口聚合,Join 等
• 需要开启 HA 的作业
• 可在生产环境中使用
3、RocksDBStateBackend
• 创建方法:
env.setStateBackend(new RocksDBStateBackend(“file://” + baseCheckpointPath).configure(conf, classLoader))
• 数据存储:
• State:TaskManager 中的 KV 数据库(实际使用内存+磁盘)
• Checkpoint:外部文件系统(本地或 HDFS)
• 容量限制:
• 单 TaskManager 上 State 总量不超过其内存 + 磁盘大小
• 单 Key 最大容量2G
• 总大小不超过配置的文件系统容量
• 推荐场景:
• 超大状态作业
• 需要开启 HA 的作业
• 对状态读写性能要求不高的作业
• 生产环境可用
State Backend 配置有两种方法
1、配置文件flink-conf.yaml中配置
# The backend that will be used to store operator state checkpoints
# jobmanager, filesystem ,rocksdb
state.backend: filesystem
# Directory for storing checkpoints
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
2、在代码中指定
// 获取StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置StateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));