Amoro
Amoro 是一个 Table Format 管理系统,同时可以对表进行自动合并的操作,本文会描述 Amoro 的架构,以及用户可能感兴趣的流程。
架构
Amoro大致图如下所示,主要组件包括:TableService,OptimizerService,OptimizerExecutor 等, 下面分别介绍不同的组件
TableService
TableService 负责管理所有表的元数据,会周期性的从用户配置的 Catalog 中获取 table 信息,然后同步到 Amoro 中,如果有表删除,也会周期性的进行相关的清理工作。
整体同步流程大致如下:
1. 从 catalog 获取当前的 table 信息
2. 从 Amoro 的 db 中获取当前管理的 table 信息
3. 将新表加到 Amoro 中,并将删除的表从 Amoro 中删除
4. 下个周期重新开始第一步
其中周期性扫描的参数为
refresh-external-catalogs.interval 扫描的周期 默认 180s
refresh-external-catalogs.thread-count 扫描的线程数 默认 10
refresh-external-catalogs.queue-size 能接纳的表的上限 默认 1_000_000
OptimizerService
OptimizerService 主要负责从 tableQueue 中获取 table,然后生成相应的合并计划投递到 taskQueue 中,taskQueue 中以 Process 为单元,每个 Process 包含一组 Task。
首先从 tableQueue 中获取当前可以合并的表首先筛选剔除掉不满足要求的表,条件如下所示:
● table 状态不是 pending (table 待合并中)
● table 没有被 block(当前 table 是否禁止合并)
● table 最近一次 plan 的时间已经超过最小运行时间(plan 表示对表进行合并的分析,生成合并计划投递到 TaskQueue 中)
然后根据配置的调度策略(当前有 Quota 和 Balanced 两种),选择一个 Table,对该 table 生成合并计划投递到 taskQueue 中
其中 Quota 和 Balanced 两种策略如下所示:
● Quota 会按照最近使用的总 quota 大小进行排序
● Balanced 最近一次合并的时间进行排序,优先合并最近未合并的表
OptimizerExecutor
OptimizerExecutor 会持续从 taskExecutor 获取 task,执行,然后循环往复
另外这里增加几个概念:
● container:实际的资源,可以对应 Yarn 中的集群
○ 资源可以使用 local/flink 等
● optimizerGroup : 合并的资源组,类似与 Yarn 中的队列
● optimizer:则是一个实际的 Yarn application
最终 task 会在某个 optimizer 上执行
这里的关系是
● container <-- 1 : n ---> optimizerGroup
● optimizerGroup <-- 1 : n --> optimizer
● optimizerGroup <-- 1 : n --> table
每个 catalog 会有一个默认的 optimizerGroup,表可以修改自己使用的 optimizerGroup
用户视角
从用户视角可能会关系:1 简单的合并策略是什么;2 资源有限的话,如何调整优先级;3)需要合并的比较多,怎么加速
简单合并策略
合并流程如下所示,类似 JVM 的 GC 流程,首先文件进行分类,小于 X 大小的属于 fragment,大于 X 的属于 segment。然后合并有两类:
Self-optimizing type | Input space | Output space | Input file types | Output file types |
minor | fragment | fragment/segment | insert/eq-delete/pos-delete | insert, pos-delete |
major | fragment/segment | segment | insert,eq-delete/pos-delete | insert/pos-delete |
full | fragment/segment | segment | insert/eq-delete/pos-delete | insert |
触发机制
● full 用户通过 self-optimizing.full.trigger.interval 配置触发 full compaction 周期时间
● major 所有 segment 的文件大小超过配置 self-optimizing.target-size 且最小的两个文件之和小于 self-optimizing.target-size
● mino 小于 X 的文件以及 eqdelete 的文件总数超过 self-optimizing.minor.trigger.file-count 且上次 minor 的触发距离现在超过 self-optimizing.minor.trigger.interval
决定合并 type 的时候,按照 full -> major -> minor 继续判断,选择第一个命中的 type 进行执行
资源有限,如何设置优先级
如果资源有限,有如下一些地方可以调整合并的策略
1. 表的获取策略:上面的 Quota 和 Balanced 策略
2. 某些表通过 禁止自动合并
a. 通过 table property: self-optimizing.enabled 进行配置
3. 通过调整合并策略(触发 minor/majro/full 的机制)
a. 参考上面触发机制中的配置
资源无限,如何快速合并
增加资源可以直接保证合并的速度增大,简单的说,增加 optimizer 即可(可以在页面操作 Scale-out 即可)
在 OptimizerGroup 中通过 Scale-Out 增加 Optimizer 即可