searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

浅谈mysql的数据实时同步其它数据库

2023-05-26 01:08:50
40
0

mysql由于其极广的使用范围和事务性,是多个系统开发的首选.但是随时数据量的增大,其查询速度已经成为制约系统的主要因素,因此基于mysql的分库分表方案层出不穷,但是分库分表会带来严重的查询问题,因为一般只能通过一个维度(字段)进行分表,对于综合查询非常不友好,因此mmp数据库很好的解决了这个问题.

那么mysql的数据库如何同步mmp类型的数据库呢.

第一种,手撸代码

第一, 使用canal、kafka和java实现,简单来说就是借助canal工具解析mysql的binlog,然后发送消息到kafka,再通过java接收消息手撸代码实现.具体操作如下:

下载canal的安装包,解压在需要安装的机器目录下,然后进行配置文件的相关修改

进入 canal/conf/example 目录下,这里是你连接数据库的实例信息和要发送相关信息到哪个mq主题的配置

# position info
canal.instance.master.address= 你的数据库连接地址
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=数据库的用户名
canal.instance.dbPassword=数据库的密码
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName = park
# enable druid Decrypt database password
canal.instance.enableDruid=false


# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=发送到消息队列的哪个主题
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*

修改canal/conf下的canal.properties,其中主要配置mq的相关信息

canal.mq.servers = 你的kafka地址
canal.mq.retries = 1
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.parallelThreadSize = 8
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
canal.mq.vhost=
canal.mq.exchange=
canal.mq.username=
canal.mq.password=
canal.mq.aliyunuid=

kafka的安装暂不在这里赘述,普通的kafka集群即可

java侧,添加canal依赖

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.4</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-io</artifactId>
                    <groupId>commons-io</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>logback-classic</artifactId>
                    <groupId>ch.qos.logback</groupId>
                </exclusion>
            </exclusions>
        </dependency>

监听kafka消息,写入另一个数据库,其中topic为maindata,可以通过代码

 @KafkaListener(topics = "maindata")
    public void listener(ConsumerRecord<String, String> record) {
        ParkMessage message = null;
        try {
            message = JSON.parseObject(record.value(), ParkMessage.class);
        } catch (Exception e) {
            return;
        }
        String mainPCD = "posChargeData";
        String mainPark = "park";
        if (message.getTable().equals("posChargeData")) {
            List<PosChargeDataM> posChargeDataMList = JSON.parseArray(message.getData(), PosChargeDataM.class);
            if (message.getType().equalsIgnoreCase("insert") || message.getType().equalsIgnoreCase("update")) {
                posChargeDataMList.forEach(posChargeDataM -> posChargeDataDaoM.save(posChargeDataM, mainPCD));
            }
            if (message.getType().equalsIgnoreCase("delete")) {
                posChargeDataMList.forEach(posChargeDataM -> posChargeDataDaoM.remove(posChargeDataM.getId(), mainPCD));
            }
        } else if (message.getTable().equals("car_channel_record")) {
            List<CarChannelRecordM> carChannelRecordMList = JSON.parseArray(message.getData(), CarChannelRecordM.class);
            if (message.getType().equalsIgnoreCase("insert") || message.getType().equalsIgnoreCase("update")) {
                carChannelRecordMList.forEach(carChannelRecordM -> posChargeDataDaoM.save(carChannelRecordM, "carChannelRecord"));
            }
            if (message.getType().equalsIgnoreCase("delete")) {
                carChannelRecordMList.forEach(carChannelRecordM -> posChargeDataDaoM.remove(carChannelRecordM.getId(), "carChannelRecord"));
            }
        } else if (message.getTable().equalsIgnoreCase("fee_flow")) {
            List<FeeFlow> feeFlows = JSON.parseArray(message.getData(), FeeFlow.class);
            if (message.getType().equalsIgnoreCase("insert")) {
                feeFlows.forEach(feeFlow -> {
                    posChargeDataDaoM.save(feeFlow, "feeFlow");
                });
            }
        } else if (message.getTable().equalsIgnoreCase("park")) {
            List<ParkM> parkMList = JSON.parseArray(message.getData(), ParkM.class);
            if (message.getType().equalsIgnoreCase("update") || message.getType().equalsIgnoreCase("insert")) {
                parkMList.forEach(parkM -> posChargeDataDaoM.save(parkM, mainPark));
            }
            if (message.getType().toLowerCase().equals("delete")) {
                //       parkService.removeById(park.getId());
            }
        } else {
            log.info("不处理表 {} 中的数据: {}", message.getTable(), message.getData());
        }

    }

