Apache Iceberg 可以认为只支持 Insert 和 Delete,其他的操作均使用 Insert 和 Delete 组合完成,比如 Update = Delete + Insert。
Apache Iceberg 提供了两种 delete 形式:pos delete 和 eq delete,其中
- pos delete 会记录某个文件某个位置的信息需要被删除
- eq delete 会记录一个删除条件,后续读取的时候,所有符合该条件的需要被删除
其中 eq delete 仅修饰 sequence 比自己写入时 sequence 小的 data file,这个考虑是为了保证正确性。比如在之前的 sequence number 中有 +I
的数据,然后当前 sequence number 中有 -D
和 +I
的数据,如果 eq delete 可以修饰当前 sequence number 的数据,则可能会得到错误的结果:null。而上述直接过程的正确结果是第二次 +I
的结果。因此在同一个 sequence number 中使用 pos delete 进行修饰。也就是说
type | 描述 | 修饰范围 |
---|---|---|
pos delete | 标记某个文件某个位置的数据被删除 | 可以修饰所有数据 |
eq delete | 标记需要删除的数据条件,读取时所有符合条件的数据都会被删除 | 仅修饰比当前 delete sequence number 小的 datafile |
下面以 Flink 写入为例来描述下整个写入流程。
首先整体的流程大致如下所示
source data == ditrisbute mode ==> [IcebergWriter == partition transformer ==> file ] ==> Committer
其中
- source data 是 Iceberg Writer 上游已经处理好的数据
- IcebergWriter 是实际往外部文件写数据的地方,每个 IcebergWriter 是一个线程,维护着多个打开的文件流
- Committer 是最终将结果提交到 Catalog 的地方
上游数据经过处理后,会经过 distribute mode 发送的 Iceberg Writer 中, Iceberg Writer 会将数据写入到实际的文件中,最终由 Committer 进行表的提交。
distribute mode 觉得数据以什么形式发送到不同的 Iceberg Writer 中,因此会影响实际的文件数量。
partition transformer 定义在表的属性中,决定表如何进行分区,会影响生成的文件数量以及后续的查询性能。
写入按照是否有 update、是否分区分为四种情况:
dataLocation
├── partition1=x
│ └── partition2=y
│ └── String.format(%05d-%d-%s-%05d%s", partitionId, taskId, operationId, fileCoount.incrementAndGet(), null != suffix ? "-" + suffix : "");
├── partition1=..
│ └── partition2=..
└── partition1=..
- 有 update 的情况下
- 每次可能会有三类文件: DataFile, Eq delete file, Pos delete file.
- 有分区和无分区区别在文件路径上,其中
- 有分区的路径如下所示(三种文件的路径一样)
- String.format("%s/%s/%s", dataLocation, spec.partitionToPath(partitionData), fileName);
- spec.partitionToPath: String.join("/", (partitionFiled = partitionValue))
- fileName: String.format("%05d-%d-%s-%05d%s", partitionId, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : "")
- 无分区的路径如下所示
- String.format("%s/%s", dataLocation, String.format( "%05d-%d-%s-%05d%s", partitionId, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : ""));
- 有分区的路径如下所示(三种文件的路径一样)
- DataFile/Eq delete 的数据直接 append 进去
- Pos delete 的数据
- 内存中先按照 path 排序,path 相同的再按照 pos 进行排序,排序完成后写入到文件中
- 超过阈值后一起 flush 出去
- 所有的 update 以 equality fields 作为唯一健判断,只能删除同一分区内数据(尤其在 evolution 阶段需要考虑)
- 无 update 的情况下
- 每次仅产生 datafile,是否有分区体现在文件路径上
- 有分区的路径如下所示(RowDataPartitionedFanoutWriter.java)
- String.format("%s/%s/%s", dataLocation, spec.partitionToPath(partitionData), fileName);
- spec.partitionToPath: String.join("/", (partitionFiled = partitionValue))
- fileName: String.format("%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : "")
- 无分区的路径如下所示(UnpartitionedWriter.java)
- String.format("%s/%s", dataLocation, String.format( "%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCount.incrementAndGet(), null != suffix ? "-" + suffix : ""));
- 数据以 append 的方式写入到文件中
其中
- partitionId 对应 Flink 中的 subTaskIndex
- taskId 对应的是 task 的 attemptId
- operationId 对应的是一个 UUID
- fileCount 是当前已经打开过的文件数目
- suffix 暂时只在 SparkDelete 文件中有用,值为 "deletes"
有分区的路径如下所示
dataLocation
├── partition1=x
│ └── partition2=y
│ └── String.format(%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCoount.incrementAndGet(), null != suffix ? "-" + suffix : "");
├── partition1=..
│ └── partition2=..
└── partition1=..
无分区的路径如下所示
dataLocation
└── String.format(%05d-%d-%s-%05d%s", subTaskIndex, taskId, operationId, fileCoount.incrementAndGet(), null != suffix ? "-" + suffix : "");
如果有已经打开的文件则复用,否则按照上述逻辑创建文件并进行写入。
对于数据写入来说,可以 DataFile/Eq Delete File/Pos Delete File 的逻辑如下所示
- 对于 Insert/UpdateAfter 数据直接写入到 DataFile 中,并记录下写入到文件和 position 信息 (key -> <file, offset>)
- 对于 Update_Before/Delete 需要删除之前的数据
- 首先查看当前数据是否在本次 sequence number 写入(以 equality fields 进行查找)
- 如果是则以 pos delete 的形式写出
- 否则以 eq delete 的形式写出。
- 将数据从内存记录中删除
- 首先查看当前数据是否在本次 sequence number 写入(以 equality fields 进行查找)
- 当前 sequence number 结束的时候,清空 position 信息释放内存
- Pos Delete 文件中写入的内容是 <FileName, Pos> 的结构,且按照 FileName 排序,然后按照 Pos 进行排序,最后统一写出。
- Eq Delete 写入 equlity fields 的所有列。(原始数据做 project 后的数据)
由于 Iceberg 没有类似主键约束,因此无法避免同一条数据插入两次,在 Flink Sink 中通过 upsert-mode 来确保每一行均只有一条数据,这种模式会将 Insert 翻译为 Update = Delete + Insert 来确保表中相同 equality fields 的行只有一条。如果能保证上游不会连续发送两次 Insert,可以关闭 upsert mode 以提升性能。
Flink 中以 checkpoint 为界,当 checkpoint 开始时,本轮 Sequence number 的写入结束,当 Checkpoint 成功后,进行 Commit
partition transformer 现在的支持可以参考[3],另外不是所有引擎都支持所有的 transformer,比如 Flink 对 transformer 的支持有限,部分 transformer 需要通过 Spark 建表,在 Flink 中使用
partition transformer 还需要考虑在后续扩缩容的情况
Other
Flink 中如果 checkpoint 失败了,之前写入的数据咋处理呢?
- 回滚还是怎么处理呢?
partition transformer 这块是否有什么可以优化(比如读写对分区/并发等需求不一致),这个是否能否进行优化