一、功能实现
1、整体功能介绍
不会影响现有的数据读写流程,在数据通过INSERT\BULK等方式写入后整理写入的文件,可以即时、也可以制定定时策略进行数据整理,将小文件合并成较大文件,同时会根据指定的列进行数据排序后再存储,这样让数据更加聚合,增加读取性能。
2、COW Table实现流程
(1)基于TimeLine在t5时刻,表中的分区具有5个文件组f0、f1、f2、f3、f4。假设每个文件组都是100MB。所以分区中的总数据是500MB。
(2)在t6时刻请求聚类操作。与compaction的流程,使用“ClusteringPlan”在元数据中创建一个“t6.clustering.requested”文件,该文件包括所有分区中集群操作所涉及的所有文件组。
示例内容:{ partitionPath: {“datestr”}, oldfileGroups: [ {fileId: “f0”, time: “t0”}, { fileId: “f1”, time: “t1”}, ... ], newFileGroups: [“c1”, “c2”] }
(3)假设集群后的最大文件大小配置为250MB。集群将把分区中的所有数据重新分配到两个文件组中:c1,c2。这些文件组暂时是“幻影”,在t8集群完成之前查询不可见。另外,一个文件组中的记录可以拆分为多个文件组。在本例中,f4文件组中的一些记录同时进入两个新的文件组c1、c2。
(4)当集群正在进行时(t6到t8),任何涉及相关文件组的写入操作都是被拒绝的。
(5)在写入新的数据文件c1-t6.parquet和c2-t6.parque之后,如果配置了全局索引,我们将在记录级别索引中为具有新位置的所有键添加条目。新的索引项对于其他写入将不可见,因为还没有关联的提交。
(6)最后,我们创建一个提交元数据文件“t6.commit”,其中包括通过该提交修改的文件组(f0、f1、f2、f3、f4)。
(7)文件组(f0到f4)不会立即从磁盘中删除。Cleaner会在存档t6.commit之前清理这些文件及更新了所有视图,以忽略所有提交元数据文件中提到的所有文件组。所以不会看到重复的数据。
2、MOR Table实现流程
整个流程和COW table类似,对于MOR table, 数据写入会影响了parquet数据文件和日志文件. Clustering影响的是parquet数据文件。
二、制定Clustering计划
1、确定符合Clustering条件的文件
分区过滤出指定的分区
根据targetFileSize 排除不需要Clustering的文件组
排除已经计划在clustering或者compact的文件组
排除包含日志文件的文件组
2、对文件进行重新分组
根据特定条件对符合群集条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数根据记录关键字范围对文件进行分组。键值可以被存储在parquet文件的footer中,利于一些特定的查询与更新。
分组都是基于统一的commit time
排序基于的规则可以指定key的线性排序,或者是选择多维度更好的算法如z-order与hilbert
3、保存计划
clustering 计划被保存在TimeLine上,元数据结构
hudi/hudi-common/src/main/avro/HoodieClusteringPlan.avsc at master · apache/hudi · GitHub
三、执行计划案例
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"
val dataGen = new DataGenerator(Array("2020/03/11"))
val updates = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.parquet.small.file.limit", "0").
option("hoodie.clustering.inline", "true").
option("hoodie.clustering.inline.max.commits", "4").
option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
option("hoodie.clustering.plan.strategy.sort.columns", ""). //optional, if sorting is needed as part of rewriting data
mode(Append).
save(basePath);