其中ParkMessage是canal发送过来的消息,其数据结构如下

@Data
public class ParkMessage {
    private String database;  // 读取的数据库名
    private String table; // 读取的表名
    private String type; //数据操作类型 如增删改等
    private String data; // 实际的数据内容 json格式 需再次格式化
}

这样读取到单条信息后,就可以进行各种操作

第二种,flink CDC功能

flinkCDC功能是面向binlog进行同步、对数据的增删改进行同步的工具。

在这里,我们只需引入相关依赖即可进行操作,如下所示

<!-- flink connector cdc  -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.sql.connector.cdc.version}</version>
</dependency>

Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u表示是 update 更新操作标识符(op 字段的值c,u,d,r 分别对应 create,update,delete,reade),ts_ms 表示同步的时间戳。

package cn.ctyun.demo.flinksql;

import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @classname: ReadFromCdc
 * @description: 通过cdc获取数据变化进行输入
 * @author: Liu Xinyuan
 * @create: 2023-04-12 15:09
 **/
public class FlinkSqlReadFromCdc {

    public static void main(String[] args) throws Exception {

        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        env.disableOperatorChaining();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 创建读取表,使用mysql-cdc进行,注意此时应标记主键
        String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT, " +
                " primary key (id) not enforced" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = '*******'," +
                " 'port' = '3307'," +
                " 'username' = '" + parameterTool.get("user") + "', " +
                " 'password' = '" + parameterTool.get("passwd") + "'" +
                " 'database-name' = 'test_cdc_source'," +
                " 'table-name' = 'test_user_table'," +
                " 'debezium.log.mining.continuous.mine'='true',"+
                " 'debezium.log.mining.strategy'='online_catalog', " +
                " 'debezium.database.tablename.case.insensitive'='false',"+
                " 'jdbc.properties.useSSL' = 'false' ," +
                " 'scan.startup.mode' = 'initial')";
        tableEnv.executeSql(source_ddl);
        //  2. 创建写出表,使用mysql进行
        String sink_ddl = "CREATE TABLE UserSink (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT, " +
                " primary key (id) not enforced" +
                ") WITH (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://******:3306/flink_test_sink?useSSL=false', " +
                " 'connector.table' = 'test_user_table', " +
                " 'connector.username' = '" + parameterTool.get("sinkUser") + "', " +
                " 'connector.password' = '" + parameterTool.get("sinkPasswd") + "'" +
                " 'connector.write.flush.max-rows' = '1'" +
                ")";
        tableEnv.executeSql(sink_ddl);

        // 3.简单的数据清洗,将电话号码进行hash掩码
        tableEnv.createTemporarySystemFunction("MyHASH", HashScalarFunction.class);
        Table maskedTable = tableEnv.sqlQuery("SELECT id, name, MyHASH(phone) as phone, sex FROM UserSource");
        tableEnv.createTemporaryView("MaskedUserInfo", maskedTable);

        // 4.使用insert语句进行数据输出,在这里进行一定地清洗
        String insertSql = "INSERT INTO UserSink SELECT * FROM MaskedUserInfo";
        TableResult tableResult = tableEnv.executeSql(insertSql);
        tableResult.print();

    }

}

在这里我们定义了一套简单的数据同步+电话号码掩码的操作。这里重点看cdc相关的配置项,如下所示。

