Version notes:
Flink 1.13.1 + Scala 2.11, CDH 6.3.0 (Hadoop 3.0.0, Hive 2.1.1), Hudi 0.10
MySQL CDC source table DDL (created on the MySQL side):
CREATE TABLE `users` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(20) DEFAULT NULL,
`birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=64 DEFAULT CHARSET=utf8;
Write data in MySQL to generate binlog events:
insert into users (name) values ('hello2');
insert into users (name) values ('world2');
insert into users (name) values ('flink2');
insert into users (id,name) values (4,'spark');
insert into users (name) values ('hudi');
select * from users;
update users set name = 'luo flinkbj' where id = 60;
delete from users where id = 61;
Flink SQL: MySQL CDC source table DDL:
Flink SQL> CREATE TABLE mysql_users (
> id BIGINT PRIMARY KEY NOT ENFORCED ,
> name STRING,
> birthday TIMESTAMP(3),
> ts TIMESTAMP(3)
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '127.0.0.1',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = 'root',
> 'server-time-zone' = 'Asia/Shanghai',
> 'debezium.snapshot.mode'='initial',
> 'database-name' = 'luo',
> 'table-name' = 'users'
> );
Set the checkpoint interval (the Hudi Flink writer commits data on checkpoints):
Flink SQL> set execution.checkpointing.interval=30sec;
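Other checkpoint-related options can be tuned the same way. A minimal sketch using standard Flink configuration keys; the values are illustrative assumptions, not taken from the original setup:
Flink SQL> set execution.checkpointing.timeout=10min;     -- assumed value: fail checkpoints that take longer than 10 minutes
Flink SQL> set execution.checkpointing.min-pause=10sec;   -- assumed value: minimum pause between two checkpoints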
Add a partition field by creating a view over the CDC source table:
Flink SQL> create view my_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as `partition` FROM mysql_users;
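To sanity-check the derived `partition` column, the view can be queried directly in the SQL client; this query is an illustrative check, not part of the original steps:
Flink SQL> select id, name, `partition` from my_v;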
-- Hudi MOR online (async) compaction test, enabled in the table DDL below via:
'compaction.async.enabled' = 'true'
---- MOR table compaction:
Hive sync creates two tables by default:
luo_sync_hive03_ro
luo_sync_hive03_rt
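The _ro table is the read-optimized view (it reads only compacted base files), while the _rt table is the real-time/snapshot view (it merges base files with the delta log files on read). An illustrative comparison in the Hive shell; these queries are assumptions, not taken from the original log:
hive> select * from luo_sync_hive03_ro limit 10;   -- read-optimized: only data that has already been compacted
hive> select * from luo_sync_hive03_rt limit 10;   -- real-time: also includes uncompacted log data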
Flink SQL Hudi partitioned table DDL (with Hive sync):
CREATE TABLE luo_sync_hive03(
id bigint ,
name string,
birthday TIMESTAMP(3),
ts TIMESTAMP(3),
`partition` VARCHAR(20),
primary key(id) not enforced -- a primary key (the Hudi record key) must be specified
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' = 'hdfs://nameservice1/tmp/luo/hudi/luo_sync_hive03'
, 'hoodie.datasource.write.recordkey.field' = 'id'
, 'write.precombine.field' = 'ts'
, 'write.tasks' = '1'
, 'compaction.tasks' = '1'
, 'write.rate.limit' = '2000'
, 'table.type' = 'MERGE_ON_READ'
, 'compaction.async.enabled' = 'true'
, 'compaction.trigger.strategy' = 'num_commits'
, 'compaction.delta_commits' = '5'
, 'changelog.enabled' = 'true'
, 'read.streaming.enabled' = 'true'
, 'read.streaming.check-interval' = '4'
, 'hive_sync.enable' = 'true'
, 'hive_sync.metastore.uris' = 'thrift://hadoop:9083'
, 'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop:10000'
, 'hive_sync.table' = 'luo_sync_hive03'
, 'hive_sync.db' = 'luo'
, 'hive_sync.username' = ''
, 'hive_sync.password' = ''
, 'hive_sync.support_timestamp' = 'true'
);
-- Flink SQL: write data into Hudi; the Hive partitioned tables are created and their data kept in sync automatically:
insert into luo_sync_hive03 select id,name,birthday,ts,`partition` from my_v;
With the DDL above, compaction is triggered every 5 delta commits (i.e. every 5 checkpoints).
Write more data on the MySQL side:
insert into users (name) values ('hello25');
insert into users (name) values ('world26');
insert into users (name) values ('flink27');
insert into users (name) values ('flink32');
update users set name = 'luo flinkbj' where id = 60;
delete from users where id = 61;
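Because 'read.streaming.enabled' is 'true' in the table DDL, these changes can also be observed with a streaming query on the Hudi table in the Flink SQL client; this query is an illustrative check, not part of the original steps:
Flink SQL> select * from luo_sync_hive03;   -- streaming read, polls for new commits every 4s per 'read.streaming.check-interval'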
--------- Query the data from the Hive shell:
hive> show partitions luo_sync_hive03_rt;
-- Handling the exception thrown by select count:
hive> select count(1) from luo_sync_hive03_ro;
Diagnostic Messages for this Task:
Error: java.io.IOException: Split class org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit not found
at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:369)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:438)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2409)
at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:367)
... 7 more
Fix: add the hudi-hadoop-mr bundle jar to the Hive session and switch to Hudi's combine input format:
hive> add jar hdfs://nameservice1/tmp/luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;
hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
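After these two settings the count queries should run; a re-run sketch (results depend on the data, so no output is shown):
hive> select count(1) from luo_sync_hive03_ro;
hive> select count(1) from luo_sync_hive03_rt;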