hudi支持多种数据写入方式:insert、bulk_insert、upsert、boostrap,我们可以根据数据本身属性(append-only或upsert)来选择insert和upsert方式,同时也支持对历史数据的高效同步并嫁接到实时流程。
这里的使用技术组合为flink + hudi-0.11
upsert
这是hudi默认的写入方式,是包含了INSERT和UPDATE两种操作,如何区分两种操作?在数据写入之前会进行一个”tag”过程,即通过查找索引来确定记录的位置,如果是UPDATE操作,那么我们会得到记录的旧位置,否则将会为INSERT的记录分配一个新位置,”相同”的记录被组织在一起,还能进行小文件方面的优化。这种写入方式适合数据会更新(不会重复)而且需要保留变更数据的场景(Changelog Mode),结合flink进行近实时流式计算。
insert
单纯的插入操作,由于不需要判断记录是否属于更新,因此省略了”tag”过程,速度会比upsert快得多,但是不能保证数据是去重的,对于append-only的数据(日志、行为)很适合使用这种方式
- MOR表:采用的小文件优化策略与upsert一样,就是少维护了基于flink状态的全局索引
- COW表:每次写都会直接生成新的parquet文件,写过程并不会进行小文件优化,但可以通过clustering进行来重新调整。
bulk_insert
将外部系统的历史快照数据快速导入到hudi中,直接生成parquet文件。在语义上与insert类似,不会对数据进行去重,也不包含”tag”过程。默认情况下,在对数据进行写入之前会进行两个比较重要的操作: - 按分区shuffle:并行数与write.tasks一致,保证了一个分区的数据只被同一个task处理,可以减少每个分区下文件数量
- 排序:并行数与write.tasks一致,数据缓存在算子
SortOperator
中按分区排序,直到endInput
时把数据顺序往下发,writer_task遇到不同的分区数据时就会滚动创建新的file handler,所以按分区排序也减少了数据文件的数量 。底层基于BinaryExternalSorter
直接对二进制数据进行排序,且在内在不足时会将数据溢写到磁盘。
综合来说bulk_insert适合一次性加载大批量的快照数据,而且能一定程度上优化了文件的大小和数量 。| (p1,p4) | ===\ /=== |SortOperator| === | writer_task1 -> (p1, p2) shuffle | (p3,p2) | ===/ \=== |SortOperator| === | writer_task2 -> (p3, p4)
bootstrap
引导将Hudi表的数据重建索引,bulk_insert加载到hudi的历史数据不会生成索引,在进行对增量数据upsert前进行一次boostrap操作,才能实现全量数据的去重。 - 应该只触发一次,所以在job从状态中重启时需要将
index.bootstrap.enable
设置回fase - bootstrap过程会阻塞checkpoint,请合理调整
execution.checkpointing.tolerable-failed-checkpoints