searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi系列-文件归档(archive)

2022-12-11 07:45:40
208
0

1. 简介

hudi会不断生成commit、deltacommit、clean等类型的Instant从而形成活跃时间轴(ActiveTimeline),随着时间增长,时间轴变长,.hoodie元数据目录下的文件不断累积,为了限制元数据文件数量,需要对一些比较久远的元数据文件进行归档,保存到.hoodie/archived目录下,可以称之为归档时间轴(ArchivedTimeline)。

1.1 环境

  • flink 1.13.6
  • hudi 0.11.0
  • merge on read表

    1.2 逻辑结构

    在这里插入图片描述

    2. 归档流程

    2.1 每次commit触发归档

    每次数据提交结束后都会触发archive操作HoodieFlinkWriteClient#postCommit,前提是开启了自动归档配置hoodie.archive.automatic
protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArchival) {
  if (!config.isAutoArchive()) return;
  if (config.isAsyncArchive()) AsyncArchiveService.waitForCompletion(asyncArchiveService);
  else  archive(table, acquireLockForArchival);
}
protected void archive(HoodieTable table, boolean acquireLockForArchival) {
    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
    archiver.archiveIfRequired(context, acquireLockForArchival);
}

如果是启用了异步归档hoodie.archive.async,则在异步线程中执行归档并等待结束;BaseHoodieWriteClient#archive创建归档器(HoodieTimelineArchiver)并执行。

2.2 获取归档Instant

需要归档的目标Instant包含6种类型:clean,rollback,commit,deltacommit,replacecommit,compation,其中每种类型有三种状态(对应3个元数据文件),HoodieTimelineArchiver#getInstantsToArchive中分别获取clean和commit两种场景的Instant,进行合并

private Stream<HoodieInstant> getInstantsToArchive() { 
  Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());//获取所有需要归档的Instant,状态为completed
  HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
  //当前整个活跃时间轴下所有Instant: <<时间戳,Instant类型>,对应的Instant集合>,注意compation类型已经被映射为commit类型,因为compation完成时最终状态为commit
  Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants().collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),HoodieInstant.getComparableAction(i.getAction()))));
  return instants.flatMap(hoodieInstant -> {
     //为completed状态的Instant关联回其中间状态(requested,inflight)
     List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(), HoodieInstant.getComparableAction(hoodieInstant.getAction())));
     return instantsToStream.stream();
  });
}
  • getCleanInstantsToArchive:从ActiveTimeline中过滤所有action=[clean,rollback],state=completed的Instant,按action分组,当每组Instant数量>hoodie.keep.max.commits时,该组只保留最新(数量为hoodie.keep.min.commits)的Instant,其余将被归档(max-min)
  • getCommitInstantsToArchive:从ActiveTimeline中过滤所有action=[commit,deltacommit,replacecommit],state=completed的Instant,当commit Instants数量>hoodie.keep.max.commits时,先将Instants进行条件过滤,保留最新(数量为hoodie.keep.min.commits)的Instant,其余将被归档(max-min)
    可以看到,上面两个方法只获取到completed状态的Instant,而且还少了一种compation类型,缺少的都会通过groupByTsAction进行补充。

2.3 执行归档

确定了所有需要归档的Instant,接着就是实行归档HoodieTimelineArchiver#archive,读取每个Instant对应的元数据文件反序列化,每个元数据文件都会被包装成一个HoodieArchivedMetaEntry对象,当对象数量达到hoodie.commits.archival.batch时,将数据写到目录.hoodie/archived下生成归档文件
在这里插入图片描述

2.4 删除归档文件

HoodieTimelineArchiver#deleteArchivedInstants遍历所有已经归档的Instant并行删除对应的文件,可以通过配置hoodie.archive.delete.parallelism来设置并行度,然后把.hoodie/aux目录下的元数据文件也删除。

2.5 合并归档结果

