searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

hudi技术预研与原理分析

2023-05-26 09:32:18
110
0

一、环境信息:

Hudi:0.12.0,spark:3.0.3,flink:1.13.6,hive:3.1.2,hadoop:3.2.1

二、Hudi数据源实现:

前提:引入hudi第三方包,将hudi-hadoop-mr-bundle-0.12.0.jar放到hive的auxlib目录

需要参数:

Kerberos认证相关的keytab,conf,principal

Core-site.xml,hdfs-site.xml路径

 

三、Hudi元数据获取:

可以通过构建HoodieTableMetaClient实现,参数只需要hadoop conf和hudi表路径,获取表的表名、路径、分区字段等。

schema的获取可以new 一个TableSchemaResolver实现,通过scheam获取表字段相关信息。

四、spark、flink、javaclient三种写入方式的比较:

Java client:

支持insert/upsert/delete,暂不支持bulkInsert。

Insert、upsert方法不支持mor表

支持完整的写Hudi操作,包括rollback、clean、archive等

要自己实现攒批落hudi,按数量和时间攒批

 

Spark client:

前提:引入hudi第三方包,将hudi-spark3.1-bundle_2.12-0.12.0.jar放到spark的jars目录

Hudi支持在Spark SQL上使用CTAS (Create Table As Select)创建hudi表(为了获得更好的性能来加载数据到hudi表,CTAS使用批量插入(bulk insert)作为写操作)。

支持rdd和spark sql方式,支持bulkinsert

构造HoodieRecord Rdd对象:hudi会根据元数据信息构造HoodieRecord Rdd 对象,方便后续数据去重和数据合并

Spark支持并发写机制,集群模式

Spark的upsert支持两种模式的写入Copy On Write和Merge On Read

使用Spark DataFrame并发写入

 

Flink client:

前提:引入hudi第三方包,将hudi-flink1.13-bundle-0.12.0.jar放到flink的lib目录

支持集群模式

Flink暂不支持bulkInsert

Insert、upsert方法不支持mor表

Flink 的 writer 将数据放到内存,按以下 3 种策略刷数据到磁盘:

a.当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB)

b.当总的 buffer 大小积攒到一定大小(可配,默认 1GB)

 

c.当 checkpoint 触发,将内存里的数据全部 flush 出去:用于写入HoodieRecord到文件系统中。

可以调整并行度来提升写入速度

为了提高并发写的吞吐量,会给每个bucket assign task分配一套独立的bucket管理策略,并利用Hash算法把bucket ID以固定的规则hash到每个bucket assign task 下面,做到了并发决策。因此,控制bucket assign task并发度就相对控制了写入小文件数量,在写入吞吐量和小文件之间的权衡

小文件策略:优先选择小的file group写入,如果是insert数据,策略是每次选择当前剩余空间最多的bucket写入

 

五、写入步骤分析:

Spark 写hudi步骤:

1、开始提交:判断上次任务是否失败,如果失败会触发回滚操作。 然后会根据当前时间生成一个事务开始的请求标识元数据。

2、构造HoodieRecord Rdd对象:hudi 会根据元数据信息构造HoodieRecord Rdd 对象,方便后续数据去重和数据合并。

3、数据去重:一批增量数据中可能会有重复的数据,hudi会根据主键对数据进行去重避免重复数据写入hudi 表。

4、数据fileId位置信息获取:在修改记录中可以根据索引获取当前记录所属文件的fileid,在数据合并时需要知道数据update操作向那个fileId文件写入新的快照文件。

5、数据合并:hudi 有两种模式cow和mor。在cow模式中会重写索引命中的fileId快照文件;在mor 模式中根据fileId 追加到分区中的log 文件。

6、完成提交:在元数据中生成xxxx.commit文件,只有生成commit 元数据文件,查询引擎才能根据元数据查询到刚刚upsert 后的数据。

7、compaction压缩:主要是mor 模式中才会有,他会将mor模式中的xxx.log 数据合并到xxx.parquet 快照文件中去。

