searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi系列-基于cdc应用与优化

2023-12-19 02:45:06
20
0

1. CDC是个好东西

做数据同步受存储引擎和采集工具的限制,经常都是全量定时同步,亦或是以自增ID或时间作为增量的依据进行增量定时同步,无论是哪种,都存在数据延时较大、会重复同步不变的数据、浪费资源等问题。后来刚接触canal时还大感惊奇,基于mysql的binlog可以这么方便实时同步最新数据,然而历史数据的初始化仍然得使用第三方ETL工具来全量同步。直到flink cdc项目诞生,完全解决了前面的痛点。实时技术的发展已经不能满足于数据只能实时采集,还需要实时地进行数据建模和数据分析,即全链路实时。


cdc的本质是促进了数据在不同环节之间的流转速度,它是实现全链路实时的基础

2. hudi的cdc特点

MOR表的设计天然地支持CDC,数据写入时,所有的change data会临时以行格式保存在avro文件中,再定期地合并到base文件中。hudi的timeine设计又可以实时或随时capture到change data,其实就是所谓的增量查询(Incremental Queries)

2.1 数据范围

hudi支持多种在读取任务启动时指定的数据范围策略,融合了流程序以flink-cdc和kafka为源时的优点

  1. 支持指定任意的时间段,或从某时间点起始进行消费数据,对于问题排查很有帮助
  2. 支持先全量读取历数据,再自动增量持续读取,可用于数据仓库分层建设或涉及全量数据统计时
  3. 支持只获取当前所有的增量数据,并自动持续

    2.2 数据变更

    hudi在flink中支持两种ChangelogMode:UPSERT(+I,+U,-D),ALL(+I,-U,+U,-D),当changelog.enable=rue时为后者,此时:
  • sink端写时会接收上流所有RowKind类型的数据变更,包括UPDATE_BEFORE
  • source端读时会把所有RowKind类型的数据变更往下流发送,这样在流程序中能处理每条记录的中间状态。但是如果发生压缩,会把旧版本的数据清理掉,如果不能及时消费,那么这些中间状态也会丢失,只保留base文件中的最终状态,所以我们需要结合实际的情况去调mor表的压缩配置和清理配置,使得中间状态数据可以保留足够的时间。

    2.3 数据捕获

    每个流只能捕获单个表的数据,因为每个表的changelog是保存在该表的数据目录中,hudi通过定时器周期性检测目录中是否有新的changelog文件,然后拉取到流程序中进行处理。

    3. 应用

  1. 对于所有数据,包括业务库的和行为事件,采用cdc或消费kafka的方式全量保存到hudi形成ods层数据。后续更多对数据加工处理以ods层为基础,可以减轻直接cdc业务库带来的压力
  2. 利用hudi的cdc能力可以构建实时数仓更高层
  3. 实时计算生成ads数据供上层应用直接查询
  4. 对于一些需要提供给第三方系统的数据,在不暴露业务库下,从hudi中实时推送出去

    4. 优化

 

在一些较为底层的数据处理环节,希望在同一个flink程序中对多个表进行相同的处理逻辑,不像mysql cdc那样可以整库捕获,对于hudi,一个流只能捕获一个表的数据,所以拓扑会是这样:

对于表1,表2的source,flink会定时checkpoint并持久化当前表增量标识(Instant),在这个作业中如果继续进行增加流和减少流,那么很大机率造成变更后作业不能从状态中恢复,那是因为JobGrap的改变导致source算子ID发生了改变,而在hudi构建streaming api时并没有对这些算子进行自定义uid. 针对该问题已向社区提了issue [HUDI-5102].
进一步设想,能不能一个source输入多个表的数据,这样sourrce算子唯一,那么就算不指定UID,也能固定。

如果要做到这样,需要对StreamReadMonitoringFunction与StreamReadOperator两个算子进行改造了,使得每个都支持多表,对所有表的数据以通用的json格式传往下游,然后下游转换按表名分流,涉及的地方比较多,不过应该是可以走得通的,好像没什么必要实现成这样,有时间再研究一下。

0条评论
0 / 1000