searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi系列-append写过程

2024-05-07 02:18:29
21
0

前言


Append模式每次都生成新的parquet文件,不涉及数据修改、去重。cow+insert一直是append模式,mor+insert在0.13.1后也统一走append写流程(HUDI-6045)

在0.13.1之前,mor+insert存在着写parquet和写log两种情况:

  1. 写parquet:compaction.schedule.enabled = false & clustering.async.enabled = true,这时是append模式
  2. 写log:compaction.schedule.enabled = true & clustering.async.enabled = false,这时走upsert写过程

append写过程比upsert简单直接得多,因此写入性能远优于upsert.

整体流程



  1. 与upsert写过程一样,借助StreamWriteOperatorCoordinator作为写协作器,协调各个write task,负责初始化和推进instant、调度compaction和clustering、同步hive
  2. checkpoint或数据大小到达阀值会触发刷盘,并滚动创建新的文件。
  3. 重用了bulk_insert的核心类BulkInsertWriterHelper,insert和bulk_insert高度相似。

 

AppendWriteFunction

  1. 以Pipelines#append为入口,算子名称为hoodie_append_write,写并行度配置保持一致write.tasks,只不过相对于旧版本去掉了默认值4所以建议显式指定该配置值,否则将跟随flink环境默认并行度。
  2. 所有接收到的数据交由BulkInsertWriterHelper处理
  3. 每次checkpoint时会进行flushData,同时将BulkInsertWriterHelper重置,即将会生成新的fileId的数据文件。

BulkInsertWriterHelper

一个write task对应一个BulkInsertWriterHelper

  1. 每个BulkInsertWriterHelper对象有唯一fileId,checkpoint会生成新的
  2. 不同分区的数据由不同的HoodieRowDataCreateHandle处理
  3. 在一个ckeckpoint间隔内,如果HoodieRowDataCreateHandle处理的数据量 > maxFileSize时,滚动创建新parquet文件,但是fileId相同。

maxFileSize = hoodie.parquet.max.file.size + hoodie.parquet.max.file.size * hoodie.parquet.compression.ratio

调用栈

write:86, ParquetRowDataWriter  
write:62, RowDataParquetWriteSupport  
write:36, RowDataParquetWriteSupport  
write:138, InternalParquetRecordWriter  
write:310, ParquetWriter 
writeRow:65, HoodieRowDataParquetWriter  
write:131, HoodieRowDataCreateHandle  
write:113, BulkInsertWriterHelper 
processElement:86, AppendWriteFunction  

追加模式持续创建新的文件,特别是数据吞吐量不大的时候,将会产生大量小文件,无论对查询还是文件系统都会增加很大压力,所以需要配合clustering服务、clean服务来进行优化。

0条评论
0 / 1000
矛始
28文章数
0粉丝数
矛始
28 文章 | 0 粉丝
原创

hudi系列-append写过程

2024-05-07 02:18:29
21
0

前言


Append模式每次都生成新的parquet文件,不涉及数据修改、去重。cow+insert一直是append模式,mor+insert在0.13.1后也统一走append写流程(HUDI-6045)

在0.13.1之前,mor+insert存在着写parquet和写log两种情况:

  1. 写parquet:compaction.schedule.enabled = false & clustering.async.enabled = true,这时是append模式
  2. 写log:compaction.schedule.enabled = true & clustering.async.enabled = false,这时走upsert写过程

append写过程比upsert简单直接得多,因此写入性能远优于upsert.

整体流程



  1. 与upsert写过程一样,借助StreamWriteOperatorCoordinator作为写协作器,协调各个write task,负责初始化和推进instant、调度compaction和clustering、同步hive
  2. checkpoint或数据大小到达阀值会触发刷盘,并滚动创建新的文件。
  3. 重用了bulk_insert的核心类BulkInsertWriterHelper,insert和bulk_insert高度相似。

 

AppendWriteFunction

  1. 以Pipelines#append为入口,算子名称为hoodie_append_write,写并行度配置保持一致write.tasks,只不过相对于旧版本去掉了默认值4所以建议显式指定该配置值,否则将跟随flink环境默认并行度。
  2. 所有接收到的数据交由BulkInsertWriterHelper处理
  3. 每次checkpoint时会进行flushData,同时将BulkInsertWriterHelper重置,即将会生成新的fileId的数据文件。

BulkInsertWriterHelper

一个write task对应一个BulkInsertWriterHelper

  1. 每个BulkInsertWriterHelper对象有唯一fileId,checkpoint会生成新的
  2. 不同分区的数据由不同的HoodieRowDataCreateHandle处理
  3. 在一个ckeckpoint间隔内,如果HoodieRowDataCreateHandle处理的数据量 > maxFileSize时,滚动创建新parquet文件,但是fileId相同。

maxFileSize = hoodie.parquet.max.file.size + hoodie.parquet.max.file.size * hoodie.parquet.compression.ratio

调用栈

write:86, ParquetRowDataWriter  
write:62, RowDataParquetWriteSupport  
write:36, RowDataParquetWriteSupport  
write:138, InternalParquetRecordWriter  
write:310, ParquetWriter 
writeRow:65, HoodieRowDataParquetWriter  
write:131, HoodieRowDataCreateHandle  
write:113, BulkInsertWriterHelper 
processElement:86, AppendWriteFunction  

追加模式持续创建新的文件,特别是数据吞吐量不大的时候,将会产生大量小文件,无论对查询还是文件系统都会增加很大压力,所以需要配合clustering服务、clean服务来进行优化。

文章来自个人专栏
hudi
17 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0