8、hive元数据同步:hive 的元素数据同步这个步骤需要配置非必需操作,主要是对于hive 和spark 等查询引擎,需要依赖hive 元数据才能进行查询。所以在hive 中的同步就是构造外表提供查询。

 

flink 写hudi步骤:

分为三个模块:数据写入、数据压缩与数据清理。

数据写入

基础数据封装:将数据流中flink的RowData封装成Hoodie实体;

BucketAssigner:桶分配器,主要是给数据分配写入的文件地址:若为插入操作,则取大小最小的FileGroup对应的FileId文件内进行插入;在此文件的后续写入中文件 ID 保持不变,并且提交时间会更新以显示最新版本。这也意味着记录的任何特定版本,给定其分区路径,都可以使用文件 ID 和 instantTime进行唯一定位;若为更新操作,则直接在当前location进行数据更新

Hoodie Stream Writer: 数据写入,将数据缓存起来,在超过设置的最大flushSize或是做checkpoint时进行刷新到文件中;

Oprator Coordinator:主要与Hoodie Stream Writer进行交互,处理checkpoint等事件,在做checkpoint时,提交instant到timeLine上,并生成下一个instant的时间,算法为取当前最新的commi时间,比对当前时间与commit时间,若当前时间大于commit时间,则返回,否则一直循环等待生成。

数据压缩

压缩(compaction)用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。compaction首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件进行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance

数据清理

随着用户向表中写入更多数据,对于每次更新,Hudi会生成一个新版本的数据文件用于保存更新后的记录(COPY_ON_WRITE)或将这些增量更新写入日志文件以避免重写更新版本的数据文件(MERGE_ON_READ)。在这种情况下,根据更新频率,文件版本数可能会无限增长,但如果不需要保留无限的历史记录,则必须有一个流程(服务)来回收旧版本的数据,这就是 Hudi 的清理服务。具体清理策略可参考官网,一般使用的清理策略为:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 个文件版本而不受时间限制的效果。会删除N之外的FileSlice。

 

Java 写hudi步骤:

InitTable,首先获取table,这里的table为HoodieJavaCopyOnWriteTable

PreWrite,写之前的一些步骤,比如设置操作类型

insert,调用table.insert执行写数据操作,返回result

postWrite,最后调用postWrite执行archive、clean等操作返回WriteStatuses

 

说明:由于java client 要实现攒批落,所以造数按一批13万大小来验证,spark和flink 写入效率没java client快的原因可能是提交任务需要时间,并且造的数据量不大,没发挥出它的并发效果,对于大数据量的实时写入,spark和flink的效率应该会更高。

六、写入效率:

写入方式

写入数据量

写入耗时

平均速度

Java-client

13万

26秒

5000条/秒

Spark-client

13万

27秒

4814条/秒

Flink-client

13万

29秒

4482条/秒

 

 

七、总结:

 

Java Client和Spark、Flink客户端核心逻辑是一样的。不同的是比如Spark的入口是DF和SQL,多了一层API封装。

Hudi Java Client和Spark、Flink一样都可以实现完整的写Hudi的逻辑,但是目前功能支持还不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,毕竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起来比较方便,集成到NIFI的好处是,可以通过拖来拽配置参数的形式完成历史数据和增量数据写入Hudi。也可以自己实现多线程,提升性能,我们目前测试的性能是Insert可以达到5000条/s,而upsert因为需要读取索引,还有历史数据的更新,可能需要重写整个表,所以当历史数据比较大且更新占比比较高时,单线程的性能会非常差,但是可以基于源码改造,将布隆索引和写数据的部分改为多线程。对于数据量不是很大,一般大表几十亿,性能还是可以满足要求的。

将Hudi Java Client封装成了一个NIFI processor,然后用NIFI调度,其性能和稳定性都能够满足项目需求。

0条评论
0 / 1000
l****n
7文章数
0粉丝数
l****n
7 文章 | 0 粉丝
原创

