searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Apache Iceberg 写入流程

2024-10-14 09:40:17
4
0

Apache Iceberg 可以认为只支持 Insert 和 Delete,其他的操作均使用 Insert 和 Delete 组合完成,比如 Update = Delete + Insert。

Apache Iceberg 提供了两种 delete 形式:pos delete 和 eq delete,其中

  • pos delete 会记录某个文件某个位置的信息需要被删除
  • eq delete 会记录一个删除条件,后续读取的时候,所有符合该条件的需要被删除

其中 eq delete 仅修饰 sequence 比自己写入时 sequence 小的 data file,这个考虑是为了保证正确性。比如在之前的 sequence number 中有 +I 的数据,然后当前 sequence number 中有 -D+I 的数据,如果 eq delete 可以修饰当前 sequence number 的数据,则可能会得到错误的结果:null。而上述直接过程的正确结果是第二次 +I 的结果。因此在同一个 sequence number 中使用 pos delete 进行修饰。也就是说

type 描述 修饰范围
pos delete 标记某个文件某个位置的数据被删除 可以修饰所有数据
eq delete 标记需要删除的数据条件,读取时所有符合条件的数据都会被删除 仅修饰比当前 delete sequence number 小的 datafile

下面以 Flink 写入为例来描述下整个写入流程。

首先整体的流程大致如下所示

source data == ditrisbute mode ==> [IcebergWriter == partition transformer ==> file ] ==> Committer

其中

  • source data 是 Iceberg Writer 上游已经处理好的数据
  • IcebergWriter 是实际往外部文件写数据的地方,每个 IcebergWriter 是一个线程,维护着多个打开的文件流
  • Committer 是最终将结果提交到 Catalog 的地方

上游数据经过处理后,会经过 distribute mode 发送的 Iceberg Writer 中, Iceberg Writer 会将数据写入到实际的文件中,最终由 Committer 进行表的提交。

distribute mode 觉得数据以什么形式发送到不同的 Iceberg Writer 中,因此会影响实际的文件数量。
partition transformer 定义在表的属性中,决定表如何进行分区,会影响生成的文件数量以及后续的查询性能。

写入按照是否有 update、是否分区分为四种情况:

