1. chanelog模式
在以hudi的MOR表作为flink cdc的源时,出现了一些并非预期的效果。hudi自身支持ChangelogModes#FULL & ChangelogModes#UPSERT 两种模式,从它们支持的RowKind来看,还以为数据写入与读取时 RowKind是一致的,其实不然;另外,如果读写分别使用了不同的模式,那又会产生什么后果,我们应该怎么使用它。
- 当changelog.enable=true时,为FULL模式,包含的RowKind有:INSERT , UPDATE_BEFORE , UPDATE_AFTER , DELETE
- 当changelog.enable=false时,为UPSERT模式,包含的RowKind有:INSERT , UPDATE_AFTER , DELETE
2. FULL模式
2.1 写数据
在该模式下,sink端会完整接收上流所有RowKind类型的数据,在hudi内部通过往行记录中增加元数据字段_hoodie_operation
,并将该字段的值设置为RowKind的名称(+I,-U,+U,-D),通过查看生成的rt表和ro表结构,多了一列。org.apache.hudi.avro.HoodieAvroUtils public static GenericRecord addOperationToRecord(GenericRecord record, HoodieOperation operation) { record.put("_hoodie_operation", operation.getName()); return record; }
2.2 读数据
changelog数据从文件系统中读出来时是avro格式(HoodieAvroRecord)的,需要转换为flink内部标准RowData,其中RowKind来自数据写入时增加的字段_hoodie_operation
,如果元数据字段不存在,则默认使用INSERT值。
该模式下的读取过程在org.apache.hudi.table.format.FormatUtils public static void setRowKind(RowData rowData, IndexedRecord record, int index) { if (index != -1) { rowData.setRowKind(getRowKind(record, index)); } }
org.apache.hudi.table.format.mor.MergeOnReadInputFormat#getUnMergedLogFileIterator
中,同主键的changelog有多少都会读出来,不会被合并到最终状态。3. UPSERT模式
3.1 写数据
该模式下,sink端不会接收UPDATE_BEFORE的changelog,而且数据在写入时也不会添加元数据字段_hoodie_operation
。与FULL模式不同,对于DELETE的数据,在保存时并不包含行本身的数据,只有key信息(org.apache.hudi.io.HoodieAppendHandle#recordsToDelete
)。3.2 读数据
该模式下的读过程在org.apache.hudi.table.format.mor.MergeOnReadInputFormat#getLogFileIterator
中,同主键的changelog会被压缩,只返回最终的状态数据(HoodieMergedLogRecordScanner)
读出来需要包装为flink内部格式RowData,RowKind是如何确定的?
- 如果HoodieAvroRecord.getData()为空,表示changelog保存时不包含行本身的数据,所以这是一个-D操作
- 尝试获取
_hoodie_operation
的值,如果存在,则使用该RowKind,否则默认为+I
总的来说,在没有_hoodie_operation的情况下,只有+I和-D两种,否则取_hoodie_operation的值4. 不同模式的读写
上图的avro log指mor表的增量提交日志文件,上面的示例数据被一次性读取
- FULL写保存RowKind信息
- FULL读只能用于开启了FULL写的MOR表,且读出的数据与写入一致
- UPSERT写不包含RowKind信息
- UPSERT读会合并中间状态
- UPSERT读的-D取决于写入模式时是否保存了行数据,FULL写入包含则读取有,UPSERT写入不包含则为null
关键代码:
org.apache.hudi.io.HoodieAppendHandle#getIndexedRecord
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload#getInsertValue