searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi系列-changelog的读写

2022-12-11 08:22:54
288
0

1. chanelog模式

在以hudi的MOR表作为flink cdc的源时,出现了一些并非预期的效果。hudi自身支持ChangelogModes#FULL & ChangelogModes#UPSERT 两种模式,从它们支持的RowKind来看,还以为数据写入与读取时 RowKind是一致的,其实不然;另外,如果读写分别使用了不同的模式,那又会产生什么后果,我们应该怎么使用它。

  • 当changelog.enable=true时,为FULL模式,包含的RowKind有:INSERT , UPDATE_BEFORE , UPDATE_AFTER , DELETE
  • 当changelog.enable=false时,为UPSERT模式,包含的RowKind有:INSERT , UPDATE_AFTER , DELETE

    2. FULL模式

    2.1 写数据

    在该模式下,sink端会完整接收上流所有RowKind类型的数据,在hudi内部通过往行记录中增加元数据字段_hoodie_operation,并将该字段的值设置为RowKind的名称(+I,-U,+U,-D),通过查看生成的rt表和ro表结构,多了一列。
    org.apache.hudi.avro.HoodieAvroUtils
    public static GenericRecord addOperationToRecord(GenericRecord record, HoodieOperation operation) {
        record.put("_hoodie_operation", operation.getName());
        return record;
    }
    

    2.2 读数据

    changelog数据从文件系统中读出来时是avro格式(HoodieAvroRecord)的,需要转换为flink内部标准RowData,其中RowKind来自数据写入时增加的字段_hoodie_operation,如果元数据字段不存在,则默认使用INSERT值。
    org.apache.hudi.table.format.FormatUtils
    public static void setRowKind(RowData rowData, IndexedRecord record, int index) {
    if (index != -1) {
        rowData.setRowKind(getRowKind(record, index));
    }
    }
    
    该模式下的读取过程在org.apache.hudi.table.format.mor.MergeOnReadInputFormat#getUnMergedLogFileIterator中,同主键的changelog有多少都会读出来,不会被合并到最终状态。

    3. UPSERT模式

    3.1 写数据

    该模式下,sink端不会接收UPDATE_BEFORE的changelog,而且数据在写入时也不会添加元数据字段_hoodie_operation。与FULL模式不同,对于DELETE的数据,在保存时并不包含行本身的数据,只有key信息(org.apache.hudi.io.HoodieAppendHandle#recordsToDelete)。

    3.2 读数据

    该模式下的读过程在org.apache.hudi.table.format.mor.MergeOnReadInputFormat#getLogFileIterator中,同主键的changelog会被压缩,只返回最终的状态数据(HoodieMergedLogRecordScanner)
    读出来需要包装为flink内部格式RowData,RowKind是如何确定的?
  1. 如果HoodieAvroRecord.getData()为空,表示changelog保存时不包含行本身的数据,所以这是一个-D操作
  2. 尝试获取_hoodie_operation的值,如果存在,则使用该RowKind,否则默认为+I
    总的来说,在没有_hoodie_operation的情况下,只有+I和-D两种,否则取_hoodie_operation的值

    4. 不同模式的读写

    在这里插入图片描述

上图的avro log指mor表的增量提交日志文件,上面的示例数据被一次性读取

  1. FULL写保存RowKind信息
  2. FULL读只能用于开启了FULL写的MOR表,且读出的数据与写入一致
  3. UPSERT写不包含RowKind信息
  4. UPSERT读会合并中间状态
  5. UPSERT读的-D取决于写入模式时是否保存了行数据,FULL写入包含则读取有,UPSERT写入不包含则为null

关键代码:

org.apache.hudi.io.HoodieAppendHandle#getIndexedRecord
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload#getInsertValue
0条评论
0 / 1000
矛始
28文章数
0粉丝数
矛始
28 文章 | 0 粉丝
原创

hudi系列-changelog的读写

2022-12-11 08:22:54
288
0

1. chanelog模式

在以hudi的MOR表作为flink cdc的源时,出现了一些并非预期的效果。hudi自身支持ChangelogModes#FULL & ChangelogModes#UPSERT 两种模式,从它们支持的RowKind来看,还以为数据写入与读取时 RowKind是一致的,其实不然;另外,如果读写分别使用了不同的模式,那又会产生什么后果,我们应该怎么使用它。

  • 当changelog.enable=true时,为FULL模式,包含的RowKind有:INSERT , UPDATE_BEFORE , UPDATE_AFTER , DELETE
  • 当changelog.enable=false时,为UPSERT模式,包含的RowKind有:INSERT , UPDATE_AFTER , DELETE

    2. FULL模式

    2.1 写数据

    在该模式下,sink端会完整接收上流所有RowKind类型的数据,在hudi内部通过往行记录中增加元数据字段_hoodie_operation,并将该字段的值设置为RowKind的名称(+I,-U,+U,-D),通过查看生成的rt表和ro表结构,多了一列。
    org.apache.hudi.avro.HoodieAvroUtils
    public static GenericRecord addOperationToRecord(GenericRecord record, HoodieOperation operation) {
        record.put("_hoodie_operation", operation.getName());
        return record;
    }
    

    2.2 读数据

    changelog数据从文件系统中读出来时是avro格式(HoodieAvroRecord)的,需要转换为flink内部标准RowData,其中RowKind来自数据写入时增加的字段_hoodie_operation,如果元数据字段不存在,则默认使用INSERT值。
    org.apache.hudi.table.format.FormatUtils
    public static void setRowKind(RowData rowData, IndexedRecord record, int index) {
    if (index != -1) {
        rowData.setRowKind(getRowKind(record, index));
    }
    }
    
    该模式下的读取过程在org.apache.hudi.table.format.mor.MergeOnReadInputFormat#getUnMergedLogFileIterator中,同主键的changelog有多少都会读出来,不会被合并到最终状态。

    3. UPSERT模式

    3.1 写数据

    该模式下,sink端不会接收UPDATE_BEFORE的changelog,而且数据在写入时也不会添加元数据字段_hoodie_operation。与FULL模式不同,对于DELETE的数据,在保存时并不包含行本身的数据,只有key信息(org.apache.hudi.io.HoodieAppendHandle#recordsToDelete)。

    3.2 读数据

    该模式下的读过程在org.apache.hudi.table.format.mor.MergeOnReadInputFormat#getLogFileIterator中,同主键的changelog会被压缩,只返回最终的状态数据(HoodieMergedLogRecordScanner)
    读出来需要包装为flink内部格式RowData,RowKind是如何确定的?
  1. 如果HoodieAvroRecord.getData()为空,表示changelog保存时不包含行本身的数据,所以这是一个-D操作
  2. 尝试获取_hoodie_operation的值,如果存在,则使用该RowKind,否则默认为+I
    总的来说,在没有_hoodie_operation的情况下,只有+I和-D两种,否则取_hoodie_operation的值

    4. 不同模式的读写

    在这里插入图片描述

上图的avro log指mor表的增量提交日志文件,上面的示例数据被一次性读取

  1. FULL写保存RowKind信息
  2. FULL读只能用于开启了FULL写的MOR表,且读出的数据与写入一致
  3. UPSERT写不包含RowKind信息
  4. UPSERT读会合并中间状态
  5. UPSERT读的-D取决于写入模式时是否保存了行数据,FULL写入包含则读取有,UPSERT写入不包含则为null

关键代码:

org.apache.hudi.io.HoodieAppendHandle#getIndexedRecord
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload#getInsertValue
文章来自个人专栏
hudi
17 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0