hudi技术预研与原理分析

2023-05-26 09:32:18
110
0

一、环境信息:

Hudi:0.12.0,spark:3.0.3,flink:1.13.6,hive:3.1.2,hadoop:3.2.1

二、Hudi数据源实现:

前提:引入hudi第三方包,将hudi-hadoop-mr-bundle-0.12.0.jar放到hive的auxlib目录

需要参数:

Kerberos认证相关的keytab,conf,principal

Core-site.xml,hdfs-site.xml路径

 

三、Hudi元数据获取:

可以通过构建HoodieTableMetaClient实现,参数只需要hadoop conf和hudi表路径,获取表的表名、路径、分区字段等。

schema的获取可以new 一个TableSchemaResolver实现,通过scheam获取表字段相关信息。

四、spark、flink、javaclient三种写入方式的比较:

Java client:

支持insert/upsert/delete,暂不支持bulkInsert。

Insert、upsert方法不支持mor表

支持完整的写Hudi操作,包括rollback、clean、archive等

要自己实现攒批落hudi,按数量和时间攒批

 

Spark client:

前提:引入hudi第三方包,将hudi-spark3.1-bundle_2.12-0.12.0.jar放到spark的jars目录

Hudi支持在Spark SQL上使用CTAS (Create Table As Select)创建hudi表(为了获得更好的性能来加载数据到hudi表,CTAS使用批量插入(bulk insert)作为写操作)。

支持rdd和spark sql方式,支持bulkinsert

构造HoodieRecord Rdd对象:hudi会根据元数据信息构造HoodieRecord Rdd 对象,方便后续数据去重和数据合并

Spark支持并发写机制,集群模式

Spark的upsert支持两种模式的写入Copy On Write和Merge On Read

使用Spark DataFrame并发写入

 

Flink client:

前提:引入hudi第三方包,将hudi-flink1.13-bundle-0.12.0.jar放到flink的lib目录

支持集群模式

Flink暂不支持bulkInsert

Insert、upsert方法不支持mor表

Flink 的 writer 将数据放到内存,按以下 3 种策略刷数据到磁盘:

a.当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB)

b.当总的 buffer 大小积攒到一定大小(可配,默认 1GB)

 

c.当 checkpoint 触发,将内存里的数据全部 flush 出去:用于写入HoodieRecord到文件系统中。

可以调整并行度来提升写入速度

为了提高并发写的吞吐量,会给每个bucket assign task分配一套独立的bucket管理策略,并利用Hash算法把bucket ID以固定的规则hash到每个bucket assign task 下面,做到了并发决策。因此,控制bucket assign task并发度就相对控制了写入小文件数量,在写入吞吐量和小文件之间的权衡

小文件策略:优先选择小的file group写入,如果是insert数据,策略是每次选择当前剩余空间最多的bucket写入

 

五、写入步骤分析:

Spark 写hudi步骤:

1、开始提交:判断上次任务是否失败,如果失败会触发回滚操作。 然后会根据当前时间生成一个事务开始的请求标识元数据。

2、构造HoodieRecord Rdd对象:hudi 会根据元数据信息构造HoodieRecord Rdd 对象,方便后续数据去重和数据合并。

3、数据去重:一批增量数据中可能会有重复的数据,hudi会根据主键对数据进行去重避免重复数据写入hudi 表。

4、数据fileId位置信息获取:在修改记录中可以根据索引获取当前记录所属文件的fileid,在数据合并时需要知道数据update操作向那个fileId文件写入新的快照文件。

5、数据合并:hudi 有两种模式cow和mor。在cow模式中会重写索引命中的fileId快照文件;在mor 模式中根据fileId 追加到分区中的log 文件。

6、完成提交:在元数据中生成xxxx.commit文件,只有生成commit 元数据文件,查询引擎才能根据元数据查询到刚刚upsert 后的数据。

