mysql由于其极广的使用范围和事务性,是多个系统开发的首选.但是随时数据量的增大,其查询速度已经成为制约系统的主要因素,因此基于mysql的分库分表方案层出不穷,但是分库分表会带来严重的查询问题,因为一般只能通过一个维度(字段)进行分表,对于综合查询非常不友好,因此mmp数据库很好的解决了这个问题.
那么mysql的数据库如何同步mmp类型的数据库呢.
第一种,手撸代码
第一, 使用canal、kafka和java实现,简单来说就是借助canal工具解析mysql的binlog,然后发送消息到kafka,再通过java接收消息手撸代码实现.具体操作如下:
下载canal的安装包,解压在需要安装的机器目录下,然后进行配置文件的相关修改
进入 canal/conf/example 目录下,这里是你连接数据库的实例信息和要发送相关信息到哪个mq主题的配置
# position info
canal.instance.master.address= 你的数据库连接地址
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=数据库的用户名
canal.instance.dbPassword=数据库的密码
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName = park
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=发送到消息队列的哪个主题
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
修改canal/conf下的canal.properties,其中主要配置mq的相关信息
canal.mq.servers = 你的kafka地址
canal.mq.retries = 1
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.parallelThreadSize = 8
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
canal.mq.vhost=
canal.mq.exchange=
canal.mq.username=
canal.mq.password=
canal.mq.aliyunuid=
kafka的安装暂不在这里赘述,普通的kafka集群即可
java侧,添加canal依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.4</version>
<exclusions>
<exclusion>
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
</exclusion>
<exclusion>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
</exclusions>
</dependency>
监听kafka消息,写入另一个数据库,其中topic为maindata,可以通过代码
@KafkaListener(topics = "maindata")
public void listener(ConsumerRecord<String, String> record) {
ParkMessage message = null;
try {
message = JSON.parseObject(record.value(), ParkMessage.class);
} catch (Exception e) {
return;
}
String mainPCD = "posChargeData";
String mainPark = "park";
if (message.getTable().equals("posChargeData")) {
List<PosChargeDataM> posChargeDataMList = JSON.parseArray(message.getData(), PosChargeDataM.class);
if (message.getType().equalsIgnoreCase("insert") || message.getType().equalsIgnoreCase("update")) {
posChargeDataMList.forEach(posChargeDataM -> posChargeDataDaoM.save(posChargeDataM, mainPCD));
}
if (message.getType().equalsIgnoreCase("delete")) {
posChargeDataMList.forEach(posChargeDataM -> posChargeDataDaoM.remove(posChargeDataM.getId(), mainPCD));
}
} else if (message.getTable().equals("car_channel_record")) {
List<CarChannelRecordM> carChannelRecordMList = JSON.parseArray(message.getData(), CarChannelRecordM.class);
if (message.getType().equalsIgnoreCase("insert") || message.getType().equalsIgnoreCase("update")) {
carChannelRecordMList.forEach(carChannelRecordM -> posChargeDataDaoM.save(carChannelRecordM, "carChannelRecord"));
}
if (message.getType().equalsIgnoreCase("delete")) {
carChannelRecordMList.forEach(carChannelRecordM -> posChargeDataDaoM.remove(carChannelRecordM.getId(), "carChannelRecord"));
}
} else if (message.getTable().equalsIgnoreCase("fee_flow")) {
List<FeeFlow> feeFlows = JSON.parseArray(message.getData(), FeeFlow.class);
if (message.getType().equalsIgnoreCase("insert")) {
feeFlows.forEach(feeFlow -> {
posChargeDataDaoM.save(feeFlow, "feeFlow");
});
}
} else if (message.getTable().equalsIgnoreCase("park")) {
List<ParkM> parkMList = JSON.parseArray(message.getData(), ParkM.class);
if (message.getType().equalsIgnoreCase("update") || message.getType().equalsIgnoreCase("insert")) {
parkMList.forEach(parkM -> posChargeDataDaoM.save(parkM, mainPark));
}
if (message.getType().toLowerCase().equals("delete")) {
// parkService.removeById(park.getId());
}
} else {
log.info("不处理表 {} 中的数据: {}", message.getTable(), message.getData());
}
}
其中ParkMessage是canal发送过来的消息,其数据结构如下
@Data
public class ParkMessage {
private String database; // 读取的数据库名
private String table; // 读取的表名
private String type; //数据操作类型 如增删改等
private String data; // 实际的数据内容 json格式 需再次格式化
}
这样读取到单条信息后,就可以进行各种操作
第二种,flink CDC功能
flinkCDC功能是面向binlog进行同步、对数据的增删改进行同步的工具。
在这里,我们只需引入相关依赖即可进行操作,如下所示
<!-- flink connector cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.sql.connector.cdc.version}</version>
</dependency>
Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u表示是 update 更新操作标识符(op 字段的值c,u,d,r 分别对应 create,update,delete,reade),ts_ms 表示同步的时间戳。
package cn.ctyun.demo.flinksql;
import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @classname: ReadFromCdc
* @description: 通过cdc获取数据变化进行输入
* @author: Liu Xinyuan
* @create: 2023-04-12 15:09
**/
public class FlinkSqlReadFromCdc {
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.disableOperatorChaining();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1. 创建读取表,使用mysql-cdc进行,注意此时应标记主键
String source_ddl = "CREATE TABLE UserSource (" +
" id INT, " +
" name VARCHAR, " +
" phone VARCHAR, " +
" sex INT, " +
" primary key (id) not enforced" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = '*******'," +
" 'port' = '3307'," +
" 'username' = '" + parameterTool.get("user") + "', " +
" 'password' = '" + parameterTool.get("passwd") + "'" +
" 'database-name' = 'test_cdc_source'," +
" 'table-name' = 'test_user_table'," +
" 'debezium.log.mining.continuous.mine'='true',"+
" 'debezium.log.mining.strategy'='online_catalog', " +
" 'debezium.database.tablename.case.insensitive'='false',"+
" 'jdbc.properties.useSSL' = 'false' ," +
" 'scan.startup.mode' = 'initial')";
tableEnv.executeSql(source_ddl);
// 2. 创建写出表,使用mysql进行
String sink_ddl = "CREATE TABLE UserSink (" +
" id INT, " +
" name VARCHAR, " +
" phone VARCHAR, " +
" sex INT, " +
" primary key (id) not enforced" +
") WITH (" +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:mysql://******:3306/flink_test_sink?useSSL=false', " +
" 'connector.table' = 'test_user_table', " +
" 'connector.username' = '" + parameterTool.get("sinkUser") + "', " +
" 'connector.password' = '" + parameterTool.get("sinkPasswd") + "'" +
" 'connector.write.flush.max-rows' = '1'" +
")";
tableEnv.executeSql(sink_ddl);
// 3.简单的数据清洗,将电话号码进行hash掩码
tableEnv.createTemporarySystemFunction("MyHASH", HashScalarFunction.class);
Table maskedTable = tableEnv.sqlQuery("SELECT id, name, MyHASH(phone) as phone, sex FROM UserSource");
tableEnv.createTemporaryView("MaskedUserInfo", maskedTable);
// 4.使用insert语句进行数据输出,在这里进行一定地清洗
String insertSql = "INSERT INTO UserSink SELECT * FROM MaskedUserInfo";
TableResult tableResult = tableEnv.executeSql(insertSql);
tableResult.print();
}
}
在这里我们定义了一套简单的数据同步+电话号码掩码的操作。这里重点看cdc相关的配置项,如下所示。
这里有一个重点的配置项, 'scan.startup.mode' = 'initial'
此处是cdc的关键所在,MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 请查阅 官方文档启动模式了解更多详细信息。这里使用的initial模式为在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog,也就是先进行一次全表扫描后再进行后续的增量同步,由于测试数据较小可以如此进行,cdc的使用者可以根据个人情况进行选择。
String source_ddl = "CREATE TABLE UserSource (" +
" id INT, " +
" name VARCHAR, " +
" phone VARCHAR, " +
" sex INT, " +
" primary key (id) not enforced" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = '******'," +
" 'port' = '3307'," +
" 'username' = '" + parameterTool.get("user") + "', " +
" 'password' = '" + parameterTool.get("passwd") + "'" +
" 'database-name' = 'test_cdc_source'," +
" 'table-name' = 'test_user_table'," +
" 'debezium.log.mining.continuous.mine'='true',"+
" 'debezium.log.mining.strategy'='online_catalog', " +
" 'debezium.database.tablename.case.insensitive'='false',"+
" 'jdbc.properties.useSSL' = 'false' ," +
" 'scan.startup.mode' = 'initial')";
启用后,整个流程为对其中的数据增量同步,由于我们使用的是initial模式,因此我们的数据在任务启动的时候,首先进行了一次全量同步,全量地将信息同步,并且进行了掩码操作。
后续如果添加新的信息也会进行同步,删除亦然。
断点续传功能是flink-cdc在2.0版本后逐渐推行的新功能。此功能能够支持使用savepotin、checkpoint等方式进行断点续传功能。意思为如果我们在中途保留一个保存点,那么任务如果重启的话将会从保存点开始同步cdc数据,中间不会遗失数据(除非手动删除binlog)。
3 总结
上述两种方式是都可以实现mysql复制到其它数据库.第一种方式是手写代码,优点是控制自由,可以写入任何java可写入的数据库,缺点是编码较为复杂,需要引入需要同步的table对象.第二种方式依赖于CDC连接器,由连接器完成这些逻辑,同时需要对flink较为熟悉.还有个缺点是: 写入的数据库非常限制于flink已有的cdc连接器..
因此对于小公司或者只同步几个表的场景中,手撸代码不失为一个可控性好的选择.但如果在大公司,需要同步大量的业务表,则flink-cdc的优势则更为明显.