这里有一个重点的配置项, 'scan.startup.mode' = 'initial'此处是cdc的关键所在,MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 请查阅 官方文档启动模式了解更多详细信息。这里使用的initial模式为在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog,也就是先进行一次全表扫描后再进行后续的增量同步,由于测试数据较小可以如此进行,cdc的使用者可以根据个人情况进行选择。

  String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT, " +
                " primary key (id) not enforced" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = '******'," +
                " 'port' = '3307'," +
                " 'username' = '" + parameterTool.get("user") + "', " +
                " 'password' = '" + parameterTool.get("passwd") + "'" +
                " 'database-name' = 'test_cdc_source'," +
                " 'table-name' = 'test_user_table'," +
                " 'debezium.log.mining.continuous.mine'='true',"+
                " 'debezium.log.mining.strategy'='online_catalog', " +
                " 'debezium.database.tablename.case.insensitive'='false',"+
                " 'jdbc.properties.useSSL' = 'false' ," +
                " 'scan.startup.mode' = 'initial')";

启用后,整个流程为对其中的数据增量同步,由于我们使用的是initial模式,因此我们的数据在任务启动的时候,首先进行了一次全量同步,全量地将信息同步,并且进行了掩码操作。

后续如果添加新的信息也会进行同步,删除亦然。

断点续传功能是flink-cdc在2.0版本后逐渐推行的新功能。此功能能够支持使用savepotin、checkpoint等方式进行断点续传功能。意思为如果我们在中途保留一个保存点,那么任务如果重启的话将会从保存点开始同步cdc数据,中间不会遗失数据(除非手动删除binlog)。

3 总结

上述两种方式是都可以实现mysql复制到其它数据库.第一种方式是手写代码,优点是控制自由,可以写入任何java可写入的数据库,缺点是编码较为复杂,需要引入需要同步的table对象.第二种方式依赖于CDC连接器,由连接器完成这些逻辑,同时需要对flink较为熟悉.还有个缺点是: 写入的数据库非常限制于flink已有的cdc连接器..

因此对于小公司或者只同步几个表的场景中,手撸代码不失为一个可控性好的选择.但如果在大公司,需要同步大量的业务表,则flink-cdc的优势则更为明显.

0条评论
0 / 1000
刘****猛
7文章数
0粉丝数
刘****猛
7 文章 | 0 粉丝
原创

浅谈mysql的数据实时同步其它数据库

2023-05-26 01:08:50
40
0

mysql由于其极广的使用范围和事务性,是多个系统开发的首选.但是随时数据量的增大,其查询速度已经成为制约系统的主要因素,因此基于mysql的分库分表方案层出不穷,但是分库分表会带来严重的查询问题,因为一般只能通过一个维度(字段)进行分表,对于综合查询非常不友好,因此mmp数据库很好的解决了这个问题.

那么mysql的数据库如何同步mmp类型的数据库呢.

第一种,手撸代码

第一, 使用canal、kafka和java实现,简单来说就是借助canal工具解析mysql的binlog,然后发送消息到kafka,再通过java接收消息手撸代码实现.具体操作如下:

下载canal的安装包,解压在需要安装的机器目录下,然后进行配置文件的相关修改

进入 canal/conf/example 目录下,这里是你连接数据库的实例信息和要发送相关信息到哪个mq主题的配置

# position info
canal.instance.master.address= 你的数据库连接地址
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=数据库的用户名
canal.instance.dbPassword=数据库的密码
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName = park
# enable druid Decrypt database password
canal.instance.enableDruid=false


# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=发送到消息队列的哪个主题
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*

修改canal/conf下的canal.properties,其中主要配置mq的相关信息

canal.mq.servers = 你的kafka地址
canal.mq.retries = 1
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.parallelThreadSize = 8
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
canal.mq.vhost=
canal.mq.exchange=
canal.mq.username=
canal.mq.password=
canal.mq.aliyunuid=

kafka的安装暂不在这里赘述,普通的kafka集群即可

java侧,添加canal依赖

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.4</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-io</artifactId>
                    <groupId>commons-io</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>logback-classic</artifactId>
                    <groupId>ch.qos.logback</groupId>
                </exclusion>
            </exclusions>
        </dependency>

