searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi系列-增量查询ckp超时

2023-12-19 02:14:31
7
0
# 环境
 
- hudi 0.11.1
- flink 1.14.5
- MOR表,按天分区,约几百个,每个分区下基本只有一个base文件(最近的分区文件也不多)
- Aligned Checkpoints
- checkpoint timeout 10 分钟
- checkpoint interval 1 分钟
 
# 首次checkpoint超时报错
 
> Checkpoint expired before completing.
 
有一个hudi mor表,我们在它上面做实时应用,但要求初始运行时也须要处理历史的全量数据,因此在使用hudi的[增量查询](https://blog.csdn.net/czmacd/article/details/126059726)时设置了`read.start-commit=earliest`,在运行10分钟后就超时报错。通过查看checkpoint详情,发现split_monitor很快完成了ckp,但是算子**split_reader**一直没有响应,所以是它阻塞了checkpoint.
 
# split_reader为什么不checkpoint
 
通过观察每个算子的输入输出监控信息,split_monitor首次增量查询时产生了约1000+个分片[(文件切片)](https://blog.csdn.net/czmacd/article/details/129049025),实际的表基本就是1000+个很小的parquet文件(几千万数据)。
 
>The operator that reads the splits received from the preceding StreamReadMonitoringFunction. Contrary to the StreamReadMonitoringFunction which has a parallelism of 1, this operator can have multiple parallelism.
As soon as an input split MergeOnReadInputSplit is received, it is put into a queue, the MailboxExecutor read the actual data of the split. This architecture allows the separation of split reading from processing the checkpoint barriers, thus removing any potential back-pressure.
 
按源码`StreamReadOperator`的注释,它只是将分片信息缓存起来(最终保存到flink状态中),再使用`MailboxExecutor`进行微批文件读取(每次2048条记录),将数据的读取和栅栏的处理分离,从而避免背压。
 
但是发现了split_reader接收分片的速度很慢,效果如同边接收分片边读取数据一般,这与设计之衷相悖了。经调试证明了以下三个操作均处于同一任务线程:
 
- StreamReadOperator#snapshotState:处理barrier,快照状态数据
- StreamReadOperator#processElement:接收分片并缓存
- StreamReadOperator#processSplits:读取分片数据,每次只读取2048条
 
正因如此,首个barrier生成是插在1000个分片后面,所以barrier被任务线程接收到时已经过了超时时间(Start Delay很大,说明barrier一直没到)。
 
事实上我的split_reader算子并行度因业务需要只设置了1,就算提高并行度也解决不了问题,因为下游的处理速度实在太慢了,不可能在10分钟之内处理完几千万数据。**说到底,还是因为背压导致barrier不能及时对齐**,但是split_reader的稳定性不应该依赖于背压,如设计所述,**数据读取和栅栏处理应该分离**。
 
# MailBox线程模型
 
在flink 1.10之前,通过全局锁(checkpointLock)来确保任务线程安全,后来搞了个MailBox线程模型,底层是一个FIFO的阻塞队列+单线程处理器,将需要处理的操作封装成Mail,投递到MailBox中,再由`MailBoxProcess`去循环一个个消费处理,这个过程就是`StreamTask`的核心逻辑。
 
> 处理mail的线程就是任务线程
 
```
while (true) {
step 1.处理mailbox中的所有mail(processSplits)
step 2.处理流数据、barrier等
}
```
 
代入到`StreamReadOperator`中,每个Mail就是一次微批数据读取(processSplits),所以任务线程串行地运行着以下操作:
```
while 1:缓存split_1 -> 生成并递投mail_1到邮箱 
while 2:消费mail_1 -> 接收缓存下一个split_2 -> 生成并投递mail_2到邮箱
while 3:消费mail_2 -> 接收缓存下一个split_3 -> 生成并投递mail_3到邮箱
while 4:消费mail_3 -> 接收缓存下一个split_4 -> 生成并投递mail_4到邮箱
while 5:......
while 6:消费mail_1200 -> 处理barrier,调用snapshotState快照(第一次checkpoint)
```
由于有`StreamReadOperator#currentSplitState`维持着状态,所以在前一个mail被消费之前不会有新的mail可以投递成功,如果首次增量查询生成1200个分片的话,每缓存一个分片都伴随着一次微批数据读取,为了避免checkpoint超时,必须在10分钟中完成1200次微批读取,并且下游能及时处理完1200*2048=2457600个数据记录。
 
# 解决方案
 
本质上是不能及时接收到barrier,由于下游背压或自身处理速度不足引起,所以要从这两方面考虑方案。
 
- 方案一:提高容忍checkpoint失败次数
 
```
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
```
默认情况下,checkpoint失败会引起作业重启,通过设置容忍失败次数,即10*100min内checkpoint失败可以忽略。这种方法需要预估最小时间来设置参数2,同时在处理完历史数据后需要重启一下作业将值改小。另外需要对其它算子的影响,特别是对于有状态的算子,或者是有些sink需要依赖checkpoint。
 
- 方案二:提高并行度
 
通过并行度来提高整体作业的吞吐量
1. split_reader无背压,首次增量查询因历史数据太多而产生过多分片,增大read.tasks
2. split_reader有背压,增加下流数据处理算子的并行度
 
一般对于历史数据都需要较长的时间来处理,很难通过提高并行度控制在超时前完成
 
**以上两种方案都是使得在第一次成功checkpoint之前处理完所有历史数据**
 
 
- 方案三:缓存所有分片后才触发微批读取
 
每次增量查询产生的最后一个分片被缓存到`StreamReadOperator#splits`后,才投递一个微批消费Mail,这样就可以很快接收到barrier
 
```
org.apache.hudi.source.StreamReadOperator#processElement:
 
  public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
    splits.add(element.getValue());
    if (element.getValue().isLast()) {
      enqueueProcessSplits();
    }
  }
```
 
这种方案相当于首次增量查询的分片不会与数据读取轮流执行,而是缓存了1200个分片后才开始读取第一条数据,但是之后的每次增量查询产生的每个分片被缓存时还会伴随一次微批数据读取,参考while 1~6。所以这种方案适合增量数据不大的情况下使用。
 
- 方案四:控制数据读取速率
 
在每个checkpoint interval之内数据读取到一定的数量后,将不再进行微批读取,任务线程只接收分片和barrier
```
private void enqueueProcessSplits() {
    if (maxConsumeRecordsPerCkp > 0 && consumedRecordsBetweenCkp > maxConsumeRecordsPerCkp)
            return;  //reach max consume records in this checkpoint interval
}
 
private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws IOException {
         consumedRecordsBetweenCkp += 1L;
         split.consume();
}
 
public void snapshotState(StateSnapshotContext context) throws Exception {
        consumedRecordsBetweenCkp = 0L;  // reset when a new checkpoint coming.
}
```
> maxConsumeRecordsPerCkp = 下游最大消费速率 * checkpoint间隔时间
 
这种方案可以使split_reader不会一直阻塞checkpoint,同时下游也不会持续背压。
 
- 方案二:数据读取和栅栏处理分离
 
使用一个独立的线程进行数据读取,任务线程负责分片缓存和barrier接收处理。这种方案是最通用、最理想的,已有[issues](https://github.com/apache/hudi/issues/8087),准备将实现推个PR上去。
0条评论
0 / 1000
矛始
28文章数
0粉丝数
矛始
28 文章 | 0 粉丝
原创

hudi系列-增量查询ckp超时

2023-12-19 02:14:31
7
0
# 环境
 
- hudi 0.11.1
- flink 1.14.5
- MOR表,按天分区,约几百个,每个分区下基本只有一个base文件(最近的分区文件也不多)
- Aligned Checkpoints
- checkpoint timeout 10 分钟
- checkpoint interval 1 分钟
 
# 首次checkpoint超时报错
 
> Checkpoint expired before completing.
 
有一个hudi mor表,我们在它上面做实时应用,但要求初始运行时也须要处理历史的全量数据,因此在使用hudi的[增量查询](https://blog.csdn.net/czmacd/article/details/126059726)时设置了`read.start-commit=earliest`,在运行10分钟后就超时报错。通过查看checkpoint详情,发现split_monitor很快完成了ckp,但是算子**split_reader**一直没有响应,所以是它阻塞了checkpoint.
 
# split_reader为什么不checkpoint
 
通过观察每个算子的输入输出监控信息,split_monitor首次增量查询时产生了约1000+个分片[(文件切片)](https://blog.csdn.net/czmacd/article/details/129049025),实际的表基本就是1000+个很小的parquet文件(几千万数据)。
 
>The operator that reads the splits received from the preceding StreamReadMonitoringFunction. Contrary to the StreamReadMonitoringFunction which has a parallelism of 1, this operator can have multiple parallelism.
As soon as an input split MergeOnReadInputSplit is received, it is put into a queue, the MailboxExecutor read the actual data of the split. This architecture allows the separation of split reading from processing the checkpoint barriers, thus removing any potential back-pressure.
 
按源码`StreamReadOperator`的注释,它只是将分片信息缓存起来(最终保存到flink状态中),再使用`MailboxExecutor`进行微批文件读取(每次2048条记录),将数据的读取和栅栏的处理分离,从而避免背压。
 
但是发现了split_reader接收分片的速度很慢,效果如同边接收分片边读取数据一般,这与设计之衷相悖了。经调试证明了以下三个操作均处于同一任务线程:
 
- StreamReadOperator#snapshotState:处理barrier,快照状态数据
- StreamReadOperator#processElement:接收分片并缓存
- StreamReadOperator#processSplits:读取分片数据,每次只读取2048条
 
正因如此,首个barrier生成是插在1000个分片后面,所以barrier被任务线程接收到时已经过了超时时间(Start Delay很大,说明barrier一直没到)。
 
事实上我的split_reader算子并行度因业务需要只设置了1,就算提高并行度也解决不了问题,因为下游的处理速度实在太慢了,不可能在10分钟之内处理完几千万数据。**说到底,还是因为背压导致barrier不能及时对齐**,但是split_reader的稳定性不应该依赖于背压,如设计所述,**数据读取和栅栏处理应该分离**。
 
# MailBox线程模型
 
在flink 1.10之前,通过全局锁(checkpointLock)来确保任务线程安全,后来搞了个MailBox线程模型,底层是一个FIFO的阻塞队列+单线程处理器,将需要处理的操作封装成Mail,投递到MailBox中,再由`MailBoxProcess`去循环一个个消费处理,这个过程就是`StreamTask`的核心逻辑。
 
> 处理mail的线程就是任务线程
 
```
while (true) {
step 1.处理mailbox中的所有mail(processSplits)
step 2.处理流数据、barrier等
}
```
 
代入到`StreamReadOperator`中,每个Mail就是一次微批数据读取(processSplits),所以任务线程串行地运行着以下操作:
```
while 1:缓存split_1 -> 生成并递投mail_1到邮箱 
while 2:消费mail_1 -> 接收缓存下一个split_2 -> 生成并投递mail_2到邮箱
while 3:消费mail_2 -> 接收缓存下一个split_3 -> 生成并投递mail_3到邮箱
while 4:消费mail_3 -> 接收缓存下一个split_4 -> 生成并投递mail_4到邮箱
while 5:......
while 6:消费mail_1200 -> 处理barrier,调用snapshotState快照(第一次checkpoint)
```
由于有`StreamReadOperator#currentSplitState`维持着状态,所以在前一个mail被消费之前不会有新的mail可以投递成功,如果首次增量查询生成1200个分片的话,每缓存一个分片都伴随着一次微批数据读取,为了避免checkpoint超时,必须在10分钟中完成1200次微批读取,并且下游能及时处理完1200*2048=2457600个数据记录。
 
# 解决方案
 
本质上是不能及时接收到barrier,由于下游背压或自身处理速度不足引起,所以要从这两方面考虑方案。
 
- 方案一:提高容忍checkpoint失败次数
 
```
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
```
默认情况下,checkpoint失败会引起作业重启,通过设置容忍失败次数,即10*100min内checkpoint失败可以忽略。这种方法需要预估最小时间来设置参数2,同时在处理完历史数据后需要重启一下作业将值改小。另外需要对其它算子的影响,特别是对于有状态的算子,或者是有些sink需要依赖checkpoint。
 
- 方案二:提高并行度
 
通过并行度来提高整体作业的吞吐量
1. split_reader无背压,首次增量查询因历史数据太多而产生过多分片,增大read.tasks
2. split_reader有背压,增加下流数据处理算子的并行度
 
一般对于历史数据都需要较长的时间来处理,很难通过提高并行度控制在超时前完成
 
**以上两种方案都是使得在第一次成功checkpoint之前处理完所有历史数据**
 
 
- 方案三:缓存所有分片后才触发微批读取
 
每次增量查询产生的最后一个分片被缓存到`StreamReadOperator#splits`后,才投递一个微批消费Mail,这样就可以很快接收到barrier
 
```
org.apache.hudi.source.StreamReadOperator#processElement:
 
  public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
    splits.add(element.getValue());
    if (element.getValue().isLast()) {
      enqueueProcessSplits();
    }
  }
```
 
这种方案相当于首次增量查询的分片不会与数据读取轮流执行,而是缓存了1200个分片后才开始读取第一条数据,但是之后的每次增量查询产生的每个分片被缓存时还会伴随一次微批数据读取,参考while 1~6。所以这种方案适合增量数据不大的情况下使用。
 
- 方案四:控制数据读取速率
 
在每个checkpoint interval之内数据读取到一定的数量后,将不再进行微批读取,任务线程只接收分片和barrier
```
private void enqueueProcessSplits() {
    if (maxConsumeRecordsPerCkp > 0 && consumedRecordsBetweenCkp > maxConsumeRecordsPerCkp)
            return;  //reach max consume records in this checkpoint interval
}
 
private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws IOException {
         consumedRecordsBetweenCkp += 1L;
         split.consume();
}
 
public void snapshotState(StateSnapshotContext context) throws Exception {
        consumedRecordsBetweenCkp = 0L;  // reset when a new checkpoint coming.
}
```
> maxConsumeRecordsPerCkp = 下游最大消费速率 * checkpoint间隔时间
 
这种方案可以使split_reader不会一直阻塞checkpoint,同时下游也不会持续背压。
 
- 方案二:数据读取和栅栏处理分离
 
使用一个独立的线程进行数据读取,任务线程负责分片缓存和barrier接收处理。这种方案是最通用、最理想的,已有[issues](https://github.com/apache/hudi/issues/8087),准备将实现推个PR上去。
文章来自个人专栏
hudi
17 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0