searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

快速上手:TeleDB-FlinkCDC-Doris数据同步

2024-05-16 09:46:08
51
0

数据准备

同步需要以下两个前提:

1.您的TeleDB待同步库表中已存在数据,且数据具有主键。

2.您拥有一个TeleDB 用户,该用户对于待同步库表都应该具有所需的权限。验收标准:该用户执行show master status;这条命令能查看binlog开启。参考命令如下。

-- 1.创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

-- 2.向用户授予所需的权限:
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';


-- 在实际赋权时,当然还需要一些基本的操作权限,如增删改等。因此您也可以参考如下的命令:
mysql> grant select,insert,update,PROCESS,CREATE,delete,show databases,reload,replication client,replication slave on *.* to 'user'@'%' identified by 'password';
-- 注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。
-- 3.刷新用户权限:
mysql> FLUSH PRIVILEGES;

pipeline方式

该示例展示了如何使用pipeline方式将TeleDB数据库中多个业务表的数据(包括快照数据和增量数据)持续同步到 Doris 以完成数据集成操作。

1.  下载客户端

下载flink-cdc-3.0.tar文件并将其解压到本地。

flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。

2.  放置依赖包

从 Maven 下载所需的 CDC pipeline connector JAR,并将其放在${flink-cdc-3.0.0}/lib 目录

flink-cdc-pipeline-connector-doris-3.0.0.jar

flink-cdc-pipeline-connector-mysql-3.0.0.jar

3.  编写 Flink CDC 任务

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\\.*
server-id: 5400-5404
server-time-zone: UTC

sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1

pipeline:
name: Sync MySQL Database to Doris
parallelism: 2

其中:

  • source 中的tables: app_db.\\.*通过正则匹配同步"app_db" 这个库下的所有表,该项使用正则表达式
    • 比如 app_db.table_prefix\\.* 则表示同步 "app_db" 这个库下面 "table_prefix" 为前缀的所有表
  • sink 添加参数table.create.properties.replication_num是由于部署机上只有一个 Doris BE 节点。

4.  启动Flink并提交同步

通过命令行将作业提交到 Flink Standalone集群。

# Submit Pipeline
$ ./bin/flink-cdc.sh mysql-to-doris.yaml

提交成功后,返回信息如:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

在执行 flink-cdc.sh 脚本的过程中,CDC 任务配置会被解析并转换为 DataStream 作业,然后提交到指定的 Flink 集群。

在 Flink Web UI,可以看到相应的pipeline任务正在运行。

进入Doris ,可以看到数据表已经被创建出来,数据能成功写入。

connector方式

1.  放置依赖包

在$FLINK_HOME/lib 目录下添加对应的Flink CDC依赖,此处需要两个连接器。

flink-sql-connector-mysql-cdc-3.0.0.jar

flink-doris-connector-1.16-1.4.0.jar

2.  准备脚本

在$FLINK_HOME/conf/your_dir下创建your_cdc_param.sql与your_cdc_plan.sql。编辑your_cdc_param.sql以说明您的CDC作业配置参数。编辑your_cdc_plan.sql以说明您的CDC作业逻辑。

示例的your_cdc_param.sql文件如下:

SET execution.checkpointing.interval = 120s;
SET pipeline.operator-chaining=false;
SET execution.checkpointing.tolerable-failed-checkpoints = 200;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.aligned-checkpoint-timeout = 10s;
SET execution.target = yarn-per-job;
SET state.backend.type = filesystem;
SET state.backend.incremental = true;
SET state.checkpoint-storage = filesystem;
SET state.checkpoints.dir = hdfs://ctyunns/flink-checkpoint/;
SET state.backend.local-recovery = true;
SET taskmanager.numberOfTaskSlots = 2;
SET taskmanager.memory.managed.fraction = 0.1;
SET parallelism.default = 4;
SET jobmanager.memory.process.size = 8g;
SET taskmanager.memory.process.size = 16g;

示例的your_cdc_plan.sql文件如下:

