Debezium 是一个开源的分布式平台,旨在实时捕获和传输数据库的变更数据(Change Data Capture,简称 CDC)。它可以帮助实现将数据库中的变更(如插入、更新、删除)实时同步到其他系统中,常用于构建实时数据管道、数据同步、日志分析等应用场景。
Debezium 的工作原理
Debezium 通过数据库的 事务日志(如 MySQL 的 binlog,PostgreSQL 的 WAL)来捕获数据变化。它监听这些日志的变化,并将变更数据实时地发送到 Kafka 或其他消息队列系统中。
-
事务日志:数据库的事务日志记录了所有数据库变更操作的信息。Debezium 通过读取这些日志获取数据库表的数据更改,并通过一定的格式化和转换,生成一致的变更事件。
-
Kafka:Debezium 通常将捕获到的变更数据作为事件发送到 Apache Kafka 中,供下游系统进行进一步处理。它的变更数据格式通常是 JSON 或 Avro 格式。
Debezium 支持的数据库
Debezium 目前支持多种主流数据库,常见的包括:
- MySQL:通过读取 MySQL 的 binlog 捕获数据变更。
- PostgreSQL:通过监听 PostgreSQL 的 Write-Ahead Logging (WAL) 捕获数据变更。
- MongoDB:通过监听 MongoDB 的 Oplog 捕获数据变更。
- SQL Server:通过 SQL Server 的 Transaction Log 捕获变更。
- Oracle:通过 Oracle Redo Log 捕获变更。
- Db2 和 Cassandra 等也在逐步支持中。
Debezium 的工作流程
- 连接数据库:Debezium 通过连接器(connector)与目标数据库建立连接。
- 读取事务日志:连接器通过读取数据库的事务日志来捕获变更。
- 生成变更事件:捕获到的变更(如插入、更新、删除)被 Debezium 转换为事件,并通过 JSON 或 Avro 格式进行编码。
- 将事件发送到 Kafka:这些变更事件被发布到 Kafka 中,通常以数据库表名作为 Kafka 主题名的一部分。
- 下游消费:Kafka 消费者可以消费这些变更事件并进行进一步处理,如存储到数据仓库、分析系统或触发其他操作。
Debezium 的优势
- 实时性:Debezium 能够在数据库变更发生的几乎同时将数据变更传输到目标系统,适合实时数据同步和处理。
- 松耦合:Debezium 将数据库和下游应用解耦,允许数据库变更被异步、独立地消费,避免了系统间的直接依赖。
- 容错性:通过 Kafka,Debezium 实现了高可用和容错机制,确保数据的可靠传输。
- 灵活性:支持多种数据库,并能适应多种下游消费模型,可以将数据同步到不同类型的存储系统、数据湖、数据仓库等。
典型应用场景
- 数据同步:将不同数据库中的数据实时同步到其他数据库或数据存储系统中。
- 实时分析:将数据库变更数据实时推送到分析平台,进行实时的数据处理和报告生成。
- 微服务架构:Debezium 能够在微服务架构中作为服务间的数据交换机制,通过 Kafka 将不同微服务间的数据库变更实时同步。
- 数据仓库加载:将实时变更数据推送到数据仓库中,保持数据的同步。
快照类型
初始快照 initial snapshot
特点:
1、类似我们的全量迁移,此时增量还没有启动
临时快照 ad hoc snapshot
特点:
1、 在增量运行的过程中,可以对之前没有选择全量迁移的表进行全量迁移,增量全量同时运行
只读临时快照 Ad hoc read-only incremental snapshots
特点:
跟临时快照是一样得,只不过由于源库只能读,所以不能使用信号表插入数据来实现全量和增量同时进行,使用kafka来实现
增量快照 Incremental snapshots
特点:
1、从指定的位置开始快照,与初始快照不同,增量快照以块的形式捕获表,而不是一次全部捕获
2、增量和全量同时运行
3、如果增量快照过程被中断,它可以从它停止的点恢复,即断点续传
只读增量快照 read-only Incremental snapshots
特点:
跟增量快照是一样的,只不过由于源库只能读,所以不能使用信号表插入数据来实现全量和增量同时进行,使用kafka来实现
增量快照过程
1、Debezium按主键对每个表进行排序,然后根据配置的块大小将表分成块。一个块一个块地工作,然后捕获一个块中的每个表行。对于它捕获的每一行,快照都会发出一个READ事件。该事件表示chunk的快照开始时的行值。
如何发起增量快照
1、在源库创建一张信号表(signals table)
CREATE TABLE debezium_signal (
id varchar(64),
type varchar(32),
data varchar(2048)
);
2、使用配置table.include.list配置信号表
3、使用配置signal.data.collection配置信号表
4、需要执行增量快照时,往信号表插入相关数据
INSERT INTO _<signalTable>_ (id, type, data) VALUES (_'<id>'_, _'<snapshotType>'_, '{"data-collections": ["_<tableName>_","_<tableName>_"],"type":"_<snapshotType>_"}');
例如:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"],"type":"incremental"}');
源码分析
一、如何读取分块数据
入口在
io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource#readChunk
1、为块生成一个uuid
使用chunid感觉没什么用,就是标记一下当前得chunkid是哪个,如果信号号发一下不是当前chunkid得信息过来,就不做任何处理
2、发起一个开窗事件,相当于标记低水位
这里做的事件主要是往信号表里面插入一条开窗的数据,id就是块id加”-open”后缀,type固定为”snapshot-window-open”:
INSERT INTO `incremental_snapshot-test_1dcllp9`.`debezium_signal`(`id`, `type`, `data`) VALUES ('bad46303-1350-4963-a276-44370a1a06c4-open', 'snapshot-window-open', NULL);
3、从源库读取属于该块的数据
相应的SQL为:
SELECT * FROM `incremental_snapshot-test_1dcllp9`.`a` WHERE (`pk` > 200) AND NOT (`pk` > 1000) ORDER BY `pk` LIMIT 10
200是上一个块的结束位置
1000是这个表最大的数
10是块大小,在配置文件配好了的,配置属性是:incremental.snapshot.chunk.size
接下来就是把这个块的数据读出来放到window这个map里面,并且记录起止位置
设置下一个块的开始读取位置
记录这个表总共读了多少条数据
发起一个关窗事件,相当于标记高水位
这里做的事件主要是往信号表里面插入一条关窗的数据,id就是块id加”-close”后缀,type固定为”snapshot-window-close”:
INSERT INTO `incremental_snapshot-test_1dcllp9`.`debezium_signal`(`id`, `type`, `data`) VALUES ('bad46303-1350-4963-a276-44370a1a06c4-close', 'snapshot-window-close', NULL);
5、在全量读trunk数据的时候,增量也在执行,增量要和trunk合并,但是读trunk也有时间,这段间隙这么处理的
trunk还没读出来的数据,在后面读出来的时候肯定是更新的,所有流数据先发送到下游,后面trunk读出来后再发送也是最新的值,不会出现旧值
6、将窗口的数据发送到下游跟关窗之后的流数据的顺序如何保证
关窗之后的流数据肯定是更新的,将窗口的数据发送到下游和流数据发送到下游都是同一个线程来处理的,所以是同步的,可以保证顺序
7、首次出发全量读chunk的时机
一个是启动debezium配置的init的时候,另外一个就是当debezium接收到新增表的读取信号时
第二个时机的入口在这里:
8、为什么明明是mysql,但是实现还是SignalBasedIncrementalSnapshotChangeEventSource的emitwindowclose方法
只有源库是只读的,并且开启gtid模式,才会使用mysql的实现,否则都是使用信号量的实现
二、如何处理全量和增量的冲突
1、io.debezium.pipeline.EventDispatcher#dispatchDataChangeEvent接收到一个增量事件后,首先判断这条数据是否在当前窗口的全量数据中,如果是则从全量数据里面移除掉这条处理,再将这条增量数据发给下游使用
从全量数据里面移除掉这条处理
三、块里面的全量数据什么时候发给下游
上面发起一个关窗事件,相当于标记高水位说过,全量读完了会往信号表插入一下数据标记全量结束,此刻信号表会产生一个增量事件,而这个事件响应做的事情就是把存储全量的windows发往下游
1、在这里判断是否为信号表产生的增量数据,如果是则调信号处理逻辑
2、在这里获取信号的类型为关窗
3、关窗对应的处理方法为
io.debezium.pipeline.source.snapshot.incremental.CloseIncrementalSnapshotWindow#arrived
这里将window存的全量数据循环发给下游,并清空windows留给下一个块迁移使用,从这里也可以看出debezium的全量迁移即便在1.9版本也是串行的
官网也说了增量快照不是为了快照的并行化
将window的全量数据全部发给下游后再开始下一个块的读取