数据准备
同步需要以下两个前提:
1.您的TeleDB待同步库表中已存在数据,且数据具有主键。
2.您拥有一个TeleDB 用户,该用户对于待同步库表都应该具有所需的权限。验收标准:该用户执行show master status;这条命令能查看binlog开启。参考命令如下。
-- 1.创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
-- 2.向用户授予所需的权限:
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
-- 在实际赋权时,当然还需要一些基本的操作权限,如增删改等。因此您也可以参考如下的命令:
mysql> grant select,insert,update,PROCESS,CREATE,delete,show databases,reload,replication client,replication slave on *.* to 'user'@'%' identified by 'password';
-- 注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。
-- 3.刷新用户权限:
mysql> FLUSH PRIVILEGES;
pipeline方式
该示例展示了如何使用pipeline方式将TeleDB数据库中多个业务表的数据(包括快照数据和增量数据)持续同步到 Doris 以完成数据集成操作。
1. 下载客户端
下载flink-cdc-3.0.tar文件并将其解压到本地。
flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。
2. 放置依赖包
从 Maven 下载所需的 CDC pipeline connector JAR,并将其放在${flink-cdc-3.0.0}/lib 目录
flink-cdc-pipeline-connector-doris-3.0.0.jar
flink-cdc-pipeline-connector-mysql-3.0.0.jar
3. 编写 Flink CDC 任务
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
其中:
- source 中的tables: app_db.\\.*通过正则匹配同步"app_db" 这个库下的所有表,该项使用正则表达式
- 比如 app_db.table_prefix\\.* 则表示同步 "app_db" 这个库下面 "table_prefix" 为前缀的所有表
- sink 添加参数table.create.properties.replication_num是由于部署机上只有一个 Doris BE 节点。
4. 启动Flink并提交同步
通过命令行将作业提交到 Flink Standalone集群。
# Submit Pipeline
$ ./bin/flink-cdc.sh mysql-to-doris.yaml
提交成功后,返回信息如:
Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris
在执行 flink-cdc.sh 脚本的过程中,CDC 任务配置会被解析并转换为 DataStream 作业,然后提交到指定的 Flink 集群。
在 Flink Web UI,可以看到相应的pipeline任务正在运行。
进入Doris ,可以看到数据表已经被创建出来,数据能成功写入。
connector方式
1. 放置依赖包
在$FLINK_HOME/lib 目录下添加对应的Flink CDC依赖,此处需要两个连接器。
flink-sql-connector-mysql-cdc-3.0.0.jar
flink-doris-connector-1.16-1.4.0.jar
2. 准备脚本
在$FLINK_HOME/conf/your_dir下创建your_cdc_param.sql与your_cdc_plan.sql。编辑your_cdc_param.sql以说明您的CDC作业配置参数。编辑your_cdc_plan.sql以说明您的CDC作业逻辑。
示例的your_cdc_param.sql文件如下:
SET execution.checkpointing.interval = 120s;
SET pipeline.operator-chaining=false;
SET execution.checkpointing.tolerable-failed-checkpoints = 200;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.aligned-checkpoint-timeout = 10s;
SET execution.target = yarn-per-job;
SET state.backend.type = filesystem;
SET state.backend.incremental = true;
SET state.checkpoint-storage = filesystem;
SET state.checkpoints.dir = hdfs://ctyunns/flink-checkpoint/;
SET state.backend.local-recovery = true;
SET taskmanager.numberOfTaskSlots = 2;
SET taskmanager.memory.managed.fraction = 0.1;
SET parallelism.default = 4;
SET jobmanager.memory.process.size = 8g;
SET taskmanager.memory.process.size = 16g;
示例的your_cdc_plan.sql文件如下:
CREATE TABLE mysql_tb_10w (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
my_id INT,
str_1 VARCHAR(128),
str_2 VARCHAR(128),
str_3 VARCHAR(128),
str_4 VARCHAR(128),
str_5 VARCHAR(128),
str_6 VARCHAR(128),
str_7 VARCHAR(128),
str_8 VARCHAR(128),
str_9 VARCHAR(128),
str_10 VARCHAR(128),
str_11 VARCHAR(128),
str_12 VARCHAR(128),
str_13 VARCHAR(128),
str_14 VARCHAR(128),
str_15 VARCHAR(128),
str_16 VARCHAR(128),
str_17 VARCHAR(128),
str_18 VARCHAR(128),
str_19 VARCHAR(128),
str_20 VARCHAR(128),
PRIMARY KEY (`my_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'your_teledb_hostname',
'port' = 'your_teledb_port',
'username' = 'your_teledb_username',
'password' = 'your_teledb_password',
'database-name' = 'your_teledb_database_name',
'table-name' = 'your_teledb_table_name',
'scan.incremental.snapshot.enabled' = 'true'
);
CREATE TABLE doris_tb_10w (
my_id INT,
str_1 VARCHAR(128),
str_2 VARCHAR(128),
str_3 VARCHAR(128),
str_4 VARCHAR(128),
str_5 VARCHAR(128),
str_6 VARCHAR(128),
str_7 VARCHAR(128),
str_8 VARCHAR(128),
str_9 VARCHAR(128),
str_10 VARCHAR(128),
str_11 VARCHAR(128),
str_12 VARCHAR(128),
str_13 VARCHAR(128),
str_14 VARCHAR(128),
str_15 VARCHAR(128),
str_16 VARCHAR(128),
str_17 VARCHAR(128),
str_18 VARCHAR(128),
str_19 VARCHAR(128),
str_20 VARCHAR(128),
database_name VARCHAR(50),
table_name VARCHAR(200)
)
WITH(
'connector' = 'doris',
'fenodes' = 'your_doris_fenodes',
'table.identifier' = 'your_doris_database_name.your_doris_table_name',
'username' = 'your_doris_username',
'password' = 'your_doris_password',
'sink.properties.two_phase_commit'='true',
'sink.label-prefix'='your_doris_label_prefix',
'sink.properties.format' = 'json',
'doris.batch.size' = '4096',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true'
);
insert into doris_tb_10w (my_id,str_1,str_2,str_3,str_4,str_5,str_6,str_7,str_8,str_9,str_10,str_11,str_12,str_13,str_14,str_15,str_16,str_17,str_18,str_19,str_20,database_name,table_name)
select my_id,str_1,str_2,str_3,str_4,str_5,str_6,str_7,str_8,str_9,str_10,str_11,str_12,str_13,str_14,str_15,str_16,str_17,str_18,str_19,str_20,database_name,table_name from mysql_tb_10w;
3. 启动Flink并提交同步
在$FLINK_HOME目录下执行以下命令提交作业。
./bin/sql-client.sh embedded -i conf/your_dir/your_cdc_param.sql -f conf/your_dir/your_cdc_plan.sql
在 Flink Web UI,可以看到相应的cdc 任务正在运行。
进入Doris ,可以看到数据表已经被创建出来,数据能成功写入。
打开 Doris 的 Web UI,可以看到数据表已经被创建出来,数据能成功被同步。