Flink Checkpoint 是一种容错恢复机制。这种机制保证了实时程序运行时,即使突然遇到异常也能够进行自我恢复。Checkpoint 对于用户层面,是透明的,用户会感觉程序一直在运行。Flink Checkpoint 是 Flink 自身的系统行为,用户无法对其进行交互,用户可以在程序启动之前,设置好实时程序 Checkpoint 相关参数,当程序启动之后,剩下的就全交给 Flink 自行管理。当然在某些情况,比如 Flink On Yarn 模式,某个 Container 发生 OOM 异常,这种情况程序直接变成失败状态,此时 Flink 程序虽然开启 Checkpoint 也无法恢复,因为程序已经变成失败状态,所以此时可以借助外部参与启动程序,比如外部程序检测到实时任务失败时,从新对实时任务进行拉起。
Checkpoint 原理
Flink Checkpoint 机制保证 Flink 任务运行突然失败时,能够从最近 Checkpoint 进行状态恢复启动,进行错误容忍。它是一种自动容错机制,而不是具体的状态存储镜像。Flink Checkpoint 受 Chandy-Lamport 分布式快照启发,其内部使用分布式数据流轻量级异步快照。
Checkpoint 保存的状态在程序取消时,默认会进行清除。Checkpoint 状态保留策略有两种:
- DELETE_ON_CANCELLATION 表示当程序取消时,删除 Checkpoint 存储文件。
- RETAIN_ON_CANCELLATION 表示当程序取消时,保存之前的 Checkpoint 存储文件。用户可以结合业务情况,设置 Checkpoint 保留模式。
默认情况下,Flink不会触发一次 Checkpoint 当系统有其他 Checkpoint 在进行时,也就是说 Checkpoint 默认的并发为1。针对 Flink DataStream 任务,程序需要经历从 StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图四个步骤,其中在 ExecutionGraph 构建时,会初始化 CheckpointCoordinator。ExecutionGraph通过ExecutionGraphBuilder.buildGraph方法构建,在构建完时,会调用 ExecutionGraph 的enableCheckpointing方法创建CheckpointCoordinator。CheckpoinCoordinator 是 Flink 任务 Checkpoint 的关键,针对每一个 Flink 任务,都会初始化一个 CheckpointCoordinator 类,来触发 Flink 任务 Checkpoint。
Flink 会定时在任务的 Source Task 触发 Barrier,Barrier是一种特殊的消息事件,会随着消息通道流入到下游的算子中。只有当最后 Sink 端的算子接收到 Barrier 并确认该次 Checkpoint 完成时,该次 Checkpoint 才算完成。所以在某些算子的 Task 有多个输入时,会存在 Barrier 对齐时间,我们可以在Web UI上面看到各个 Task 的Barrier 对齐时间
Checkpoint 语义
Flink Checkpoint 支持两种语义:Exactly Once 和 At least Once,默认的 Checkpoint 模式是 Exactly Once. Exactly Once 和 At least Once 具体是针对 Flink 状态 而言。具体语义含义如下:
Exactly Once 含义是:保证每条数据对于 Flink 的状态结果只影响一次。打个比方,比如 WordCount程序,目前实时统计的 "hello" 这个单词数为5,同时这个结果在这次 Checkpoint 成功后,保存在了 HDFS。在下次 Checkpoint 之前, 又来2个 "hello" 单词,突然程序遇到外部异常容错自动回复,从最近的 Checkpoint 点开始恢复,那么会从单词数 5 这个状态开始恢复,Kafka 消费的数据点位还是状态 5 这个时候的点位开始计算,所以即使程序遇到外部异常自我恢复,也不会影响到 Flink 状态的结果。
At Least Once 含义是:每条数据对于 Flink 状态计算至少影响一次。比如在 WordCount 程序中,你统计到的某个单词的单词数可能会比真实的单词数要大,因为同一条消息,你可能将其计算多次。
Flink 中 Exactly Once 和 At Least Once 具体是针对 Flink 任务状态而言的,并不是 Flink 程序对其处理一次。举个例子,当前 Flink 任务正在做 Checkpoint,该次Checkpoint还么有完成,该次 Checkpoint 时间端的数据其实已经进入 Flink 程序处理,只是程序状态没有最终存储到远程存储。当程序突然遇到异常,进行容错恢复,那么就会从最新的 Checkpoint 进行状态恢复重启,上一部分还会进入 Flink 系统处理。Exactly Once 和 At Least Once 具体在底层实现大致相同,具体差异表现在 Barrier 对齐方式处理。
如果是 Exactly Once 模式,某个算子的 Task 有多个输入通道时,当其中一个输入通道收到 Barrier 时,Flink Task 会阻塞处理该通道,其不会处理这些数据,但是会将这些数据存储到内部缓存中,一旦完成了所有输入通道的 Barrier 对齐,才会继续对这些数据进行消费处理。
对于 At least Once,同样针对某个算子的 Task 有多个输入通道的情况下,当某个输入通道接收到 Barrier 时,它不同于Exactly Once,At Least Once 会继续处理接受到的数据,即使没有完成所有输入通道 Barrier 对齐。所以使用At Least Once 不能保证数据对于状态计算只有一次影响。
Checkpoint 参数配置及建议
1. 当 Checkpoint 时间比设置的 Checkpoint 间隔时间要长时,可以设置 Checkpoint 间最小时间间隔 。这样在上次 Checkpoint 完成时,不会立马进行下一次 Checkpoint,而是会等待一个最小时间间隔,然后在进行该次 Checkpoint。否则,每次 Checkpoint 完成时,就会立马开始下一次 Checkpoint,系统会有很多资源消耗 Checkpoint。
2. 如果Flink状态很大,在进行恢复时,需要从远程存储读取状态恢复,此时可能导致任务恢复很慢,可以设置 Flink Task 本地状态恢复。任务状态本地恢复默认没有开启,可以设置参数`state.backend.local-recovery`值为`true`进行激活。
3. Checkpoint保存数,Checkpoint 保存数默认是1,也就是保存最新的 Checkpoint 文件,当进行状态恢复时,如果最新的Checkpoint文件不可用时(比如HDFS文件所有副本都损坏或者其他原因),那么状态恢复就会失败,如果设置 Checkpoint 保存数2,即使最新的Checkpoint恢复失败,那么Flink 会回滚到之前那一次Checkpoint进行恢复。考虑到这种情况,用户可以增加 Checkpoint 保存数。
4. 建议设置的 Checkpoint 的间隔时间最好大于 Checkpoint 的完成时间。