searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi系列-文件压缩(compaction)

2022-12-11 07:45:38
299
0

1. 简介

压缩(compaction)仅作用于MergeOnRead类型表,MOR表每次增量提交(deltacommit)都会生成若干个日志文件(行存储的avro文件),为了避免读放大以及减少文件数量,需要配置合适的压缩策略将增量的log file合并到base file(parquet)中。

1.1 环境

  • flink 1.13.6
  • hudi 0.11.0

    1.2 触发策略

    提供4种触发策略,可通过hoodie.compact.inline.trigger.strategy / compaction.trigger.strategy​进行配置:
  • NUM_COMMITS:达到N次commit时触发压缩,默认的触发策略,默认值为5次,通过hoodie.compact.inline.max.delta.commits ​/ compaction.delta_commits配置
  • TIME_ELAPSED:自上次压缩以来经过的时间 > N 秒时触发压缩,通过hoodie.compact.inline.max.delta.seconds​ / compaction.delta_seconds​配置
  • NUM_AND_TIME:NUM_COMMITS和TIME_ELAPSED同时满足时触发压缩
  • NUM_OR_TIME:NUM_COMMITS或TIME_ELAPSED同时满足时触发压缩

1.3 压缩策略

压缩策略是用来决定那些文件组将会被压缩,通过hoodie.compaction.strategy​进行配置,默认为org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy。压缩策略是可插拔的实现,通过继承类

CompactionStrategy也能进行自定义,目前hudi已实现以下几种:
BoundedIOCompactionStrategy
BoundedPartitionAwareCompactionStrategy
DayBasedCompactionStrategy
LogFileSizeBasedCompactionStrategy
UnBoundedCompactionStrategy
UnBoundedPartitionAwareCompactionStrategy

1.4 压缩方式

  • 异步压缩:默认的压缩方式,对于MOR表,采用异步压缩可以降低数据摄取的延时,整个压缩过程分为两大步骤:生成压缩计划、执行压缩
  • 同步压缩:如果想要快速读到最新提交的数据,避免读放大,那么可以采用同步压缩,即在数据提交完成后进行压缩,在同一个job中进行
  • 离线压缩:压缩会消耗大量的内存,且压缩和写操作处于同一个pipeline,当数据量达到10w/s时很容易影响到写性能,这情况下可以将压缩任务与写任务分离,采用离线压缩方式

2. 压缩流程

以异步压缩为例,整个压缩过程分为两大步骤:

  • 生成压缩计划,扫描表下的分区目录选择需要进行压缩的文件分片,然后生成一个compaction plan作为HoodieInstant写入Timeline(.hoodie目录)
  • 执行压缩,读取压缩计划,然后按计划中的内容将目标文件分片进行压缩。

2.1 checkpoint时生成压缩计划

当checkpoint完成时,在方法StreamWriteOperatorCoordinator#notifyCheckpointComplete中生成新的压缩计划,保存成.hoodie/timestamp.compact.requested文件

if (tableState.scheduleCompaction) { //compaction.schedule.enabled=true
  CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
}

StreamWriteOperatorCoordinatorWriteOperatorFactory#getCoordinatorProvider中初始化,WriteOperatorFactory用于生成hudi流式写算子Pipelines#hoodieStreamWrite

2.2 checkpoint时分发压缩命令

当checkpoint完成时,在压缩计划算子方法CompactionPlanOperator#notifyCheckpointComplete中从之前生成的timestamp.compact.requested文件反序列化为压缩计划,解析出多个压缩命令(CompactionPlanEvent)分发给下游的压缩任务消费,CompactionPlanOperator算子是单例的,不允许并行。

private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
  HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
  //首先处理最早出现的压缩计划
  Option<HoodieInstant> firstRequested = pendingCompactionTimeline.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
  String compactionInstantTime = firstRequested.get().getTimestamp();
  //读取.compact.requested文件反序列化
  HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstantTime);
  HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
  table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);  //从requested状态转化为inflight状态,表示开始压缩
  table.getMetaClient().reloadActiveTimeline(); //刷新缓存
  List<CompactionOperation> operations = compactionPlan.getOperations().stream().map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
  for (CompactionOperation operation : operations) {
    output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
  }
}
  1. 从ActiveTimeline找到首个requested状态的压缩计划,先处理最早生成的,表示优先级最高,如果当前有处理中的压缩(inflight),则停止处理
  2. 读取.compact.requested文件反序列为HoodieCompactionPlan
  3. 将当前的压缩计划从requested转化为inflight状态,同时时更新ActiveTimeline
  4. 将压缩计划分解成CompactionPlanEvent,发送到下游压缩执行任务

2.3 执行压缩命令

如果开启了异步压缩(compaction.async.enabled),则在单独的线程中运行压缩任务。可以通过compaction.tasks来指定CompactFunction并行度。

//每处理一个CompactionPlanEvent,向下游发送一个CompactionCommitEvent
public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
  if (asyncCompaction) {  //异步压缩
    executor.execute(
        () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
        (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
        "Execute compaction for instant %s from task %d", instantTime, taskID);
  } else {
    doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
  }
}
  1. 算子CompactFunction实时接收上游发送的压缩命令(CompactionPlanEvent)
  2. 从压缩命令提取压缩动作(CompactionOperation,封装了压缩所需的信息,包含着一个base文件和多个log文件的信息)
  3. HoodieCompactor#compact执行单个压缩动作,如果base文件存在,则进行update操作,将base文件和log文件合并成一个新的base文件,否则进行insert操作,将log文件生成base文件,最后返回WriteStatus
  4. 将WriteStatus包装成压缩提交事件(CompactionCommitEvent,用来描述单次压缩命令执行的结果),发送给下游的CompactionCommitSink收集

2.4 检查压缩状态

  1. CompactionCommitSink#invoke:接收上游所有压缩任务(CompactFunction)发出的CompactionCommitEvent缓存在commitBuffer
  2. CompactionCommitSink#commitIfNecessary:判断接收到的CompactionCommitEvent数量是否和压缩计划中的Operation相同,如果相同表示所有压缩任务已经完成,再者,所有CompactionCommitEvent都成功的话,表示整个压缩计划成功。
  3. CompactionCommitSink#doCommit:将当前压缩从inflight状态转换到commit状态

3. 总体流程

在这里插入图片描述

0条评论
0 / 1000