CREATE TABLE mysql_tb_10w (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
my_id INT,
str_1 VARCHAR(128),
str_2 VARCHAR(128),
str_3 VARCHAR(128),
str_4 VARCHAR(128),
str_5 VARCHAR(128),
str_6 VARCHAR(128),
str_7 VARCHAR(128),
str_8 VARCHAR(128),
str_9 VARCHAR(128),
str_10 VARCHAR(128),
str_11 VARCHAR(128),
str_12 VARCHAR(128),
str_13 VARCHAR(128),
str_14 VARCHAR(128),
str_15 VARCHAR(128),
str_16 VARCHAR(128),
str_17 VARCHAR(128),
str_18 VARCHAR(128),
str_19 VARCHAR(128),
str_20 VARCHAR(128),
PRIMARY KEY (`my_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'your_teledb_hostname',
'port' = 'your_teledb_port',
'username' = 'your_teledb_username',
'password' = 'your_teledb_password',
'database-name' = 'your_teledb_database_name',
'table-name' = 'your_teledb_table_name',
'scan.incremental.snapshot.enabled' = 'true'
);
CREATE TABLE doris_tb_10w (
my_id INT,
str_1 VARCHAR(128),
str_2 VARCHAR(128),
str_3 VARCHAR(128),
str_4 VARCHAR(128),
str_5 VARCHAR(128),
str_6 VARCHAR(128),
str_7 VARCHAR(128),
str_8 VARCHAR(128),
str_9 VARCHAR(128),
str_10 VARCHAR(128),
str_11 VARCHAR(128),
str_12 VARCHAR(128),
str_13 VARCHAR(128),
str_14 VARCHAR(128),
str_15 VARCHAR(128),
str_16 VARCHAR(128),
str_17 VARCHAR(128),
str_18 VARCHAR(128),
str_19 VARCHAR(128),
str_20 VARCHAR(128),
database_name VARCHAR(50),
table_name VARCHAR(200)
)
WITH(
'connector' = 'doris',
'fenodes' = 'your_doris_fenodes',
'table.identifier' = 'your_doris_database_name.your_doris_table_name',
'username' = 'your_doris_username',
'password' = 'your_doris_password',
'sink.properties.two_phase_commit'='true',
'sink.label-prefix'='your_doris_label_prefix',
'sink.properties.format' = 'json',
'doris.batch.size' = '4096',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true'
);
insert into doris_tb_10w (my_id,str_1,str_2,str_3,str_4,str_5,str_6,str_7,str_8,str_9,str_10,str_11,str_12,str_13,str_14,str_15,str_16,str_17,str_18,str_19,str_20,database_name,table_name)
select my_id,str_1,str_2,str_3,str_4,str_5,str_6,str_7,str_8,str_9,str_10,str_11,str_12,str_13,str_14,str_15,str_16,str_17,str_18,str_19,str_20,database_name,table_name from mysql_tb_10w;

3.  启动Flink并提交同步

在$FLINK_HOME目录下执行以下命令提交作业。

./bin/sql-client.sh embedded -i conf/your_dir/your_cdc_param.sql -f conf/your_dir/your_cdc_plan.sql

在 Flink Web UI,可以看到相应的cdc 任务正在运行。

进入Doris ,可以看到数据表已经被创建出来,数据能成功写入。

打开 Doris 的 Web UI,可以看到数据表已经被创建出来,数据能成功被同步

0条评论
0 / 1000
Sirius.
5文章数
0粉丝数
Sirius.
5 文章 | 0 粉丝
原创

快速上手:TeleDB-FlinkCDC-Doris数据同步

2024-05-16 09:46:08
51
0

数据准备

同步需要以下两个前提:

1.您的TeleDB待同步库表中已存在数据,且数据具有主键。

2.您拥有一个TeleDB 用户,该用户对于待同步库表都应该具有所需的权限。验收标准:该用户执行show master status;这条命令能查看binlog开启。参考命令如下。

-- 1.创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

-- 2.向用户授予所需的权限:
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';


-- 在实际赋权时,当然还需要一些基本的操作权限,如增删改等。因此您也可以参考如下的命令:
mysql> grant select,insert,update,PROCESS,CREATE,delete,show databases,reload,replication client,replication slave on *.* to 'user'@'%' identified by 'password';
-- 注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。
-- 3.刷新用户权限:
mysql> FLUSH PRIVILEGES;

pipeline方式

该示例展示了如何使用pipeline方式将TeleDB数据库中多个业务表的数据(包括快照数据和增量数据)持续同步到 Doris 以完成数据集成操作。

1.  下载客户端

下载flink-cdc-3.0.tar文件并将其解压到本地。

flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。

2.  放置依赖包

从 Maven 下载所需的 CDC pipeline connector JAR,并将其放在${flink-cdc-3.0.0}/lib 目录

flink-cdc-pipeline-connector-doris-3.0.0.jar

flink-cdc-pipeline-connector-mysql-3.0.0.jar

3.  编写 Flink CDC 任务

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\\.*
server-id: 5400-5404
server-time-zone: UTC

sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1

pipeline:
name: Sync MySQL Database to Doris
parallelism: 2

其中:

  • source 中的tables: app_db.\\.*通过正则匹配同步"app_db" 这个库下的所有表,该项使用正则表达式
    • 比如 app_db.table_prefix\\.* 则表示同步 "app_db" 这个库下面 "table_prefix" 为前缀的所有表
  • sink 添加参数table.create.properties.replication_num是由于部署机上只有一个 Doris BE 节点。

4.  启动Flink并提交同步

通过命令行将作业提交到 Flink Standalone集群。

# Submit Pipeline
$ ./bin/flink-cdc.sh mysql-to-doris.yaml

提交成功后,返回信息如:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

在执行 flink-cdc.sh 脚本的过程中,CDC 任务配置会被解析并转换为 DataStream 作业,然后提交到指定的 Flink 集群。

在 Flink Web UI,可以看到相应的pipeline任务正在运行。

进入Doris ,可以看到数据表已经被创建出来,数据能成功写入。

connector方式

1.  放置依赖包

在$FLINK_HOME/lib 目录下添加对应的Flink CDC依赖,此处需要两个连接器。

flink-sql-connector-mysql-cdc-3.0.0.jar

flink-doris-connector-1.16-1.4.0.jar

2.  准备脚本

在$FLINK_HOME/conf/your_dir下创建your_cdc_param.sql与your_cdc_plan.sql。编辑your_cdc_param.sql以说明您的CDC作业配置参数。编辑your_cdc_plan.sql以说明您的CDC作业逻辑。

示例的your_cdc_param.sql文件如下:

SET execution.checkpointing.interval = 120s;
SET pipeline.operator-chaining=false;
SET execution.checkpointing.tolerable-failed-checkpoints = 200;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.aligned-checkpoint-timeout = 10s;
SET execution.target = yarn-per-job;
SET state.backend.type = filesystem;
SET state.backend.incremental = true;
SET state.checkpoint-storage = filesystem;
SET state.checkpoints.dir = hdfs://ctyunns/flink-checkpoint/;
SET state.backend.local-recovery = true;
SET taskmanager.numberOfTaskSlots = 2;
SET taskmanager.memory.managed.fraction = 0.1;
SET parallelism.default = 4;
SET jobmanager.memory.process.size = 8g;
SET taskmanager.memory.process.size = 16g;

示例的your_cdc_plan.sql文件如下:

CREATE TABLE mysql_tb_10w (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
my_id INT,
str_1 VARCHAR(128),
str_2 VARCHAR(128),
str_3 VARCHAR(128),
str_4 VARCHAR(128),
str_5 VARCHAR(128),
str_6 VARCHAR(128),
str_7 VARCHAR(128),
str_8 VARCHAR(128),
str_9 VARCHAR(128),
str_10 VARCHAR(128),
str_11 VARCHAR(128),
str_12 VARCHAR(128),
str_13 VARCHAR(128),
str_14 VARCHAR(128),
str_15 VARCHAR(128),
str_16 VARCHAR(128),
str_17 VARCHAR(128),
str_18 VARCHAR(128),
str_19 VARCHAR(128),
str_20 VARCHAR(128),
PRIMARY KEY (`my_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'your_teledb_hostname',
'port' = 'your_teledb_port',
'username' = 'your_teledb_username',
'password' = 'your_teledb_password',
'database-name' = 'your_teledb_database_name',
'table-name' = 'your_teledb_table_name',
'scan.incremental.snapshot.enabled' = 'true'
);
CREATE TABLE doris_tb_10w (
my_id INT,
str_1 VARCHAR(128),
str_2 VARCHAR(128),
str_3 VARCHAR(128),
str_4 VARCHAR(128),
str_5 VARCHAR(128),
str_6 VARCHAR(128),
str_7 VARCHAR(128),
str_8 VARCHAR(128),
str_9 VARCHAR(128),
str_10 VARCHAR(128),
str_11 VARCHAR(128),
str_12 VARCHAR(128),
str_13 VARCHAR(128),
str_14 VARCHAR(128),
str_15 VARCHAR(128),
str_16 VARCHAR(128),
str_17 VARCHAR(128),
str_18 VARCHAR(128),
str_19 VARCHAR(128),
str_20 VARCHAR(128),
database_name VARCHAR(50),
table_name VARCHAR(200)
)
WITH(
'connector' = 'doris',
'fenodes' = 'your_doris_fenodes',
'table.identifier' = 'your_doris_database_name.your_doris_table_name',
'username' = 'your_doris_username',
'password' = 'your_doris_password',
'sink.properties.two_phase_commit'='true',
'sink.label-prefix'='your_doris_label_prefix',
'sink.properties.format' = 'json',
'doris.batch.size' = '4096',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true'
);
insert into doris_tb_10w (my_id,str_1,str_2,str_3,str_4,str_5,str_6,str_7,str_8,str_9,str_10,str_11,str_12,str_13,str_14,str_15,str_16,str_17,str_18,str_19,str_20,database_name,table_name)
select my_id,str_1,str_2,str_3,str_4,str_5,str_6,str_7,str_8,str_9,str_10,str_11,str_12,str_13,str_14,str_15,str_16,str_17,str_18,str_19,str_20,database_name,table_name from mysql_tb_10w;

3.  启动Flink并提交同步

在$FLINK_HOME目录下执行以下命令提交作业。

./bin/sql-client.sh embedded -i conf/your_dir/your_cdc_param.sql -f conf/your_dir/your_cdc_plan.sql

在 Flink Web UI,可以看到相应的cdc 任务正在运行。

进入Doris ,可以看到数据表已经被创建出来,数据能成功写入。

打开 Doris 的 Web UI,可以看到数据表已经被创建出来,数据能成功被同步

文章来自个人专栏
Sirius
5 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
2
1