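Doris Routine Load in Practice
1. Overview:
This practice uses CTyun's (天翼云) internal 翼MR development and test environment and walks through an example of using Doris Routine Load.
2. Test environment: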
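ECX host environment | Host IP | Hardware | Component | Software configuration |
---|---|---|---|---|
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Test tools and datasets | |
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Kafka (BrokerServer) | JVM heap 30 GB; 2 data directories on the 2 data disks |
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Kafka (BrokerServer) | JVM heap 30 GB; 2 data directories on the 2 data disks |
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Kafka (BrokerServer) | JVM heap 30 GB; 2 data directories on the 2 data disks |
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Doris (BE) | 2 data directories on the 2 data disks |
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Doris (BE) | 2 data directories on the 2 data disks |
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Doris (BE) | 2 data directories on the 2 data disks |
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Doris (FE) | JVM heap 30 GB |
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Doris (FE) | JVM heap 30 GB |
翼MR-Delivery Dev Team-Test Dev | 172.16.*.* | 30 cores / 128 GB RAM / 2 × 500 GB disks | Doris (FE) | JVM heap 30 GB |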
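3. Test plan:
Routine Load lets a user submit a resident load job that continuously reads data from a specified data source and loads it into Doris. This test loads JSON data from Kafka in two ways: without authentication and with SASL authentication.
4. Test execution
Executed by: XXX
Execution date: March 2024
5. Test log: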
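- Log in to the Kafka host and check the Kafka and ZooKeeper configuration. Note the following entries in /usr/local/kafka/config/server.properties:
zookeeper.connect=server-hostname2:2181,server-hostname1:2181,server-hostname2:2181/kafka
listeners=SASL_PLAINTEXT://server-hostname1:9092,PLAINTEXT://server-hostname1:9091
These give the ZooKeeper address (including the /kafka path) and the broker host and ports: 9092 for SASL_PLAINTEXT and 9091 for PLAINTEXT.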
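The `listeners` line is a comma-separated list of `NAME://host:port` entries. It determines which port each Routine Load job below should use in `kafka_broker_list`.

The following is a minimal Python sketch for reading that line, assuming it is available as a string. `parse_listeners` is a hypothetical helper and is not part of Kafka or Doris. Strictly speaking, the prefix is a listener name. It equals the security protocol only when `listener.security.protocol.map` does not remap it.

```python
from __future__ import annotations


def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Map each listener name to its (host, port).

    `value` is the right-hand side of a `listeners=` line in
    server.properties, e.g. "SASL_PLAINTEXT://h1:9092,PLAINTEXT://h1:9091".
    """
    result: dict[str, tuple[str, int]] = {}
    for entry in value.split(","):
        entry = entry.strip()  # tolerate blanks around commas
        if not entry:
            continue
        name, _, address = entry.partition("://")
        # rpartition keeps everything before the last ":" as the host
        host, _, port = address.strip().rpartition(":")
        result[name.strip()] = (host, int(port))
    return result


if __name__ == "__main__":
    listeners = parse_listeners(
        "SASL_PLAINTEXT://server-hostname1:9092,PLAINTEXT://server-hostname1:9091"
    )
    for name, (host, port) in listeners.items():
        # PLAINTEXT -> unauthenticated job a), SASL_PLAINTEXT -> SASL job b)
        print(f"{name}: {host}:{port}")
```

Run as a script, it prints `SASL_PLAINTEXT: server-hostname1:9092` and `PLAINTEXT: server-hostname1:9091`. The unauthenticated job therefore connects to port 9091, and the SASL job connects to port 9092.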
- Create a test topic in Kafka
[axeadmin@server-hostname1][/usr/local/kafka]
$ ./bin/kafka-topics.sh --create --topic yuandchtest --replication-factor 1 --partitions 1 --zookeeper server-hostname2:2181/kafka
Created topic yuandchtest.
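- List the newly created topic to confirm that it was created.
Note: the ZooKeeper address passed to kafka-topics.sh must match zookeeper.connect, including the /kafka path under which Kafka stores its metadata. Otherwise, creating or querying the topic fails.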
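The `--zookeeper` value used in these commands, `server-hostname2:2181/kafka`, is one ensemble member followed by the chroot from `zookeeper.connect`. The chroot is written once, after the last `host:port`, and applies to the whole list.

The following is a minimal sketch of deriving the argument, assuming the `zookeeper.connect` value shown earlier. Both function names are hypothetical.

```python
from __future__ import annotations


def split_zookeeper_connect(value: str) -> tuple[list[str], str]:
    """Split a zookeeper.connect value into (host:port list, chroot path).

    The chroot (here "/kafka") follows the last host:port and applies
    to the whole ensemble; it is "" when no chroot is configured.
    """
    hosts_part, slash, path = value.strip().partition("/")
    chroot = slash + path
    hosts = [h.strip() for h in hosts_part.split(",") if h.strip()]
    return hosts, chroot


def zookeeper_arg(value: str, host_index: int = 0) -> str:
    """Build a --zookeeper argument: one ensemble member plus the same chroot."""
    hosts, chroot = split_zookeeper_connect(value)
    return hosts[host_index] + chroot


if __name__ == "__main__":
    connect = "server-hostname2:2181,server-hostname1:2181,server-hostname2:2181/kafka"
    print(zookeeper_arg(connect))  # prints server-hostname2:2181/kafka
```

Dropping the chroot would point kafka-topics.sh at the ZooKeeper root, where this cluster's topic metadata does not live.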
- To see the details of a specific topic, use the --topic and --describe options
[axeadmin@server-hostname1][/usr/local/kafka]
$ ./bin/kafka-topics.sh --zookeeper server-hostname2:2181/kafka --topic yuandchtest --describe
Topic: yuandchtest TopicId: nH8QuG6URzCgzvnF3XkavQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: yuandchtest Partition: 0 Leader: 4 Replicas: 4 Isr: 4
- Prepare the test data in /tmp/yuandch.txt (one JSON object per line) and write it to the Kafka topic yuandchtest:
{"order_date":"2024-03-01","order_id":"10001","buy_num":"20","user_id":"30001","create_time":"2024-03-01 09:00:00","update_time":"2024-03-07 09:00:00"}
{"order_date":"2024-03-02","order_id":"10002","buy_num":"67","user_id":"30002","create_time":"2024-03-02 09:00:01","update_time":"2024-03-07 09:00:01"}
{"order_date":"2024-03-03","order_id":"10003","buy_num":"11","user_id":"30001","create_time":"2024-03-03 09:00:02","update_time":"2024-03-07 09:00:02"}
{"order_date":"2024-03-04","order_id":"10004","buy_num":"83","user_id":"30003","create_time":"2024-03-04 09:00:03","update_time":"2024-03-07 09:00:03"}
{"order_date":"2024-03-05","order_id":"10005","buy_num":"17","user_id":"30001","create_time":"2024-03-05 09:00:04","update_time":"2024-03-07 09:00:04"}
{"order_date":"2024-03-06","order_id":"10006","buy_num":"23","user_id":"30002","create_time":"2024-03-06 09:00:05","update_time":"2024-03-07 09:00:05"}
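Before producing the file to Kafka, it can help to check each record against the column types of the Doris table created in the next steps (DATE, INT, TINYINT, DATETIME). For example, TINYINT only holds -128 to 127.

The sketch below is a hypothetical pre-check in Python. It assumes one JSON object per line and the column list of ctyun_DBXX.testtable. How Doris itself treats a value that fails conversion depends on strict_mode and on whether the column is nullable.

```python
from __future__ import annotations

import json
from datetime import datetime

# Integer ranges of the Doris column types used by ctyun_DBXX.testtable
INT_RANGE = (-2**31, 2**31 - 1)
TINYINT_RANGE = (-128, 127)

# column name -> (kind of check, integer bounds or None)
SCHEMA = {
    "order_date": ("date", None),
    "order_id": ("int", INT_RANGE),
    "buy_num": ("int", TINYINT_RANGE),
    "user_id": ("int", INT_RANGE),
    "create_time": ("datetime", None),
    "update_time": ("datetime", None),
}


def check_record(line: str) -> list[str]:
    """Return the problems found in one JSON line (empty list if it fits)."""
    record = json.loads(line)
    problems: list[str] = []
    for column, (kind, bounds) in SCHEMA.items():
        if column not in record:  # every column is expected in the test data
            problems.append(f"{column}: missing")
            continue
        value = str(record[column])  # the test data stores numbers as strings
        try:
            if kind == "date":
                datetime.strptime(value, "%Y-%m-%d")
            elif kind == "datetime":
                datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
            else:
                number = int(value)
                low, high = bounds
                if not low <= number <= high:
                    problems.append(f"{column}: {number} outside [{low}, {high}]")
        except ValueError:
            problems.append(f"{column}: cannot parse {value!r} as {kind}")
    return problems
```

To check the file, call `check_record` on every non-empty line of /tmp/yuandch.txt. All six records above pass.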
- Check that the data has been written to the Kafka topic yuandchtest
- Log in to a Doris FE host and open the MySQL client
- In Doris, create the database and the table corresponding to the Kafka topic, using the Duplicate Key data model
mysql> create database ctyun_DBXX;
Query OK, 0 rows affected (0.02 sec)
CREATE TABLE IF NOT EXISTS ctyun_DBXX.testtable
(
`order_date` DATE NOT NULL COMMENT "Order date",
`order_id` INT NOT NULL COMMENT "Order ID",
`buy_num` TINYINT COMMENT "Quantity purchased",
`user_id` INT COMMENT "User ID",
`create_time` DATETIME COMMENT "Creation time",
`update_time` DATETIME COMMENT "Update time"
)
ENGINE=olap
DUPLICATE KEY(`order_date`, `order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 32
PROPERTIES (
"replication_num" = "1");
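`DISTRIBUTED BY HASH(order_id) BUCKETS 32` assigns each row to one of 32 buckets (tablets) by hashing `order_id`. Rows with the same `order_id` therefore always land in the same bucket.

The sketch illustrates that idea only. It uses `zlib.crc32` as a stand-in hash, which is an assumption, so the bucket numbers it produces will not match Doris's. With six test rows, at most six of the 32 buckets can receive data.

```python
import zlib
from collections import Counter

BUCKETS = 32  # BUCKETS 32 in the CREATE TABLE statement


def bucket_of(order_id: int, buckets: int = BUCKETS) -> int:
    """Pick a bucket by hashing the distribution column modulo the bucket count.

    zlib.crc32 over the decimal string is a stand-in chosen for
    illustration; Doris's internal hash and value encoding may differ.
    """
    return zlib.crc32(str(order_id).encode("ascii")) % buckets


def bucket_usage(order_ids, buckets: int = BUCKETS) -> Counter:
    """Count how many rows land in each non-empty bucket."""
    return Counter(bucket_of(o, buckets) for o in order_ids)


if __name__ == "__main__":
    usage = bucket_usage(range(10001, 10007))  # order_id values of the test data
    print(f"{len(usage)} of {BUCKETS} buckets receive rows")
```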
- Create the Routine Load job in Doris
a) Kafka access without authentication:
-- ON names the target Doris table; the source topic is set by kafka_topic in FROM KAFKA
CREATE ROUTINE LOAD ctyun_DBXX.testtable_routine_load ON testtable
WITH APPEND
COLUMNS(order_date,order_id,buy_num,user_id,create_time,update_time)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "server-hostname1:9091",
"kafka_topic" = "yuandchtest",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
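The job properties control how the job is split into sub-tasks. According to the Doris documentation:

- A sub-task ends as soon as any one of `max_batch_interval` (seconds), `max_batch_rows` or `max_batch_size` (bytes; 209715200 is 200 MiB) is reached.
- `desired_concurrent_number` is only an upper bound. The actual number of concurrent tasks is the minimum of the Kafka partition count, `desired_concurrent_number` and the FE configuration `max_routine_load_task_concurrent_num`. Some versions also cap it by the number of alive BEs.

The following Python sketch models these two rules with hypothetical names. The FE limit is passed in because its default differs between versions.

```python
from dataclasses import dataclass


@dataclass
class JobProps:
    """Job properties of statement a), in the units Doris uses."""
    desired_concurrent_number: int = 3
    max_batch_interval: int = 20       # seconds
    max_batch_rows: int = 300_000      # rows
    max_batch_size: int = 209_715_200  # bytes, i.e. 200 MiB


def task_should_end(p: JobProps, elapsed_s: float, rows: int, size_bytes: int) -> bool:
    """A sub-task ends once ANY of the three batch limits is reached."""
    return (elapsed_s >= p.max_batch_interval
            or rows >= p.max_batch_rows
            or size_bytes >= p.max_batch_size)


def actual_concurrency(p: JobProps, kafka_partitions: int, fe_max_tasks: int) -> int:
    """min(partitions, desired_concurrent_number, FE max_routine_load_task_concurrent_num).

    Some Doris versions additionally cap the result by the number of alive BEs.
    """
    return min(kafka_partitions, p.desired_concurrent_number, fe_max_tasks)


if __name__ == "__main__":
    props = JobProps()
    # yuandchtest has PartitionCount 1; 5 is only a placeholder FE limit.
    print(actual_concurrency(props, kafka_partitions=1, fe_max_tasks=5))  # prints 1
```

With PartitionCount 1 for yuandchtest, the result is 1, even though desired_concurrent_number is 3. This matches `CurrentTaskNum: 1` and `currentTaskConcurrentNum` in the SHOW ROUTINE LOAD output.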
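b) Kafka access with Kerberos SASL authentication enabled. This is an alternative to job a): both jobs read yuandchtest from OFFSET_BEGINNING into the same table, so running both would load every message twice.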
CREATE ROUTINE LOAD ctyun_DBXX.testtable_routine_load_sasl ON testtable
WITH APPEND
COLUMNS(order_date,order_id,buy_num,user_id,create_time,update_time)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "server-hostname1:9092",
"kafka_topic" = "yuandchtest",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.kerberos.service.name" = "kafka",
"property.sasl.kerberos.keytab" = "/etc/security/keytabs/doris.keytab",
"property.sasl.kerberos.principal" = "krbtgt/mozidev.env@mozidev.env"
);
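In the FROM KAFKA clause, keys such as `kafka_broker_list` and `kafka_topic` are interpreted by Doris. Keys prefixed with `property.` are custom Kafka client settings, and in the SHOW ROUTINE LOAD output they appear under `CustomProperties` without the prefix.

Values are presumably taken as written. A stray blank such as `" SASL_PLAINTEXT"` is therefore easy to miss and may produce an invalid `security.protocol`. The sketch below, with hypothetical helper names, splits the properties the same way and flags such values.

```python
from __future__ import annotations

PREFIX = "property."


def split_kafka_properties(props: dict[str, str]) -> tuple[dict[str, str], dict[str, str]]:
    """Separate Doris-level keys from `property.*` keys.

    Custom keys lose the prefix, matching how they are listed under
    CustomProperties in SHOW ROUTINE LOAD.
    """
    doris_keys: dict[str, str] = {}
    custom: dict[str, str] = {}
    for key, value in props.items():
        if key.startswith(PREFIX):
            custom[key[len(PREFIX):]] = value
        else:
            doris_keys[key] = value
    return doris_keys, custom


def whitespace_problems(props: dict[str, str]) -> list[str]:
    """Keys whose values carry leading or trailing blanks."""
    return [key for key, value in props.items() if value != value.strip()]
```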
- Check the job status with SHOW ROUTINE LOAD
mysql> show routine load\G;
*************************** 1. row ***************************
Id: 21592
Name: testtable_routine_load
CreateTime: 2024-03-04 17:53:08
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:ctyun_DBXX
TableName: testtable
IsMultiTable: false
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"timezone":"Asia/Shanghai","send_batch_parallelism":"1","load_to_single_tablet":"false","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","currentTaskConcurrentNum":"1","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"order_date,order_id,buy_num,user_id,create_time,update_time","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"}
DataSourceProperties:{"topic":"testtable","currentKafkaPartitions":"0","brokerList":"server-hostname1:9091"}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"testtable_routine_load_5cb25763-2493-4274-86d2-b4049fd84543"}
Statistic: {"receivedBytes":906,"runningTxns":[17986],"errorRows":0,"committedTaskNum":3,"loadedRows":6,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":6,"unselectedRows":0,"receivedBytesRate":14,"taskExecuteTimeMs":61016}
Progress: {"0":"5"}
Lag: {"0":1}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
User: username
Comment:
1 row in set (0.00 sec)
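ERROR:
No query specified
(This ERROR is harmless: \G already terminates the statement, so the trailing ; is treated as a second, empty statement.)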
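The `Statistic` field is JSON, so the load can also be checked programmatically, for example from a monitoring script. The following is a minimal sketch using the values from the output above.

The checks are inferred from the field names rather than taken from Doris documentation. This includes the assumption that totalRows = loadedRows + errorRows + unselectedRows.

```python
from __future__ import annotations

import json

# Statistic field copied from the SHOW ROUTINE LOAD output above
STATISTIC = (
    '{"receivedBytes":906,"runningTxns":[17986],"errorRows":0,'
    '"committedTaskNum":3,"loadedRows":6,"loadRowsRate":0,'
    '"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":6,'
    '"unselectedRows":0,"receivedBytesRate":14,"taskExecuteTimeMs":61016}'
)


def check_statistic(statistic_json: str) -> dict[str, bool]:
    """Simple health checks on a Routine Load Statistic JSON string."""
    s = json.loads(statistic_json)
    return {
        "no_error_rows": s["errorRows"] == 0,
        "no_aborted_tasks": s["abortedTaskNum"] == 0,
        # assumed identity: every received row is loaded, in error, or filtered out
        "rows_accounted_for": s["totalRows"] == s["loadedRows"] + s["errorRows"] + s["unselectedRows"],
    }


if __name__ == "__main__":
    print(check_statistic(STATISTIC))
```

For the output above, all three checks are True.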
- Verify that the data has been written to Doris
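One way to verify the load is to compare aggregates in Doris with the same figures computed from the source file. For example, run `SELECT COUNT(*), SUM(buy_num) FROM ctyun_DBXX.testtable;` and `SELECT user_id, COUNT(*) FROM ctyun_DBXX.testtable GROUP BY user_id;`. The helper below is a hypothetical Python sketch of the file side.

```python
from __future__ import annotations

import json
from collections import Counter


def expected_totals(lines) -> tuple[int, int, Counter]:
    """Row count, SUM(buy_num) and rows per user_id from JSON lines."""
    count, total_buy = 0, 0
    per_user: Counter = Counter()
    for line in lines:
        if not line.strip():  # skip blank lines, e.g. a trailing newline
            continue
        record = json.loads(line)
        count += 1
        total_buy += int(record["buy_num"])
        per_user[int(record["user_id"])] += 1
    return count, total_buy, per_user
```

For the six records in /tmp/yuandch.txt, it yields:

- 6 rows
- a buy_num total of 221
- 3, 2 and 1 rows for users 30001, 30002 and 30003

The table uses the Duplicate Key model. Producing the file to the topic a second time would therefore double these figures in Doris instead of overwriting rows.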
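6. Test results:
The test shows that a resident job submitted through Doris Routine Load continuously reads data from the specified Kafka topic and loads it into Doris. For the unauthenticated job, SHOW ROUTINE LOAD reports the following:
- State is RUNNING.
- All 6 rows were loaded (loadedRows 6 of totalRows 6).
- errorRows is 0.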