searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi系列-数据写入方式及使用场景

2023-06-16 09:20:45
155
0

hudi支持多种数据写入方式:insert、bulk_insert、upsert、boostrap,我们可以根据数据本身属性(append-only或upsert)来选择insert和upsert方式,同时也支持对历史数据的高效同步并嫁接到实时流程。
这里的使用技术组合为flink + hudi-0.11

upsert

这是hudi默认的写入方式,是包含了INSERT和UPDATE两种操作,如何区分两种操作?在数据写入之前会进行一个”tag”过程,即通过查找索引来确定记录的位置,如果是UPDATE操作,那么我们会得到记录的旧位置,否则将会为INSERT的记录分配一个新位置,”相同”的记录被组织在一起,还能进行小文件方面的优化。这种写入方式适合数据会更新(不会重复)而且需要保留变更数据的场景(Changelog Mode),结合flink进行近实时流式计算。

insert

单纯的插入操作,由于不需要判断记录是否属于更新,因此省略了”tag”过程,速度会比upsert快得多,但是不能保证数据是去重的,对于append-only的数据(日志、行为)很适合使用这种方式

  • MOR表:采用的小文件优化策略与upsert一样,就是少维护了基于flink状态的全局索引
  • COW表:每次写都会直接生成新的parquet文件,写过程并不会进行小文件优化,但可以通过clustering进行来重新调整。

    bulk_insert

    将外部系统的历史快照数据快速导入到hudi中,直接生成parquet文件。在语义上与insert类似,不会对数据进行去重,也不包含”tag”过程。默认情况下,在对数据进行写入之前会进行两个比较重要的操作:
  • 按分区shuffle:并行数与write.tasks一致,保证了一个分区的数据只被同一个task处理,可以减少每个分区下文件数量
  • 排序:并行数与write.tasks一致,数据缓存在算子SortOperator中按分区排序,直到endInput时把数据顺序往下发,writer_task遇到不同的分区数据时就会滚动创建新的file handler,所以按分区排序也减少了数据文件的数量 。底层基于BinaryExternalSorter直接对二进制数据进行排序,且在内在不足时会将数据溢写到磁盘。
    综合来说bulk_insert适合一次性加载大批量的快照数据,而且能一定程度上优化了文件的大小和数量 。
    | (p1,p4) | ===\     /=== |SortOperator| === | writer_task1 -> (p1, p2)
                 shuffle
    | (p3,p2) | ===/     \=== |SortOperator| === | writer_task2 -> (p3, p4)
    

    bootstrap

    引导将Hudi表的数据重建索引,bulk_insert加载到hudi的历史数据不会生成索引,在进行对增量数据upsert前进行一次boostrap操作,才能实现全量数据的去重。
  • 应该只触发一次,所以在job从状态中重启时需要将index.bootstrap.enable设置回fase
  • bootstrap过程会阻塞checkpoint,请合理调整execution.checkpointing.tolerable-failed-checkpoints
0条评论
0 / 1000
矛始
28文章数
0粉丝数
矛始
28 文章 | 0 粉丝
原创

hudi系列-数据写入方式及使用场景

2023-06-16 09:20:45
155
0

hudi支持多种数据写入方式:insert、bulk_insert、upsert、boostrap,我们可以根据数据本身属性(append-only或upsert)来选择insert和upsert方式,同时也支持对历史数据的高效同步并嫁接到实时流程。
这里的使用技术组合为flink + hudi-0.11

upsert

这是hudi默认的写入方式,是包含了INSERT和UPDATE两种操作,如何区分两种操作?在数据写入之前会进行一个”tag”过程,即通过查找索引来确定记录的位置,如果是UPDATE操作,那么我们会得到记录的旧位置,否则将会为INSERT的记录分配一个新位置,”相同”的记录被组织在一起,还能进行小文件方面的优化。这种写入方式适合数据会更新(不会重复)而且需要保留变更数据的场景(Changelog Mode),结合flink进行近实时流式计算。

insert

单纯的插入操作,由于不需要判断记录是否属于更新,因此省略了”tag”过程,速度会比upsert快得多,但是不能保证数据是去重的,对于append-only的数据(日志、行为)很适合使用这种方式

  • MOR表:采用的小文件优化策略与upsert一样,就是少维护了基于flink状态的全局索引
  • COW表:每次写都会直接生成新的parquet文件,写过程并不会进行小文件优化,但可以通过clustering进行来重新调整。

    bulk_insert

    将外部系统的历史快照数据快速导入到hudi中,直接生成parquet文件。在语义上与insert类似,不会对数据进行去重,也不包含”tag”过程。默认情况下,在对数据进行写入之前会进行两个比较重要的操作:
  • 按分区shuffle:并行数与write.tasks一致,保证了一个分区的数据只被同一个task处理,可以减少每个分区下文件数量
  • 排序:并行数与write.tasks一致,数据缓存在算子SortOperator中按分区排序,直到endInput时把数据顺序往下发,writer_task遇到不同的分区数据时就会滚动创建新的file handler,所以按分区排序也减少了数据文件的数量 。底层基于BinaryExternalSorter直接对二进制数据进行排序,且在内在不足时会将数据溢写到磁盘。
    综合来说bulk_insert适合一次性加载大批量的快照数据,而且能一定程度上优化了文件的大小和数量 。
    | (p1,p4) | ===\     /=== |SortOperator| === | writer_task1 -> (p1, p2)
                 shuffle
    | (p3,p2) | ===/     \=== |SortOperator| === | writer_task2 -> (p3, p4)
    

    bootstrap

    引导将Hudi表的数据重建索引,bulk_insert加载到hudi的历史数据不会生成索引,在进行对增量数据upsert前进行一次boostrap操作,才能实现全量数据的去重。
  • 应该只触发一次,所以在job从状态中重启时需要将index.bootstrap.enable设置回fase
  • bootstrap过程会阻塞checkpoint,请合理调整execution.checkpointing.tolerable-failed-checkpoints
文章来自个人专栏
hudi
17 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0