EmbeddedRocksDBStateBackend holds in-flight state in an embedded RocksDB database, which by default stores its data in the TaskManager's local data directories.
RocksDB is a native library that allocates memory directly from the process rather than from the JVM. Any memory handed to RocksDB must be accounted for, typically by deducting it from the TaskManager's JVM heap budget.
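Since Flink 1.10, RocksDB's native allocations are by default drawn from the TaskManager's managed (off-heap) memory, so sizing that budget is the practical way to account for them. Below is a minimal sketch of capping it in code, assuming a local/IDE run; the 512 MiB figure is an arbitrary example, not a recommendation:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Cap the managed (off-heap) memory pool that RocksDB allocates from
Configuration conf = new Configuration();
conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(512));

// Only effective for local/MiniCluster execution; on a real cluster set the
// same option (taskmanager.memory.managed.size) in flink-conf.yaml instead
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);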
To use EmbeddedRocksDBStateBackend, add the following dependency to your pom.xml. (RocksDB is bundled with the Flink distribution, so this dependency is only needed when your job code references the backend directly, e.g. when running from an IDE.)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
API example:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you previously passed an FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same result by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
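A RocksDB-only option worth knowing about for large state is incremental checkpointing, where each checkpoint uploads only the SST files created since the previous completed checkpoint instead of a full snapshot. It is enabled through a constructor flag:

// Enable incremental checkpoints: each checkpoint ships only the delta
// relative to the last completed checkpoint
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));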
Code:
package com.itzhimei.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

import java.util.concurrent.TimeUnit;

public class State_9_StateBackends {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        // Trigger a checkpoint every 1000 ms; this is the interval at which the
        // JobManager's checkpoint coordinator fires checkpoint events
        env.enableCheckpointing(1000);
        // Set the mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 500 ms between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // A checkpoint must complete within one minute, otherwise it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Tolerate two consecutive checkpoint failures
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        // Allow only one checkpoint to be in progress at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Use externalized checkpoints so they are retained after the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Checkpoint storage location (calling setStateBackend(new FsStateBackend(...))
        // here instead would silently replace the RocksDB backend configured above)
        env.getCheckpointConfig().setCheckpointStorage("file:///D:/flink/checkpoints/");
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay between attempts
        ));

        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource()) // TransactionSource is analyzed in an earlier chapter
                .name("transactions");
        dataStream.print();

        dataStream.keyBy(Transaction::getAccountId)
                .flatMap(new State_9_FlatMap())
                .print();

        env.execute();
    }

    /**
     * Per-account running aggregate: counts the transactions seen so far and sums
     * their amounts, keeping the (count, sum) pair in RocksDB-backed ValueState.
     */
    public static class State_9_FlatMap extends RichFlatMapFunction<Transaction, Tuple3<Long, Integer, Double>> {

        private ValueState<Tuple2<Integer, Double>> sum;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<Integer, Double>> descriptor =
                    new ValueStateDescriptor<>(
                            "sum", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Integer, Double>>() {}), // type information
                            Tuple2.of(0, 0D)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Transaction transaction, Collector<Tuple3<Long, Integer, Double>> collector) throws Exception {
            // Read the current (count, sum) for this key, fold in the new
            // transaction, and write the result back to state
            Tuple2<Integer, Double> value = sum.value();
            value.f0 = value.f0 + 1;
            value.f1 = value.f1 + transaction.getAmount();
            sum.update(value);
            collector.collect(new Tuple3<>(transaction.getAccountId(), value.f0, value.f1));
        }
    }
}
/* Output (the N> prefix is the index of the parallel subtask that printed the record):
8> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
6> (1,1,188.23)
1> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
8> (2,1,374.79)
2> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
8> (3,1,112.15)
3> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
1> (4,1,478.75)
4> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
8> (5,1,208.85)
5> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
6> (1,2,567.87)
6> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
8> (2,2,726.23)
7> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8> (3,2,432.9)
8> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
1> (4,2,738.1700000000001)
8> (5,2,482.28999999999996)
1> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
2> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
6> (1,3,835.12)
3> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
8> (2,3,1123.38)
8> (3,3,433.11899999999997)
4> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
5> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
1> (4,3,970.1100000000001)
6> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
8> (5,3,867.02)
*/