1. 简介
hudi会不断生成commit、deltacommit、clean等类型的Instant从而形成活跃时间轴(ActiveTimeline),随着时间增长,时间轴变长,.hoodie元数据目录下的文件不断累积,为了限制元数据文件数量,需要对一些比较久远的元数据文件进行归档,保存到.hoodie/archived目录下,可以称之为归档时间轴(ArchivedTimeline)。
1.1 环境
- flink 1.13.6
- hudi 0.11.0
- merge on read表
1.2 逻辑结构
2. 归档流程
2.1 每次commit触发归档
每次数据提交结束后都会触发archive操作HoodieFlinkWriteClient#postCommit
,前提是开启了自动归档配置hoodie.archive.automatic
protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArchival) {
if (!config.isAutoArchive()) return;
if (config.isAsyncArchive()) AsyncArchiveService.waitForCompletion(asyncArchiveService);
else archive(table, acquireLockForArchival);
}
protected void archive(HoodieTable table, boolean acquireLockForArchival) {
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
archiver.archiveIfRequired(context, acquireLockForArchival);
}
如果是启用了异步归档hoodie.archive.async,则在异步线程中执行归档并等待结束;BaseHoodieWriteClient#archive
创建归档器(HoodieTimelineArchiver)并执行。
2.2 获取归档Instant
需要归档的目标Instant包含6种类型:clean,rollback,commit,deltacommit,replacecommit,compation,其中每种类型有三种状态(对应3个元数据文件),HoodieTimelineArchiver#getInstantsToArchive
中分别获取clean和commit两种场景的Instant,进行合并
private Stream<HoodieInstant> getInstantsToArchive() {
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());//获取所有需要归档的Instant,状态为completed
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
//当前整个活跃时间轴下所有Instant: <<时间戳,Instant类型>,对应的Instant集合>,注意compation类型已经被映射为commit类型,因为compation完成时最终状态为commit
Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants().collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),HoodieInstant.getComparableAction(i.getAction()))));
return instants.flatMap(hoodieInstant -> {
//为completed状态的Instant关联回其中间状态(requested,inflight)
List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(), HoodieInstant.getComparableAction(hoodieInstant.getAction())));
return instantsToStream.stream();
});
}
- getCleanInstantsToArchive:从ActiveTimeline中过滤所有action=[clean,rollback],state=completed的Instant,按action分组,当每组Instant数量>hoodie.keep.max.commits时,该组只保留最新(数量为hoodie.keep.min.commits)的Instant,其余将被归档(max-min)
- getCommitInstantsToArchive:从ActiveTimeline中过滤所有action=[commit,deltacommit,replacecommit],state=completed的Instant,当commit Instants数量>hoodie.keep.max.commits时,先将Instants进行条件过滤,保留最新(数量为hoodie.keep.min.commits)的Instant,其余将被归档(max-min)
可以看到,上面两个方法只获取到completed状态的Instant,而且还少了一种compation类型,缺少的都会通过groupByTsAction进行补充。
2.3 执行归档
确定了所有需要归档的Instant,接着就是实行归档HoodieTimelineArchiver#archive
,读取每个Instant对应的元数据文件反序列化,每个元数据文件都会被包装成一个HoodieArchivedMetaEntry对象,当对象数量达到hoodie.commits.archival.batch时,将数据写到目录.hoodie/archived下生成归档文件
2.4 删除归档文件
HoodieTimelineArchiver#deleteArchivedInstants
遍历所有已经归档的Instant并行删除对应的文件,可以通过配置hoodie.archive.delete.parallelism来设置并行度,然后把.hoodie/aux目录下的元数据文件也删除。
2.5 合并归档结果
从0.11版本开始,支持对归档的结果文件进行合并HoodieTimelineArchiver#mergeArchiveFilesIfNecessary
。虽然每次归档都将多个Instant生成.commit_.arvhive文件,归档的次数多后也会有大量的归档结果文件,所以对于较小的归档结果文件进行合并。
当启用归档合并时(hoodie.archive.merge.enable)且文件系统支持append(StorageSchemas,例如:file://),如果arvhive目录下存在大小小于hoodie.archive.merge.small.file.limit.bytes的文件数量达到hoodie.archive.merge.files.batch.size,即触发合并
- 获取候选文件candidateFiles
- 生成合并计划,保存在arvhive目录下的mergeArchivePlan文件
- 合并成功后删除候选文件
归档配置与清理配置存在关系:hoodie.cleaner.commits.retained <= hoodie.keep.min.commits