监听kafka消息,写入另一个数据库,其中topic为maindata,可以通过代码

 @KafkaListener(topics = "maindata")
    public void listener(ConsumerRecord<String, String> record) {
        ParkMessage message = null;
        try {
            message = JSON.parseObject(record.value(), ParkMessage.class);
        } catch (Exception e) {
            return;
        }
        String mainPCD = "posChargeData";
        String mainPark = "park";
        if (message.getTable().equals("posChargeData")) {
            List<PosChargeDataM> posChargeDataMList = JSON.parseArray(message.getData(), PosChargeDataM.class);
            if (message.getType().equalsIgnoreCase("insert") || message.getType().equalsIgnoreCase("update")) {
                posChargeDataMList.forEach(posChargeDataM -> posChargeDataDaoM.save(posChargeDataM, mainPCD));
            }
            if (message.getType().equalsIgnoreCase("delete")) {
                posChargeDataMList.forEach(posChargeDataM -> posChargeDataDaoM.remove(posChargeDataM.getId(), mainPCD));
            }
        } else if (message.getTable().equals("car_channel_record")) {
            List<CarChannelRecordM> carChannelRecordMList = JSON.parseArray(message.getData(), CarChannelRecordM.class);
            if (message.getType().equalsIgnoreCase("insert") || message.getType().equalsIgnoreCase("update")) {
                carChannelRecordMList.forEach(carChannelRecordM -> posChargeDataDaoM.save(carChannelRecordM, "carChannelRecord"));
            }
            if (message.getType().equalsIgnoreCase("delete")) {
                carChannelRecordMList.forEach(carChannelRecordM -> posChargeDataDaoM.remove(carChannelRecordM.getId(), "carChannelRecord"));
            }
        } else if (message.getTable().equalsIgnoreCase("fee_flow")) {
            List<FeeFlow> feeFlows = JSON.parseArray(message.getData(), FeeFlow.class);
            if (message.getType().equalsIgnoreCase("insert")) {
                feeFlows.forEach(feeFlow -> {
                    posChargeDataDaoM.save(feeFlow, "feeFlow");
                });
            }
        } else if (message.getTable().equalsIgnoreCase("park")) {
            List<ParkM> parkMList = JSON.parseArray(message.getData(), ParkM.class);
            if (message.getType().equalsIgnoreCase("update") || message.getType().equalsIgnoreCase("insert")) {
                parkMList.forEach(parkM -> posChargeDataDaoM.save(parkM, mainPark));
            }
            if (message.getType().toLowerCase().equals("delete")) {
                //       parkService.removeById(park.getId());
            }
        } else {
            log.info("不处理表 {} 中的数据: {}", message.getTable(), message.getData());
        }

    }

其中ParkMessage是canal发送过来的消息,其数据结构如下

@Data
public class ParkMessage {
    private String database;  // 读取的数据库名
    private String table; // 读取的表名
    private String type; //数据操作类型 如增删改等
    private String data; // 实际的数据内容 json格式 需再次格式化
}

这样读取到单条信息后,就可以进行各种操作

第二种,flink CDC功能

flinkCDC功能是面向binlog进行同步、对数据的增删改进行同步的工具。

在这里,我们只需引入相关依赖即可进行操作,如下所示

<!-- flink connector cdc  -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.sql.connector.cdc.version}</version>
</dependency>

Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u表示是 update 更新操作标识符(op 字段的值c,u,d,r 分别对应 create,update,delete,reade),ts_ms 表示同步的时间戳。

package cn.ctyun.demo.flinksql;

import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @classname: ReadFromCdc
 * @description: 通过cdc获取数据变化进行输入
 * @author: Liu Xinyuan
 * @create: 2023-04-12 15:09
 **/
public class FlinkSqlReadFromCdc {

