The previous section covered the execution flow and internals of checkpointing. So how are checkpoints actually used? Let's start with the API.
Before that, it is worth emphasizing that Flink's checkpoint mechanism interacts with durable storage to read and write streams and state. In general this requires:
A persistent data source that can replay records for a certain period of time, such as a durable message queue (e.g. Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or a file system (e.g. HDFS, S3, GFS, NFS, Ceph).
Persistent storage for state, typically a distributed file system (e.g. HDFS, S3, GFS, NFS, Ceph).
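On a cluster, this storage requirement is usually satisfied once in flink-conf.yaml rather than in code; a minimal sketch using the standard configuration keys (the HDFS path is a placeholder):
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints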
Checkpoint API:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms; this is the interval at which the JobManager's CheckpointCoordinator triggers checkpoints
env.enableCheckpointing(1000);
// advanced options:
// set the checkpointing mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure at least 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or they are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// tolerate two consecutive checkpoint failures before failing the job
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// allow only one checkpoint to be in progress at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// use externalized checkpoints so that checkpoints are retained after the job is cancelled
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// enable experimental unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
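Note that the full example below selects where checkpoint data is written via FsStateBackend. In Flink 1.13 and later, FsStateBackend is deprecated and the same setup is expressed as a state backend plus a separate checkpoint storage location; a minimal sketch, assuming Flink 1.13+:
// HashMapStateBackend keeps working state on the JVM heap, as FsStateBackend did
env.setStateBackend(new org.apache.flink.runtime.state.hashmap.HashMapStateBackend());
// the checkpoint storage decides where checkpoint data is durably written
env.getCheckpointConfig().setCheckpointStorage("file:///D:/flink/checkpoints/");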
Full example:
package com.itzhimei.state;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
public class State_8_Checkpoint {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// start a checkpoint every 1000 ms; this is the interval at which the JobManager's CheckpointCoordinator triggers checkpoints
env.enableCheckpointing(1000);
// set the checkpointing mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure at least 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or they are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// tolerate two consecutive checkpoint failures before failing the job
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// allow only one checkpoint to be in progress at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// use externalized checkpoints so that checkpoints are retained after the job is cancelled
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// enable experimental unaligned checkpoints
//env.getCheckpointConfig().enableUnalignedCheckpoints();
env.setStateBackend(new FsStateBackend("file:///D:/flink/checkpoints/")); // write checkpoint data to the local file system
/*env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay between attempts
));*/
DataStream<Transaction> dataStream = env
.addSource(new TransactionSource()) // TransactionSource is covered in an earlier section, with a source-code walkthrough
.name("transactions");
dataStream.print();
dataStream.map(new State_8_cp_Map())
.print();
env.execute();
}
public static class State_8_cp_Map implements MapFunction<Transaction, Long>, CheckpointedFunction {
//private ReducingState<Long> countPerKey;
private ListState<Long> countPerPartition;
private long localCount;
/**
* Count the total number of transactions seen so far.
* @param transaction
* @return
* @throws Exception
*/
@Override
public Long map(Transaction transaction) throws Exception {
// update the states
//countPerKey.add(1L);
localCount++;
return localCount;
}
/**
* Snapshot: copy the current local count into operator state.
* @param context
* @throws Exception
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// keyed state (commented out above) would always be up to date anyway;
// just bring the per-partition operator state in shape
countPerPartition.clear();
countPerPartition.add(localCount);
}
/**
* Restore: rebuild the local count from operator state on (re)start.
* With ListState, entries may be redistributed across subtasks on rescaling,
* which is why all restored values are summed below.
* @param context
* @throws Exception
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// get the state data structure for the per-key state
/*countPerKey = context.getKeyedStateStore().getReducingState(
new ReducingStateDescriptor<>("perKeyCount", new State_4_AddFunction(), Long.class));*/
// get the state data structure for the per-partition state
countPerPartition = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("perPartitionCount", Long.class));
// initialize the "local count variable" based on the operator state
for (Long l : countPerPartition.get()) {
localCount += l;
}
}
}
}
/* Output
Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
1
Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
2
Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
3
Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
4
Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
5
Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
6
Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
7
Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8
Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
9
Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
10
Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
11
*/
Under the directory you configured, a directory named after the job ID (a UUID) is created, containing three subfolders:
chk-18: the exclusive state of checkpoint 18 (one chk-<n> folder per retained checkpoint)
shared: state that may be shared across checkpoints (used, for example, by incremental checkpoints)
taskowned: state owned by the TaskManagers, which the JobManager must never delete
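Because the example enables RETAIN_ON_CANCELLATION, a retained checkpoint directory such as chk-18 can be used to resume the job after a cancellation. A sketch using the standard flink CLI -s option (the UUID job-ID segment and the jar name are placeholders):
bin/flink run -s file:///D:/flink/checkpoints/<job-id>/chk-18 your-job.jar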