I. Spark on Hudi
This guide demonstrates insert, update, and query operations on Hudi tables through Spark SQL.
1. Version Support
Hudi works with Spark 2.4.3+ and Spark 3.x. Supported Spark 3 versions by Hudi release:
Hudi version     Supported Spark 3 versions
0.12.x           3.3.x (default build), 3.2.x, 3.1.x
0.11.x           3.2.x (default build, Spark bundle only), 3.1.x
0.10.x           3.1.x (default build), 3.0.x
0.7.0 - 0.9.0    3.0.x
0.6.0 and prior  not supported
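The bundle artifact must match the Spark minor version on the cluster. As a sketch, on a Spark 3.2.x cluster running Hudi 0.12.x you would swap the --packages coordinate in section 2 below (the 0.12.3 patch version is illustrative; the --conf flags stay the same):
spark-sql --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.3 ...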
2. Enabling Hudi in Spark SQL
spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
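The same bundle and settings also work for an interactive Scala shell; a sketch swapping only the entry point:
spark-shell --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'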
3. Creating a Hudi Table with Spark SQL
Example:
create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow', -- table type: 'cow' (copy-on-write) or 'mor' (merge-on-read)
primaryKey = 'id', -- designate the id column as the primary key
preCombineField = 'ts' -- column used to pre-combine (deduplicate) rows that share the same primary key
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
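Sections 8 and 9 below run DELETE and ALTER statements against a table named hudi_cow_nonpcf_tbl that is never created in this guide. A minimal sketch of such a table, assuming a non-partitioned copy-on-write table keyed on uuid with no preCombineField (column names inferred from the later examples):
create table hudi_cow_nonpcf_tbl (
uuid int,
name string,
price double
) using hudi
tblproperties (
primaryKey = 'uuid'
);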
4. Inserting into a Hudi Table with Spark SQL
Example:
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;
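Partition values may also be supplied through the SELECT list instead of a static PARTITION clause; a sketch writing the dt and hh columns inline:
insert into hudi_cow_pt_tbl select 1, 'a1', 1000, '2021-12-09', '10';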
5. Time Travel Query on a Hudi Table with Spark SQL
Example:
select * from hudi_cow_pt_tbl timestamp as of '20220307091628793' where id = 1;
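Besides the compact instant format above, the AS OF timestamp can be written as a date-time or date string (time travel queries require Spark 3.2+; the instants below are illustrative):
select * from hudi_cow_pt_tbl timestamp as of '2022-03-07 09:16:28.100' where id = 1;
select * from hudi_cow_pt_tbl timestamp as of '2022-03-08' where id = 1;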
6. Updating a Hudi Table with Spark SQL
Syntax:
UPDATE tableIdentifier SET column = EXPRESSION [, column = EXPRESSION ...] [ WHERE boolExpression ]
Example:
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
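This example targets hudi_mor_tbl, which is also not created above; a minimal sketch of a matching merge-on-read table (schema inferred from the columns the UPDATE touches). Note that the UPDATE sets ts as well, since Hudi requires a value for the preCombineField when updating:
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
);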
7. Merge Into a Hudi Table with Spark SQL
Syntax:
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
<merge_condition> = a boolean equality condition joining source and target
<matched_action> =
DELETE |
UPDATE SET * |
UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> =
INSERT * |
INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
Example:
merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *;
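The source relation merge_source is assumed to exist; a minimal sketch whose columns line up with hudi_mor_tbl, as UPDATE SET * and INSERT * require (the rows are illustrative):
create table merge_source (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
primaryKey = 'id',
preCombineField = 'ts'
);
insert into merge_source values (1, 'old_a1', 22.22, 900), (2, 'new_a2', 33.33, 2000), (3, 'new_a3', 44.44, 2000);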
8. Deleting from a Hudi Table with Spark SQL
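Syntax (following the pattern of the sections above, per the Hudi Spark SQL docs):
DELETE FROM tableIdentifier [ WHERE boolExpression ]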
Example:
delete from hudi_cow_nonpcf_tbl where uuid = 1;
9. Altering a Hudi Table with Spark SQL
--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
--set properties:
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
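Partition-level maintenance uses the same ALTER family; a sketch against the partitioned table from section 3 (the partition values are illustrative):
--show partitions:
show partitions hudi_cow_pt_tbl;
--drop partition:
alter table hudi_cow_pt_tbl drop partition (dt = '2021-12-09', hh = '11');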
II. Flink on Hudi
1. Inserting into a Hudi Table with Flink SQL
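The statements below assume a Flink SQL client started with the Hudi Flink bundle on its classpath; a sketch, where the Flink version, bundle version, and jar path are all illustrative:
./bin/sql-client.sh embedded -j /path/to/hudi-flink1.16-bundle-0.13.1.jar shell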
-- sets up the result mode to tableau to show the results directly in the CLI
set sql-client.execution.result-mode = tableau;
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${path}',
'table.type' = 'MERGE_ON_READ' -- creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
);
-- insert data using values
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
2. Querying a Hudi Table with Flink SQL
-- query from the Hudi table
select * from t1;
3. Updating a Hudi Table with Flink SQL
-- inserting with an existing key performs an upsert: this updates the record with key 'id1'
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
4. Streaming Query on a Hudi Table with Flink SQL
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${path}',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- this option enables streaming read
'read.start-commit' = '20210316134557', -- specifies the start commit instant time
'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);
-- Then query the table in stream mode
select * from t1;
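To consume from the first available commit rather than a fixed instant, 'read.start-commit' also accepts the special value 'earliest'; the DDL above would change only in that one option:
'read.start-commit' = 'earliest' -- start from the earliest available commit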