一、环境信息:
Hudi:0.12.0,spark:3.0.3,flink:1.13.6,hive:3.1.2,hadoop:3.2.1
二、Hudi数据源实现:
前提:引入hudi第三方包,将hudi-hadoop-mr-bundle-0.12.0.jar放到hive的auxlib目录
需要参数:
Kerberos认证相关的keytab,conf,principal
Core-site.xml,hdfs-site.xml路径
三、Hudi元数据获取:
可以通过构建HoodieTableMetaClient实现,参数只需要hadoop conf和hudi表路径,获取表的表名、路径、分区字段等。
schema的获取可以new 一个TableSchemaResolver实现,通过scheam获取表字段相关信息。
四、spark、flink、javaclient三种写入方式的比较:
Java client:
支持insert/upsert/delete,暂不支持bulkInsert。
Insert、upsert方法不支持mor表
支持完整的写Hudi操作,包括rollback、clean、archive等
要自己实现攒批落hudi,按数量和时间攒批
Spark client:
前提:引入hudi第三方包,将hudi-spark3.1-bundle_2.12-0.12.0.jar放到spark的jars目录
Hudi支持在Spark SQL上使用CTAS (Create Table As Select)创建hudi表(为了获得更好的性能来加载数据到hudi表,CTAS使用批量插入(bulk insert)作为写操作)。
支持rdd和spark sql方式,支持bulkinsert
构造HoodieRecord Rdd对象:hudi会根据元数据信息构造HoodieRecord Rdd 对象,方便后续数据去重和数据合并
Spark支持并发写机制,集群模式
Spark的upsert支持两种模式的写入Copy On Write和Merge On Read
使用Spark DataFrame并发写入
Flink client:
前提:引入hudi第三方包,将hudi-flink1.13-bundle-0.12.0.jar放到flink的lib目录
支持集群模式
Flink暂不支持bulkInsert
Insert、upsert方法不支持mor表
Flink 的 writer 将数据放到内存,按以下 3 种策略刷数据到磁盘:
a.当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB)
b.当总的 buffer 大小积攒到一定大小(可配,默认 1GB)
c.当 checkpoint 触发,将内存里的数据全部 flush 出去:用于写入HoodieRecord到文件系统中。
可以调整并行度来提升写入速度
为了提高并发写的吞吐量,会给每个bucket assign task分配一套独立的bucket管理策略,并利用Hash算法把bucket ID以固定的规则hash到每个bucket assign task 下面,做到了并发决策。因此,控制bucket assign task并发度就相对控制了写入小文件数量,在写入吞吐量和小文件之间的权衡
小文件策略:优先选择小的file group写入,如果是insert数据,策略是每次选择当前剩余空间最多的bucket写入
五、写入步骤分析:
Spark 写hudi步骤:
1、开始提交:判断上次任务是否失败,如果失败会触发回滚操作。 然后会根据当前时间生成一个事务开始的请求标识元数据。
2、构造HoodieRecord Rdd对象:hudi 会根据元数据信息构造HoodieRecord Rdd 对象,方便后续数据去重和数据合并。
3、数据去重:一批增量数据中可能会有重复的数据,hudi会根据主键对数据进行去重避免重复数据写入hudi 表。
4、数据fileId位置信息获取:在修改记录中可以根据索引获取当前记录所属文件的fileid,在数据合并时需要知道数据update操作向那个fileId文件写入新的快照文件。
5、数据合并:hudi 有两种模式cow和mor。在cow模式中会重写索引命中的fileId快照文件;在mor 模式中根据fileId 追加到分区中的log 文件。
6、完成提交:在元数据中生成xxxx.commit文件,只有生成commit 元数据文件,查询引擎才能根据元数据查询到刚刚upsert 后的数据。
7、compaction压缩:主要是mor 模式中才会有,他会将mor模式中的xxx.log 数据合并到xxx.parquet 快照文件中去。
8、hive元数据同步:hive 的元素数据同步这个步骤需要配置非必需操作,主要是对于hive 和spark 等查询引擎,需要依赖hive 元数据才能进行查询。所以在hive 中的同步就是构造外表提供查询。
flink 写hudi步骤:
分为三个模块:数据写入、数据压缩与数据清理。
数据写入
基础数据封装:将数据流中flink的RowData封装成Hoodie实体;
BucketAssigner:桶分配器,主要是给数据分配写入的文件地址:若为插入操作,则取大小最小的FileGroup对应的FileId文件内进行插入;在此文件的后续写入中文件 ID 保持不变,并且提交时间会更新以显示最新版本。这也意味着记录的任何特定版本,给定其分区路径,都可以使用文件 ID 和 instantTime进行唯一定位;若为更新操作,则直接在当前location进行数据更新
Hoodie Stream Writer: 数据写入,将数据缓存起来,在超过设置的最大flushSize或是做checkpoint时进行刷新到文件中;
Oprator Coordinator:主要与Hoodie Stream Writer进行交互,处理checkpoint等事件,在做checkpoint时,提交instant到timeLine上,并生成下一个instant的时间,算法为取当前最新的commi时间,比对当前时间与commit时间,若当前时间大于commit时间,则返回,否则一直循环等待生成。
数据压缩
压缩(compaction)用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。compaction首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件进行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance
数据清理
随着用户向表中写入更多数据,对于每次更新,Hudi会生成一个新版本的数据文件用于保存更新后的记录(COPY_ON_WRITE)或将这些增量更新写入日志文件以避免重写更新版本的数据文件(MERGE_ON_READ)。在这种情况下,根据更新频率,文件版本数可能会无限增长,但如果不需要保留无限的历史记录,则必须有一个流程(服务)来回收旧版本的数据,这就是 Hudi 的清理服务。具体清理策略可参考官网,一般使用的清理策略为:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 个文件版本而不受时间限制的效果。会删除N之外的FileSlice。
Java 写hudi步骤:
InitTable,首先获取table,这里的table为HoodieJavaCopyOnWriteTable
PreWrite,写之前的一些步骤,比如设置操作类型
insert,调用table.insert执行写数据操作,返回result
postWrite,最后调用postWrite执行archive、clean等操作返回WriteStatuses
说明:由于java client 要实现攒批落,所以造数按一批13万大小来验证,spark和flink 写入效率没java client快的原因可能是提交任务需要时间,并且造的数据量不大,没发挥出它的并发效果,对于大数据量的实时写入,spark和flink的效率应该会更高。
六、写入效率:
写入方式 |
写入数据量 |
写入耗时 |
平均速度 |
Java-client |
13万 |
26秒 |
5000条/秒 |
Spark-client |
13万 |
27秒 |
4814条/秒 |
Flink-client |
13万 |
29秒 |
4482条/秒 |
七、总结:
Java Client和Spark、Flink客户端核心逻辑是一样的。不同的是比如Spark的入口是DF和SQL,多了一层API封装。
Hudi Java Client和Spark、Flink一样都可以实现完整的写Hudi的逻辑,但是目前功能支持还不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,毕竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起来比较方便,集成到NIFI的好处是,可以通过拖来拽配置参数的形式完成历史数据和增量数据写入Hudi。也可以自己实现多线程,提升性能,我们目前测试的性能是Insert可以达到5000条/s,而upsert因为需要读取索引,还有历史数据的更新,可能需要重写整个表,所以当历史数据比较大且更新占比比较高时,单线程的性能会非常差,但是可以基于源码改造,将布隆索引和写数据的部分改为多线程。对于数据量不是很大,一般大表几十亿,性能还是可以满足要求的。
将Hudi Java Client封装成了一个NIFI processor,然后用NIFI调度,其性能和稳定性都能够满足项目需求。