
Doris Routine Load in Practice

2024-05-29 07:16:58

1. Overview:

This practice uses the internal CTYun 翼MR development and test environment to demonstrate the use of Doris Routine Load through a worked example.

2. Test Environment:

ECX Host Environment     | Host IP    | Hardware        | Component            | Software Configuration
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Test tools & dataset |
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Kafka (BrokerServer) | JVM heap 30G; 2 data directories mapped to 2 data disks
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Kafka (BrokerServer) | JVM heap 30G; 2 data directories mapped to 2 data disks
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Kafka (BrokerServer) | JVM heap 30G; 2 data directories mapped to 2 data disks
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Doris (BE)           | 2 data directories mapped to 2 data disks
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Doris (BE)           | 2 data directories mapped to 2 data disks
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Doris (BE)           | 2 data directories mapped to 2 data disks
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Doris (FE)           | JVM heap 30G
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Doris (FE)           | JVM heap 30G
翼MR-交付开发组-测试开发 | 172.16.*.* | 30C/128G/500G*2 | Doris (FE)           | JVM heap 30G

3. Test Plan:

Routine Load lets a user submit a resident import job that continuously reads data from a specified source and loads it into Doris. This test loads JSON-formatted data from Kafka in two ways: without authentication and with SASL authentication.

4. Test Execution

Executed by: XXX

Execution time: March 2024

5. Test Procedure:

  1. Log in to the Kafka host and check the Kafka and ZooKeeper configuration.

In /usr/local/kafka/config/server.properties, note the following settings:

zookeeper.connect=server-hostname2:2181,server-hostname1:2181,server-hostname2:2181/kafka

listeners=SASL_PLAINTEXT://server-hostname1:9092,PLAINTEXT://server-hostname1:9091

This gives us the ZooKeeper and broker hosts and ports.
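
As an optional sanity check (a sketch, assuming standard Linux tooling is available on the broker host), you can confirm the broker is listening on both ports:

$ ss -ltn | grep -E '9091|9092'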

  2. Create a test topic in Kafka.
    

[axeadmin@server-hostname1][/usr/local/kafka]

$ ./bin/kafka-topics.sh --create --topic yuandchtest --replication-factor 1 --partitions 1 --zookeeper server-hostname2:2181/kafka

Created topic yuandchtest.

  3. View the newly created topic to confirm it was created successfully.

Note: the ZooKeeper address used when creating a Kafka topic must match the zookeeper.connect setting, including the /kafka chroot path under which ZooKeeper stores its metadata; otherwise creating or querying the topic will fail. One way to confirm the topic exists is sketched below.
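
A minimal sketch, listing all topics with the same ZooKeeper address used at creation (the --list flag applies to ZooKeeper-based Kafka versions such as this one); the output should include yuandchtest:

[axeadmin@server-hostname1][/usr/local/kafka]

$ ./bin/kafka-topics.sh --zookeeper server-hostname2:2181/kafka --list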

  4. View the details of the specified topic using the --topic and --describe options.
    

[axeadmin@server-hostname1][/usr/local/kafka]

$ ./bin/kafka-topics.sh --zookeeper server-hostname2:2181/kafka --topic yuandchtest --describe

Topic: yuandchtest TopicId: nH8QuG6URzCgzvnF3XkavQ PartitionCount: 1 ReplicationFactor: 1 Configs:

Topic: yuandchtest Partition: 0      Leader: 4 Replicas: 4       Isr: 4
  5. Prepare the test data (/tmp/yuandch.txt) and write it to the created Kafka topic yuandchtest (see the producer sketch after the data).

The test data in /tmp/yuandch.txt:

{"order_date":"2024-03-01","order_id":"10001","buy_num":"20","user_id":"30001","create_time":"2024-03-01 09:00:00","update_time":"2024-03-07 09:00:00"}

{"order_date":"2024-03-02","order_id":"10002","buy_num":"67","user_id":"30002","create_time":"2024-03-02 09:00:01","update_time":"2024-03-07 09:00:01"}

{"order_date":"2024-03-03","order_id":"10003","buy_num":"11","user_id":"30001","create_time":"2024-03-03 09:00:02","update_time":"2024-03-07 09:00:02"}

{"order_date":"2024-03-04","order_id":"10004","buy_num":"83","user_id":"30003","create_time":"2024-03-04 09:00:03","update_time":"2024-03-07 09:00:03"}

{"order_date":"2024-03-05","order_id":"10005","buy_num":"17","user_id":"30001","create_time":"2024-03-05 09:00:04","update_time":"2024-03-07 09:00:04"}

{"order_date":"2024-03-06","order_id":"10006","buy_num":"23","user_id":"30002","create_time":"2024-03-06 09:00:05","update_time":"2024-03-07 09:00:05"}

  6. Check whether the data has been written to the Kafka topic yuandchtest (one way to check is sketched below).
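
A minimal sketch using the console consumer to read the topic from the earliest offset (press Ctrl+C to stop; exact flags depend on the Kafka version):

[axeadmin@server-hostname1][/usr/local/kafka]

$ ./bin/kafka-console-consumer.sh --bootstrap-server server-hostname1:9091 --topic yuandchtest --from-beginning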

  7. Log in to the Doris FE host and enter the MySQL client (connection sketch below).
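
A minimal sketch, assuming the default Doris FE query port 9030; the host and credentials here are placeholders:

$ mysql -h <fe-host> -P 9030 -uroot -p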
    

  8. In Doris, create the database and table corresponding to the Kafka topic, using the Duplicate data model.
    

mysql> create database ctyun_DBXX;

Query OK, 0 rows affected (0.02 sec)

CREATE TABLE IF NOT EXISTS ctyun_DBXX.testtable
(
    `order_date` DATE NOT NULL COMMENT "order date",
    `order_id` INT NOT NULL COMMENT "order id",
    `buy_num` TINYINT COMMENT "number of items purchased",
    `user_id` INT COMMENT "[-9223372036854775808, 9223372036854775807]",
    `create_time` DATETIME COMMENT "creation time",
    `update_time` DATETIME COMMENT "update time"
)
ENGINE=olap
DUPLICATE KEY(`order_date`, `order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 32
PROPERTIES ("replication_num" = "1");

  9. Create the Routine Load job in Doris.

a) Kafka access without authentication:

CREATE ROUTINE LOAD ctyun_DBXX.testtable_routine_load ON testtable
WITH APPEND
COLUMNS(order_date,order_id,buy_num,user_id,create_time,update_time)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "server-hostname1:9091",
    "kafka_topic" = "yuandchtest",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

b) With Kerberos SASL authentication enabled:

CREATE ROUTINE LOAD ctyun_DBXX.testtable_routine_load_sasl ON testtable
WITH APPEND
COLUMNS(order_date,order_id,buy_num,user_id,create_time,update_time)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "server-hostname1:9092",
    "kafka_topic" = "yuandchtest",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_PLAINTEXT",
    "property.sasl.kerberos.service.name" = "kafka",
    "property.sasl.kerberos.keytab" = "/etc/security/keytabs/doris.keytab",
    "property.sasl.kerberos.principal" = "krbtgt/mozidev.env@mozidev.env"
);

  10. Check the job's running state with SHOW ROUTINE LOAD.

mysql> show routine load\G;

*************************** 1. row ***************************

Id: 21592

            Name: testtable_routine_load

      CreateTime: 2024-03-04 17:53:08

       PauseTime: NULL

         EndTime: NULL

          DbName: default_cluster:ctyun_DBXX

       TableName: testtable

    IsMultiTable: false

           State: RUNNING

  DataSourceType: KAFKA

  CurrentTaskNum: 1

   JobProperties: {"timezone":"Asia/Shanghai","send\_batch\_parallelism":"1","load\_to\_single\_tablet":"false","maxBatchSizeBytes":"209715200","exec\_mem\_limit":"2147483648","strict\_mode":"false","jsonpaths":"","currentTaskConcurrentNum":"1","fuzzy\_parse":"false","partitions":"\*","columnToColumnExpr":"order\_date,order\_id,buy\_num,user\_id,create\_time,update\_time","maxBatchIntervalS":"20","whereExpr":"\*","dataFormat":"json","precedingFilter":"\*","mergeType":"APPEND","format":"json","json\_root":"","deleteCondition":"\*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip\_outer\_array":"false","execMemLimit":"2147483648","num\_as\_string":"false","maxBatchRows":"300000"}

DataSourceProperties:{"topic":"testtable","currentKafkaPartitions":"0","brokerList":"server-hostname1:9091"}

CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"testtable_routine_load_5cb25763-2493-4274-86d2-b4049fd84543"}

       Statistic: {"receivedBytes":906,"runningTxns":[17986],"errorRows":0,"committedTaskNum":3,"loadedRows":6,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":6,"unselectedRows":0,"receivedBytesRate":14,"taskExecuteTimeMs":61016}

        Progress: {"0":"5"}

             Lag: {"0":1}

ReasonOfStateChanged:

ErrorLogUrls:

        OtherMsg:

            User: username

         Comment:

1 row in set (0.00 sec)

ERROR:

No query specified
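
If the job needs to be paused, resumed, or stopped during testing, Doris provides matching statements (sketched here with this job's name; note that STOP is irreversible):

mysql> PAUSE ROUTINE LOAD FOR ctyun_DBXX.testtable_routine_load;

mysql> RESUME ROUTINE LOAD FOR ctyun_DBXX.testtable_routine_load;

mysql> STOP ROUTINE LOAD FOR ctyun_DBXX.testtable_routine_load;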

  11. Verify that the data was successfully written to Doris (a check is sketched below).
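
A minimal check, assuming the six test rows above are the only data produced to the topic:

mysql> SELECT COUNT(*) FROM ctyun_DBXX.testtable;

mysql> SELECT * FROM ctyun_DBXX.testtable ORDER BY order_date;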

6. Test Results:

This test showed that a resident job submitted via Doris Routine Load continuously reads data from the specified Kafka source and loads it into Doris.