    public static void main(String[] args) throws Exception {

        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        env.disableOperatorChaining();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. 创建读取表,使用mysql-cdc进行,注意此时应标记主键
        String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT, " +
                " primary key (id) not enforced" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = '*******'," +
                " 'port' = '3307'," +
                " 'username' = '" + parameterTool.get("user") + "', " +
                " 'password' = '" + parameterTool.get("passwd") + "'" +
                " 'database-name' = 'test_cdc_source'," +
                " 'table-name' = 'test_user_table'," +
                " 'debezium.log.mining.continuous.mine'='true',"+
                " 'debezium.log.mining.strategy'='online_catalog', " +
                " 'debezium.database.tablename.case.insensitive'='false',"+
                " 'jdbc.properties.useSSL' = 'false' ," +
                " 'scan.startup.mode' = 'initial')";
        tableEnv.executeSql(source_ddl);
        //  2. 创建写出表,使用mysql进行
        String sink_ddl = "CREATE TABLE UserSink (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT, " +
                " primary key (id) not enforced" +
                ") WITH (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://******:3306/flink_test_sink?useSSL=false', " +
                " 'connector.table' = 'test_user_table', " +
                " 'connector.username' = '" + parameterTool.get("sinkUser") + "', " +
                " 'connector.password' = '" + parameterTool.get("sinkPasswd") + "'" +
                " 'connector.write.flush.max-rows' = '1'" +
                ")";
        tableEnv.executeSql(sink_ddl);

        // 3.简单的数据清洗,将电话号码进行hash掩码
        tableEnv.createTemporarySystemFunction("MyHASH", HashScalarFunction.class);
        Table maskedTable = tableEnv.sqlQuery("SELECT id, name, MyHASH(phone) as phone, sex FROM UserSource");
        tableEnv.createTemporaryView("MaskedUserInfo", maskedTable);

        // 4.使用insert语句进行数据输出,在这里进行一定地清洗
        String insertSql = "INSERT INTO UserSink SELECT * FROM MaskedUserInfo";
        TableResult tableResult = tableEnv.executeSql(insertSql);
        tableResult.print();

    }

}

在这里我们定义了一套简单的数据同步+电话号码掩码的操作。这里重点看cdc相关的配置项,如下所示。

这里有一个重点的配置项, 'scan.startup.mode' = 'initial'此处是cdc的关键所在,MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 请查阅 官方文档启动模式了解更多详细信息。这里使用的initial模式为在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog,也就是先进行一次全表扫描后再进行后续的增量同步,由于测试数据较小可以如此进行,cdc的使用者可以根据个人情况进行选择。

  String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT, " +
                " primary key (id) not enforced" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = '******'," +
                " 'port' = '3307'," +
                " 'username' = '" + parameterTool.get("user") + "', " +
                " 'password' = '" + parameterTool.get("passwd") + "'" +
                " 'database-name' = 'test_cdc_source'," +
                " 'table-name' = 'test_user_table'," +
                " 'debezium.log.mining.continuous.mine'='true',"+
                " 'debezium.log.mining.strategy'='online_catalog', " +
                " 'debezium.database.tablename.case.insensitive'='false',"+
                " 'jdbc.properties.useSSL' = 'false' ," +
                " 'scan.startup.mode' = 'initial')";

启用后,整个流程为对其中的数据增量同步,由于我们使用的是initial模式,因此我们的数据在任务启动的时候,首先进行了一次全量同步,全量地将信息同步,并且进行了掩码操作。

后续如果添加新的信息也会进行同步,删除亦然。

断点续传功能是flink-cdc在2.0版本后逐渐推行的新功能。此功能能够支持使用savepotin、checkpoint等方式进行断点续传功能。意思为如果我们在中途保留一个保存点,那么任务如果重启的话将会从保存点开始同步cdc数据,中间不会遗失数据(除非手动删除binlog)。

3 总结

上述两种方式是都可以实现mysql复制到其它数据库.第一种方式是手写代码,优点是控制自由,可以写入任何java可写入的数据库,缺点是编码较为复杂,需要引入需要同步的table对象.第二种方式依赖于CDC连接器,由连接器完成这些逻辑,同时需要对flink较为熟悉.还有个缺点是: 写入的数据库非常限制于flink已有的cdc连接器..

因此对于小公司或者只同步几个表的场景中,手撸代码不失为一个可控性好的选择.但如果在大公司,需要同步大量的业务表,则flink-cdc的优势则更为明显.

文章来自个人专栏
个人文章
7 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0