dataLocation
├── partition1=x
│   └── partition2=y
│       └── String.format(%05d-%d-%s-%05d%s", partitionId, taskId, operationId, fileCoount.incrementAndGet(),  null != suffix ? "-" + suffix : "");
├── partition1=..
│   └── partition2=..
└── partition1=..
  • 有 update 的情况下
    • 每次可能会有三类文件: DataFile, Eq delete file, Pos delete file.
    • 有分区和无分区区别在文件路径上,其中
      • 有分区的路径如下所示(三种文件的路径一样)
        • String.format("%s/%s/%s", dataLocation, spec.partitionToPath(partitionData), fileName);
        • spec.partitionToPath: String.join("/", (partitionFiled = partitionValue))
        • fileName: String.format("%05d-%d-%s-%05d%s", partitionId, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : "")
      • 无分区的路径如下所示
        • String.format("%s/%s", dataLocation, String.format( "%05d-%d-%s-%05d%s", partitionId, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : ""));
    • DataFile/Eq delete 的数据直接 append 进去
    • Pos delete 的数据
      • 内存中先按照 path 排序,path 相同的再按照 pos 进行排序,排序完成后写入到文件中
      • 超过阈值后一起 flush 出去
    • 所有的 update 以 equality fields 作为唯一健判断,只能删除同一分区内数据(尤其在 evolution 阶段需要考虑)
  • 无 update 的情况下
    • 每次仅产生 datafile,是否有分区体现在文件路径上
    • 有分区的路径如下所示(RowDataPartitionedFanoutWriter.java)
      • String.format("%s/%s/%s", dataLocation, spec.partitionToPath(partitionData), fileName);
      • spec.partitionToPath: String.join("/", (partitionFiled = partitionValue))
      • fileName: String.format("%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : "")
    • 无分区的路径如下所示(UnpartitionedWriter.java)
      • String.format("%s/%s", dataLocation, String.format( "%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : ""));
    • 数据以 append 的方式写入到文件中

其中

  • partitionId 对应 Flink 中的 subTaskIndex
  • taskId 对应的是 task 的 attemptId
  • operationId 对应的是一个 UUID
  • fileCount 是当前已经打开过的文件数目
  • suffix 暂时只在 SparkDelete 文件中有用,值为 "deletes"
有分区的路径如下所示

dataLocation
├── partition1=x
│   └── partition2=y
│       └── String.format(%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCoount.incrementAndGet(),  null != suffix ? "-" + suffix : "");
├── partition1=..
│   └── partition2=..
└── partition1=..
无分区的路径如下所示
dataLocation
└── String.format(%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCoount.incrementAndGet(),  null != suffix ? "-" + suffix : "");

如果有已经打开的文件则复用,否则按照上述逻辑创建文件并进行写入。

对于数据写入来说,可以 DataFile/Eq Delete File/Pos Delete File 的逻辑如下所示

  • 对于 Insert/UpdateAfter 数据直接写入到 DataFile 中,并记录下写入到文件和 position 信息 (key -> <file, offset>)
  • 对于 Update_Before/Delete 需要删除之前的数据
    • 首先查看当前数据是否在本次 sequence number 写入(以 equality fields 进行查找)
      • 如果是则以 pos delete 的形式写出
      • 否则以 eq delete 的形式写出。
    • 将数据从内存记录中删除
  • 当前 sequence number 结束的时候,清空 position 信息释放内存
  • Pos Delete 文件中写入的内容是 <FileName, Pos> 的结构,且按照 FileName 排序,然后按照 Pos 进行排序,最后统一写出。
  • Eq Delete 写入 equlity fields 的所有列。(原始数据做 project 后的数据)

由于 Iceberg 没有类似主键约束,因此无法避免同一条数据插入两次,在 Flink Sink 中通过 upsert-mode 来确保每一行均只有一条数据,这种模式会将 Insert 翻译为 Update = Delete + Insert 来确保表中相同 equality fields 的行只有一条。如果能保证上游不会连续发送两次 Insert,可以关闭 upsert mode 以提升性能。

Flink 中以 checkpoint 为界,当 checkpoint 开始时,本轮 Sequence number 的写入结束,当 Checkpoint 成功后,进行 Commit

partition transformer 现在的支持可以参考[3],另外不是所有引擎都支持所有的 transformer,比如 Flink 对 transformer 的支持有限,部分 transformer 需要通过 Spark 建表,在 Flink 中使用
partition transformer 还需要考虑在后续扩缩容的情况

Other

Flink 中如果 checkpoint 失败了,之前写入的数据咋处理呢?

  • 回滚还是怎么处理呢?

partition transformer 这块是否有什么可以优化(比如读写对分区/并发等需求不一致),这个是否能否进行优化

0条评论
0 / 1000
q****n
3文章数
0粉丝数
q****n
3 文章 | 0 粉丝
q****n
3文章数
0粉丝数
q****n
3 文章 | 0 粉丝
原创

Apache Iceberg 写入流程

2024-10-14 09:40:17
4
0

Apache Iceberg 可以认为只支持 Insert 和 Delete,其他的操作均使用 Insert 和 Delete 组合完成,比如 Update = Delete + Insert。

Apache Iceberg 提供了两种 delete 形式:pos delete 和 eq delete,其中

  • pos delete 会记录某个文件某个位置的信息需要被删除
  • eq delete 会记录一个删除条件,后续读取的时候,所有符合该条件的需要被删除

其中 eq delete 仅修饰 sequence 比自己写入时 sequence 小的 data file,这个考虑是为了保证正确性。比如在之前的 sequence number 中有 +I 的数据,然后当前 sequence number 中有 -D+I 的数据,如果 eq delete 可以修饰当前 sequence number 的数据,则可能会得到错误的结果:null。而上述直接过程的正确结果是第二次 +I 的结果。因此在同一个 sequence number 中使用 pos delete 进行修饰。也就是说

type 描述 修饰范围
pos delete 标记某个文件某个位置的数据被删除 可以修饰所有数据
eq delete 标记需要删除的数据条件,读取时所有符合条件的数据都会被删除 仅修饰比当前 delete sequence number 小的 datafile

下面以 Flink 写入为例来描述下整个写入流程。

首先整体的流程大致如下所示

source data == ditrisbute mode ==> [IcebergWriter == partition transformer ==> file ] ==> Committer

其中

  • source data 是 Iceberg Writer 上游已经处理好的数据
  • IcebergWriter 是实际往外部文件写数据的地方,每个 IcebergWriter 是一个线程,维护着多个打开的文件流
  • Committer 是最终将结果提交到 Catalog 的地方

上游数据经过处理后,会经过 distribute mode 发送的 Iceberg Writer 中, Iceberg Writer 会将数据写入到实际的文件中,最终由 Committer 进行表的提交。

distribute mode 觉得数据以什么形式发送到不同的 Iceberg Writer 中,因此会影响实际的文件数量。
partition transformer 定义在表的属性中,决定表如何进行分区,会影响生成的文件数量以及后续的查询性能。

写入按照是否有 update、是否分区分为四种情况:

dataLocation
├── partition1=x
│   └── partition2=y
│       └── String.format(%05d-%d-%s-%05d%s", partitionId, taskId, operationId, fileCoount.incrementAndGet(),  null != suffix ? "-" + suffix : "");
├── partition1=..
│   └── partition2=..
└── partition1=..
  • 有 update 的情况下
    • 每次可能会有三类文件: DataFile, Eq delete file, Pos delete file.
    • 有分区和无分区区别在文件路径上,其中
      • 有分区的路径如下所示(三种文件的路径一样)
        • String.format("%s/%s/%s", dataLocation, spec.partitionToPath(partitionData), fileName);
        • spec.partitionToPath: String.join("/", (partitionFiled = partitionValue))
        • fileName: String.format("%05d-%d-%s-%05d%s", partitionId, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : "")
      • 无分区的路径如下所示
        • String.format("%s/%s", dataLocation, String.format( "%05d-%d-%s-%05d%s", partitionId, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : ""));
    • DataFile/Eq delete 的数据直接 append 进去
    • Pos delete 的数据
      • 内存中先按照 path 排序,path 相同的再按照 pos 进行排序,排序完成后写入到文件中
      • 超过阈值后一起 flush 出去
    • 所有的 update 以 equality fields 作为唯一健判断,只能删除同一分区内数据(尤其在 evolution 阶段需要考虑)
  • 无 update 的情况下
    • 每次仅产生 datafile,是否有分区体现在文件路径上
    • 有分区的路径如下所示(RowDataPartitionedFanoutWriter.java)
      • String.format("%s/%s/%s", dataLocation, spec.partitionToPath(partitionData), fileName);
      • spec.partitionToPath: String.join("/", (partitionFiled = partitionValue))
      • fileName: String.format("%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : "")
    • 无分区的路径如下所示(UnpartitionedWriter.java)
      • String.format("%s/%s", dataLocation, String.format( "%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : ""));
    • 数据以 append 的方式写入到文件中

其中

  • partitionId 对应 Flink 中的 subTaskIndex
  • taskId 对应的是 task 的 attemptId
  • operationId 对应的是一个 UUID
  • fileCount 是当前已经打开过的文件数目
  • suffix 暂时只在 SparkDelete 文件中有用,值为 "deletes"
有分区的路径如下所示

dataLocation
├── partition1=x
│   └── partition2=y
│       └── String.format(%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCoount.incrementAndGet(),  null != suffix ? "-" + suffix : "");
├── partition1=..
│   └── partition2=..
└── partition1=..
无分区的路径如下所示
dataLocation
└── String.format(%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCoount.incrementAndGet(),  null != suffix ? "-" + suffix : "");

如果有已经打开的文件则复用,否则按照上述逻辑创建文件并进行写入。

对于数据写入来说,可以 DataFile/Eq Delete File/Pos Delete File 的逻辑如下所示

  • 对于 Insert/UpdateAfter 数据直接写入到 DataFile 中,并记录下写入到文件和 position 信息 (key -> <file, offset>)
  • 对于 Update_Before/Delete 需要删除之前的数据
    • 首先查看当前数据是否在本次 sequence number 写入(以 equality fields 进行查找)
      • 如果是则以 pos delete 的形式写出
      • 否则以 eq delete 的形式写出。
    • 将数据从内存记录中删除
  • 当前 sequence number 结束的时候,清空 position 信息释放内存
  • Pos Delete 文件中写入的内容是 <FileName, Pos> 的结构,且按照 FileName 排序,然后按照 Pos 进行排序,最后统一写出。
  • Eq Delete 写入 equlity fields 的所有列。(原始数据做 project 后的数据)

由于 Iceberg 没有类似主键约束,因此无法避免同一条数据插入两次,在 Flink Sink 中通过 upsert-mode 来确保每一行均只有一条数据,这种模式会将 Insert 翻译为 Update = Delete + Insert 来确保表中相同 equality fields 的行只有一条。如果能保证上游不会连续发送两次 Insert,可以关闭 upsert mode 以提升性能。

Flink 中以 checkpoint 为界,当 checkpoint 开始时,本轮 Sequence number 的写入结束,当 Checkpoint 成功后,进行 Commit

partition transformer 现在的支持可以参考[3],另外不是所有引擎都支持所有的 transformer,比如 Flink 对 transformer 的支持有限,部分 transformer 需要通过 Spark 建表,在 Flink 中使用
partition transformer 还需要考虑在后续扩缩容的情况

Other

Flink 中如果 checkpoint 失败了,之前写入的数据咋处理呢?

  • 回滚还是怎么处理呢?

partition transformer 这块是否有什么可以优化(比如读写对分区/并发等需求不一致),这个是否能否进行优化

文章来自个人专栏
回到过去-展望未来--大数据
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0