Kafka跨集群同步是一个常见的需求,比如以下场景,使用MirrorMaker进行不同集群间的数据同步,可以确保Kafka集群的可用性和可靠性。
(1)企业存在多个数据中心,为了防止其中一个数据中心出现问题,导致业务不可用,会将集群数据同步备份在多个不同的数据中心。
(2)当今很多企业将业务迁移上云,迁移过程中需要确保线下集群和云上集群的数据同步,保证业务的连续性。
方案架构
MirrorMarker是Kafka官方仓库提供的用于Kafka各集群间消息同步的工具,使用MirrorMaker可以实现将源集群中的数据镜像复制到目标集群中。MirrorMaker本质上也是生产消费消息,首先从源集群中消费数据,然后将消费的数据生产到目标集群。
其原理如下图所示:
主要功能
同步Topic数据到目标集群,并保持分区信息
同步Topic配置到目标集群,与源集群保持一致
同步Topic ACL到目标集群,与源集群保持一致(没有WRITE权限)
自动感知新的Topic和分区并同步到目标集群
同步组Offset到目标集群
基于Connect实现,特性:高可用、水平扩展
指定Topic、Group同步规则
自定义目标集群Topic名称规则
支持模式
双活:A->B, B->A
主备:A->B
聚合(多对一):A->K, B->K, C->K
分发(一对多):K->A, K->B, K->C
转发:A->B, B->C, C->D
使用MirrorMarker
(1)启动两套 Kafka 集群
单节点的伪集群,监听端口分别是 9092 和 9093。
(2)配置MirrorMarker
进入kafka安装目录,修改“config/connect-mirror-maker.properties”配置文件,在配置文件中指定源集群和目标集群的IP地址和端口以及其他配置。
# 指定两个集群
clusters = A, B
A.bootstrap.servers = A_host1:A_port, A_host2:A_port, A_host3:B_port
B.bootstrap.servers = B_host1:B_port, B_host2:B_port, B_host3:B_port
# 指定数据同步方向,可以单向同步也可互相同步
A->B.enabled = true
# 指定同步的Topic,支持正则匹配,默认复制全部Topic,如:"foo-.*"
A->B.topics = .*
# 打开以下两个配置则表示A、B两个集群互相复制同步
#B->A.enabled = true
#B->A.topics = .*
# 设置副本个数,如果是要同步多个Topic且副本数各不相同,建议先创建同名同本数的Topic再启动MirrorMaker
replication.factor=3
############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# 测试环境可以为1,生产环境建议以下配置大于1,比如设为3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# 测试环境可以为1,生产环境建议以下配置大于1,比如设为3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
(3)启动MirrorMarker
在kafka安装目录下,启动MirrorMaker,进行数据同步。
./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
(4)同步验证
假设在源集群上创建了一个 4 分区的主题 test,随后使用 kafka-producer-perf-test 脚本模拟发送了 500 万条消息。现在,使用下面两条命令来查询,目标 Kafka 集群上是否存在名为 test 的主题,并且成功地镜像了这些消息。
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --topic test --time -2
test:0:0
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --topic test --time -1
test:0:5000000
-1 和 -2 分别表示获取某分区最新的位移和最早的位移,这两个位移值的差值就是这个分区当前的消息数。在这个例子中,差值是 500 万条。这说明主题 test 当前共写入了 500 万条消息。换句话说,MirrorMaker 已经成功地把这 500 万条消息同步到了目标集群上。
实现分析
主要有三个子任务,同步数据任务、同步Offset任务、心跳任务。
(1)同步数据任务
多实例,按topic partition分配任务,同步topic partition数据,并产生上下游offset对应关系
MirrorSourceConnector 初始化工作,同步创建Topic以及配置、ACL,分配任务信息
start(config):
创建OffsetSyncsTopic,单分区、Compact(在源集群创建) "mm2-offset-syncs." + targetClusterAlias() + ".internal"
加载源集群、目标集群Topic Partiton信息,Config: topics、topics.exclude
创建目标集群Topic,如果不存在就创建,存在保持分区数一致(目标集群Topic分区数不少于源集群)
刷新目标集群Topic Partiton信息
同步Topic ACL到目标集群(定时任务,默认间隔10min),这里不同步对Topic的WRITE权限,也就是目标集群Topic一般用户不具备写权限只有MM可以写入(这里考虑切换的时候需要关注)
同步Topic配置到目标集群(定时任务,默认间隔10min),配置过滤,Config: config.properties.exclude
刷新源集群Topic Partiton信息并同步到目标集群(定时任务,默认间隔10min),如果分区信息发生变化请求重新进行任务配置requestTaskReconfiguration()
taskConfigs(maxTasks):
connect框架方法,分配任务配置,按task数轮训分配topic partiton任务,每个任务消费被分配到的topic partiton进行数据同步
分配每一个任务的配置信息(按分区分配任务)
MirrorSourceTask 同步数据任务,产生offset映射关系
poll():
拉取源集群Topic数据
转换目标topic名称返回数据
数据由connect框架写入目标topic
commitRecord(SourceRecord record, RecordMetadata metadata):
记录Topic partition上下游offset映射关系,写入topicPartition -> upstreamOffset, downstreamOffset映射关系到OffsetSyncsTopic里面
(2)同步Offset任务
多实例,按group分配任务,产生group + topic partition -> upstreamOffset, downstreamOffset的checkpoint数据,并提交下游group的offset
MirrorCheckpointConnector
start(config):
创建CheckpointsTopic,单分区、Compact(在目标集群创建) sourceClusterAlias() + ".checkpoints.internal"
加载所有需要同步的组,Config: groups、groups.exclude
刷新所有需要同步的组,(定时任务,默认间隔10min),如果组信息发生变化请求重新进行任务配置requestTaskReconfiguration()
taskConfigs(maxTasks):
connect框架方法,分配任务配置,按task数进行group分配,每个任务同步被分配到的group进行offset同步
分配每一个任务的配置信息(按group分配任务)
MirrorCheckpointTask
start(config):
刷新需要同步的group在目标集群为空闲状态的group信息以及提交的offset信息(定时任务,默认间隔60s)
同步group offset到目标集群(定时任务,默认间隔60s),将最新的checkpoint数据提交到目标集群空闲的group中
poll():
connect框架方法
checkpoint默认间隔时间60s
不断消费OffsetSyncsTopic,直到发送checkpoint时间为止,这个topic保存的是topicPartition -> upstreamOffset, downstreamOffset映射关系
查询分配到的group的提交offset信息,生产checkpoint数据Checkpoint(group, targetTopicPartition, upstreamOffset, downstreamOffset, metadata)
downstreamOffset计算方法(核心逻辑) long upstreamStep = upstreamOffset - offsetSync.upstreamOffset(); downstreamOffset = offsetSync.downstreamOffset() + upstreamStep;
checkpoint数据由connect框架写入目标topic
(3)心跳任务
单实例,产生心跳数据
MirrorHeartbeatConnector
创建HeartbeatsTopic,单分区、Compact(在目标集群创建) heartbeats
分配任务配置,只有一个任务
MirrorHeartbeatTask
poll():connect框架方法
默认每秒发送一条心跳数据,Heartbeat(sourceClusterAlias, targetClusterAlias, timestamp)