从0.11版本开始,支持对归档的结果文件进行合并HoodieTimelineArchiver#mergeArchiveFilesIfNecessary。虽然每次归档都将多个Instant生成.commit_.arvhive文件,归档的次数多后也会有大量的归档结果文件,所以对于较小的归档结果文件进行合并。
当启用归档合并时(hoodie.archive.merge.enable)且文件系统支持append(StorageSchemas,例如:file://),如果arvhive目录下存在大小小于hoodie.archive.merge.small.file.limit.bytes的文件数量达到hoodie.archive.merge.files.batch.size,即触发合并

  1. 获取候选文件candidateFiles
  2. 生成合并计划,保存在arvhive目录下的mergeArchivePlan文件
  3. 合并成功后删除候选文件

归档配置与清理配置存在关系:hoodie.cleaner.commits.retained <= hoodie.keep.min.commits

0条评论
0 / 1000
矛始
28文章数
0粉丝数
矛始
28 文章 | 0 粉丝
原创

hudi系列-文件归档(archive)

2022-12-11 07:45:40
208
0

1. 简介

hudi会不断生成commit、deltacommit、clean等类型的Instant从而形成活跃时间轴(ActiveTimeline),随着时间增长,时间轴变长,.hoodie元数据目录下的文件不断累积,为了限制元数据文件数量,需要对一些比较久远的元数据文件进行归档,保存到.hoodie/archived目录下,可以称之为归档时间轴(ArchivedTimeline)。

1.1 环境

  • flink 1.13.6
  • hudi 0.11.0
  • merge on read表

    1.2 逻辑结构

    在这里插入图片描述

    2. 归档流程

    2.1 每次commit触发归档

    每次数据提交结束后都会触发archive操作HoodieFlinkWriteClient#postCommit,前提是开启了自动归档配置hoodie.archive.automatic
protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArchival) {
  if (!config.isAutoArchive()) return;
  if (config.isAsyncArchive()) AsyncArchiveService.waitForCompletion(asyncArchiveService);
  else  archive(table, acquireLockForArchival);
}
protected void archive(HoodieTable table, boolean acquireLockForArchival) {
    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
    archiver.archiveIfRequired(context, acquireLockForArchival);
}

如果是启用了异步归档hoodie.archive.async,则在异步线程中执行归档并等待结束;BaseHoodieWriteClient#archive创建归档器(HoodieTimelineArchiver)并执行。

2.2 获取归档Instant

需要归档的目标Instant包含6种类型:clean,rollback,commit,deltacommit,replacecommit,compation,其中每种类型有三种状态(对应3个元数据文件),HoodieTimelineArchiver#getInstantsToArchive中分别获取clean和commit两种场景的Instant,进行合并

private Stream<HoodieInstant> getInstantsToArchive() { 
  Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());//获取所有需要归档的Instant,状态为completed
  HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
  //当前整个活跃时间轴下所有Instant: <<时间戳,Instant类型>,对应的Instant集合>,注意compation类型已经被映射为commit类型,因为compation完成时最终状态为commit
  Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants().collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),HoodieInstant.getComparableAction(i.getAction()))));
  return instants.flatMap(hoodieInstant -> {
     //为completed状态的Instant关联回其中间状态(requested,inflight)
     List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(), HoodieInstant.getComparableAction(hoodieInstant.getAction())));
     return instantsToStream.stream();
  });
}
  • getCleanInstantsToArchive:从ActiveTimeline中过滤所有action=[clean,rollback],state=completed的Instant,按action分组,当每组Instant数量>hoodie.keep.max.commits时,该组只保留最新(数量为hoodie.keep.min.commits)的Instant,其余将被归档(max-min)
  • getCommitInstantsToArchive:从ActiveTimeline中过滤所有action=[commit,deltacommit,replacecommit],state=completed的Instant,当commit Instants数量>hoodie.keep.max.commits时,先将Instants进行条件过滤,保留最新(数量为hoodie.keep.min.commits)的Instant,其余将被归档(max-min)
    可以看到,上面两个方法只获取到completed状态的Instant,而且还少了一种compation类型,缺少的都会通过groupByTsAction进行补充。

2.3 执行归档

确定了所有需要归档的Instant,接着就是实行归档HoodieTimelineArchiver#archive,读取每个Instant对应的元数据文件反序列化,每个元数据文件都会被包装成一个HoodieArchivedMetaEntry对象,当对象数量达到hoodie.commits.archival.batch时,将数据写到目录.hoodie/archived下生成归档文件
在这里插入图片描述

2.4 删除归档文件

HoodieTimelineArchiver#deleteArchivedInstants遍历所有已经归档的Instant并行删除对应的文件,可以通过配置hoodie.archive.delete.parallelism来设置并行度,然后把.hoodie/aux目录下的元数据文件也删除。

2.5 合并归档结果

从0.11版本开始,支持对归档的结果文件进行合并HoodieTimelineArchiver#mergeArchiveFilesIfNecessary。虽然每次归档都将多个Instant生成.commit_.arvhive文件,归档的次数多后也会有大量的归档结果文件,所以对于较小的归档结果文件进行合并。
当启用归档合并时(hoodie.archive.merge.enable)且文件系统支持append(StorageSchemas,例如:file://),如果arvhive目录下存在大小小于hoodie.archive.merge.small.file.limit.bytes的文件数量达到hoodie.archive.merge.files.batch.size,即触发合并

  1. 获取候选文件candidateFiles
  2. 生成合并计划,保存在arvhive目录下的mergeArchivePlan文件
  3. 合并成功后删除候选文件

归档配置与清理配置存在关系:hoodie.cleaner.commits.retained <= hoodie.keep.min.commits

文章来自个人专栏
hudi
17 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0