Hudi Getting Started Guide

2024-07-05 09:56:33

I. Spark On Hudi

This guide demonstrates how to insert into, update, and query Hudi tables using Spark.

 

1. Version Support

Hudi works with Spark 2.4.3+ and Spark 3.x.

 

Hudi support matrix for Spark 3 versions:

Hudi version                 Supported Spark 3 versions
0.12.x                       3.3.x (default build), 3.2.x, 3.1.x
0.11.x                       3.2.x (default build, Spark bundle only), 3.1.x
0.10.x                       3.1.x (default build), 3.0.x
0.7.0 - 0.9.0                3.0.x
0.6.0 and prior              not supported

 

2. Enabling Hudi in Spark SQL

spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

 

3. Creating a Hudi Table with Spark SQL

Example:

create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',            -- table type: 'cow' (copy-on-write) or 'mor' (merge-on-read)
  primaryKey = 'id',       -- use id as the record key column
  preCombineField = 'ts'   -- column used to pre-combine records that share the same key
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
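 
The update and merge examples later in this guide reference a non-partitioned MOR table named hudi_mor_tbl that is not created above. A minimal sketch of such a table, assuming the (id, name, price, ts) columns those examples use:

create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',            -- merge-on-read table
  primaryKey = 'id',
  preCombineField = 'ts'
);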

 

4. Inserting into a Hudi Table with Spark SQL

Example:

insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;
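 
For the non-partitioned hudi_mor_tbl sketched above, the insert needs no partition clause (the values are illustrative):

insert into hudi_mor_tbl select 1, 'a1', 20, 1000;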

5. Time Travel Queries on a Hudi Table with Spark SQL

Example:

select * from hudi_cow_pt_tbl timestamp as of '20220307091628793' where id = 1;
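 
The time travel syntax also accepts ordinary date/time strings in place of the compact instant time; for example (the timestamps are illustrative):

select * from hudi_cow_pt_tbl timestamp as of '2022-03-07 09:16:28.100' where id = 1;
select * from hudi_cow_pt_tbl timestamp as of '2022-03-08' where id = 1;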

6. Updating a Hudi Table with Spark SQL

Syntax:

UPDATE tableIdentifier SET column = EXPRESSION [, column = EXPRESSION ...] [ WHERE boolExpression ]

Example:

update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
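 
The same syntax works on the partitioned table created above; for example (the values are illustrative):

update hudi_cow_pt_tbl set ts = 1001 where name = 'a2';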

7. Merging into a Hudi Table with Spark SQL

Syntax:

MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]

<merge_condition> = an equality boolean condition
<matched_action> =
  DELETE |
  UPDATE SET * |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> =
  INSERT * |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

 

Example:

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *;
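 
MERGE INTO can also update only selected columns and take an inline query as the source. A minimal sketch against hudi_mor_tbl with the source row supplied inline (all values are illustrative):

merge into hudi_mor_tbl as target
using (
  select 1 as id, 'a1' as name, 40.0 as price, 1111 as ts
) as source
on target.id = source.id
when matched then update set price = source.price, ts = source.ts
when not matched then insert *;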

8. Deleting from a Hudi Table with Spark SQL

Example:

delete from hudi_cow_nonpcf_tbl where uuid = 1;
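 
The MOR table sketched earlier can be cleaned up the same way; for example (the predicate is illustrative):

delete from hudi_mor_tbl where id % 2 = 0;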

 

9. Altering a Hudi Table with Spark SQL

--rename to:

ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

 

--add column:

ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);

 

--change column:

ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;

 

--set properties:

alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
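 
For partitioned tables, partitions can also be listed and dropped; a sketch against the hudi_cow_pt_tbl created above (the partition values are illustrative):

show partitions hudi_cow_pt_tbl;
alter table hudi_cow_pt_tbl drop partition (dt = '2021-12-09', hh = '11');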

 

II. Flink On Hudi

1. Inserting into a Hudi Table with Flink SQL

-- sets up the result mode to tableau to show the results directly in the CLI
set sql-client.execution.result-mode = tableau;

CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${path}',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
);

-- insert data using values
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

2. Querying a Hudi Table with Flink SQL

-- query from the Hudi table
select * from t1;

3. Updating a Hudi Table with Flink SQL

-- this would update the record with key 'id1'
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

4. Streaming Queries on a Hudi Table with Flink SQL

CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${path}',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- this option enables streaming reads
'read.start-commit' = '20210316134557', -- specifies the start commit instant time
'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- Then query the table in stream mode
select * from t1;
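 
Read options can also be overridden per query with dynamic table hints, for example to start the stream from the earliest available commit (assuming dynamic table options are enabled in the Flink SQL client):

-- stream from the earliest commit instead of a fixed start instant
select * from t1 /*+ OPTIONS('read.streaming.enabled' = 'true', 'read.start-commit' = 'earliest') */;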