7、compaction压缩:主要是mor 模式中才会有,他会将mor模式中的xxx.log 数据合并到xxx.parquet 快照文件中去。

8、hive元数据同步:hive 的元素数据同步这个步骤需要配置非必需操作,主要是对于hive 和spark 等查询引擎,需要依赖hive 元数据才能进行查询。所以在hive 中的同步就是构造外表提供查询。

 

flink 写hudi步骤:

分为三个模块:数据写入、数据压缩与数据清理。

数据写入

基础数据封装:将数据流中flink的RowData封装成Hoodie实体;

BucketAssigner:桶分配器,主要是给数据分配写入的文件地址:若为插入操作,则取大小最小的FileGroup对应的FileId文件内进行插入;在此文件的后续写入中文件 ID 保持不变,并且提交时间会更新以显示最新版本。这也意味着记录的任何特定版本,给定其分区路径,都可以使用文件 ID 和 instantTime进行唯一定位;若为更新操作,则直接在当前location进行数据更新

Hoodie Stream Writer: 数据写入,将数据缓存起来,在超过设置的最大flushSize或是做checkpoint时进行刷新到文件中;

Oprator Coordinator:主要与Hoodie Stream Writer进行交互,处理checkpoint等事件,在做checkpoint时,提交instant到timeLine上,并生成下一个instant的时间,算法为取当前最新的commi时间,比对当前时间与commit时间,若当前时间大于commit时间,则返回,否则一直循环等待生成。

数据压缩

压缩(compaction)用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。compaction首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件进行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance

数据清理

随着用户向表中写入更多数据,对于每次更新,Hudi会生成一个新版本的数据文件用于保存更新后的记录(COPY_ON_WRITE)或将这些增量更新写入日志文件以避免重写更新版本的数据文件(MERGE_ON_READ)。在这种情况下,根据更新频率,文件版本数可能会无限增长,但如果不需要保留无限的历史记录,则必须有一个流程(服务)来回收旧版本的数据,这就是 Hudi 的清理服务。具体清理策略可参考官网,一般使用的清理策略为:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 个文件版本而不受时间限制的效果。会删除N之外的FileSlice。

 

Java 写hudi步骤:

InitTable,首先获取table,这里的table为HoodieJavaCopyOnWriteTable

PreWrite,写之前的一些步骤,比如设置操作类型

insert,调用table.insert执行写数据操作,返回result

postWrite,最后调用postWrite执行archive、clean等操作返回WriteStatuses

 

说明:由于java client 要实现攒批落,所以造数按一批13万大小来验证,spark和flink 写入效率没java client快的原因可能是提交任务需要时间,并且造的数据量不大,没发挥出它的并发效果,对于大数据量的实时写入,spark和flink的效率应该会更高。

六、写入效率:

写入方式

写入数据量

写入耗时

平均速度

Java-client

13万

26秒

5000条/秒

Spark-client

13万

27秒

4814条/秒

Flink-client

13万

29秒

4482条/秒

 

 

七、总结:

 

Java Client和Spark、Flink客户端核心逻辑是一样的。不同的是比如Spark的入口是DF和SQL,多了一层API封装。

Hudi Java Client和Spark、Flink一样都可以实现完整的写Hudi的逻辑,但是目前功能支持还不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,毕竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起来比较方便,集成到NIFI的好处是,可以通过拖来拽配置参数的形式完成历史数据和增量数据写入Hudi。也可以自己实现多线程,提升性能,我们目前测试的性能是Insert可以达到5000条/s,而upsert因为需要读取索引,还有历史数据的更新,可能需要重写整个表,所以当历史数据比较大且更新占比比较高时,单线程的性能会非常差,但是可以基于源码改造,将布隆索引和写数据的部分改为多线程。对于数据量不是很大,一般大表几十亿,性能还是可以满足要求的。

将Hudi Java Client封装成了一个NIFI processor,然后用NIFI调度,其性能和稳定性都能够满足项目需求。

文章来自个人专栏
大数据平台
7 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0