searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Kafka跨集群同步方案MirrorMaker

2023-06-28 01:50:22
173
0

Kafka跨集群同步是一个常见的需求,比如以下场景,使用MirrorMaker进行不同集群间的数据同步,可以确保Kafka集群的可用性和可靠性。

(1)企业存在多个数据中心,为了防止其中一个数据中心出现问题,导致业务不可用,会将集群数据同步备份在多个不同的数据中心。

(2)当今很多企业将业务迁移上云,迁移过程中需要确保线下集群和云上集群的数据同步,保证业务的连续性。

 

方案架构

MirrorMarker是Kafka官方仓库提供的用于Kafka各集群间消息同步的工具,使用MirrorMaker可以实现将源集群中的数据镜像复制到目标集群中。MirrorMaker本质上也是生产消费消息,首先从源集群中消费数据,然后将消费的数据生产到目标集群。

其原理如下图所示:

主要功能

同步Topic数据到目标集群,并保持分区信息

同步Topic配置到目标集群,与源集群保持一致

同步Topic ACL到目标集群,与源集群保持一致(没有WRITE权限)

自动感知新的Topic和分区并同步到目标集群

同步组Offset到目标集群

基于Connect实现,特性:高可用、水平扩展

指定Topic、Group同步规则

自定义目标集群Topic名称规则

 

支持模式

双活:A->B, B->A

主备:A->B

聚合(多对一):A->K, B->K, C->K

分发(一对多):K->A, K->B, K->C

转发:A->B, B->C, C->D

 

使用MirrorMarker

(1)启动两套 Kafka 集群

单节点的伪集群,监听端口分别是 9092 和 9093。

 

(2)配置MirrorMarker

进入kafka安装目录,修改“config/connect-mirror-maker.properties”配置文件,在配置文件中指定源集群和目标集群的IP地址和端口以及其他配置。

# 指定两个集群
clusters = A, B
A.bootstrap.servers = A_host1:A_port, A_host2:A_port, A_host3:B_port
B.bootstrap.servers = B_host1:B_port, B_host2:B_port, B_host3:B_port

# 指定数据同步方向,可以单向同步也可互相同步
A->B.enabled = true

# 指定同步的Topic,支持正则匹配,默认复制全部Topic,如:"foo-.*"
A->B.topics = .*

# 打开以下两个配置则表示A、B两个集群互相复制同步
#B->A.enabled = true
#B->A.topics = .*

# 设置副本个数,如果是要同步多个Topic且副本数各不相同,建议先创建同名同本数的Topic再启动MirrorMaker
replication.factor=3

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# 测试环境可以为1,生产环境建议以下配置大于1,比如设为3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# 测试环境可以为1,生产环境建议以下配置大于1,比如设为3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

 

(3)启动MirrorMarker

在kafka安装目录下,启动MirrorMaker,进行数据同步。

./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties

 

(4)同步验证

假设在源集群上创建了一个 4 分区的主题 test,随后使用 kafka-producer-perf-test 脚本模拟发送了 500 万条消息。现在,使用下面两条命令来查询,目标 Kafka 集群上是否存在名为 test 的主题,并且成功地镜像了这些消息。

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --topic test --time -2
test:0:0

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --topic test --time -1
test:0:5000000

-1 和 -2 分别表示获取某分区最新的位移和最早的位移,这两个位移值的差值就是这个分区当前的消息数。在这个例子中,差值是 500 万条。这说明主题 test 当前共写入了 500 万条消息。换句话说,MirrorMaker 已经成功地把这 500 万条消息同步到了目标集群上。

 

实现分析

主要有三个子任务,同步数据任务、同步Offset任务、心跳任务。

 

(1)同步数据任务

多实例,按topic partition分配任务,同步topic partition数据,并产生上下游offset对应关系

 

MirrorSourceConnector 初始化工作,同步创建Topic以及配置、ACL,分配任务信息

 

start(config):

创建OffsetSyncsTopic,单分区、Compact(在源集群创建) "mm2-offset-syncs." + targetClusterAlias() + ".internal"

加载源集群、目标集群Topic Partiton信息,Config: topics、topics.exclude

创建目标集群Topic,如果不存在就创建,存在保持分区数一致(目标集群Topic分区数不少于源集群)

刷新目标集群Topic Partiton信息

同步Topic ACL到目标集群(定时任务,默认间隔10min),这里不同步对Topic的WRITE权限,也就是目标集群Topic一般用户不具备写权限只有MM可以写入(这里考虑切换的时候需要关注)

同步Topic配置到目标集群(定时任务,默认间隔10min),配置过滤,Config: config.properties.exclude

刷新源集群Topic Partiton信息并同步到目标集群(定时任务,默认间隔10min),如果分区信息发生变化请求重新进行任务配置requestTaskReconfiguration()

 

taskConfigs(maxTasks):

connect框架方法,分配任务配置,按task数轮训分配topic partiton任务,每个任务消费被分配到的topic partiton进行数据同步

分配每一个任务的配置信息(按分区分配任务)

 

MirrorSourceTask 同步数据任务,产生offset映射关系

poll():

拉取源集群Topic数据

转换目标topic名称返回数据

数据由connect框架写入目标topic

commitRecord(SourceRecord record, RecordMetadata metadata):

记录Topic partition上下游offset映射关系,写入topicPartition -> upstreamOffset, downstreamOffset映射关系到OffsetSyncsTopic里面

 

(2)同步Offset任务

多实例,按group分配任务,产生group + topic partition -> upstreamOffset, downstreamOffset的checkpoint数据,并提交下游group的offset

 

MirrorCheckpointConnector

start(config):

创建CheckpointsTopic,单分区、Compact(在目标集群创建) sourceClusterAlias() + ".checkpoints.internal"

加载所有需要同步的组,Config: groups、groups.exclude

刷新所有需要同步的组,(定时任务,默认间隔10min),如果组信息发生变化请求重新进行任务配置requestTaskReconfiguration()

taskConfigs(maxTasks):

connect框架方法,分配任务配置,按task数进行group分配,每个任务同步被分配到的group进行offset同步

分配每一个任务的配置信息(按group分配任务)

 

MirrorCheckpointTask

start(config):

刷新需要同步的group在目标集群为空闲状态的group信息以及提交的offset信息(定时任务,默认间隔60s)

同步group offset到目标集群(定时任务,默认间隔60s),将最新的checkpoint数据提交到目标集群空闲的group中

poll():

connect框架方法

checkpoint默认间隔时间60s

不断消费OffsetSyncsTopic,直到发送checkpoint时间为止,这个topic保存的是topicPartition -> upstreamOffset, downstreamOffset映射关系

查询分配到的group的提交offset信息,生产checkpoint数据Checkpoint(group, targetTopicPartition, upstreamOffset, downstreamOffset, metadata)

downstreamOffset计算方法(核心逻辑) long upstreamStep = upstreamOffset - offsetSync.upstreamOffset(); downstreamOffset = offsetSync.downstreamOffset() + upstreamStep;

checkpoint数据由connect框架写入目标topic

 

(3)心跳任务

单实例,产生心跳数据

 

MirrorHeartbeatConnector

创建HeartbeatsTopic,单分区、Compact(在目标集群创建) heartbeats

分配任务配置,只有一个任务

 

MirrorHeartbeatTask

poll():connect框架方法

默认每秒发送一条心跳数据,Heartbeat(sourceClusterAlias, targetClusterAlias, timestamp)

 

0条评论
作者已关闭评论
a****k
16文章数
0粉丝数
a****k
16 文章 | 0 粉丝
原创

Kafka跨集群同步方案MirrorMaker

2023-06-28 01:50:22
173
0

Kafka跨集群同步是一个常见的需求,比如以下场景,使用MirrorMaker进行不同集群间的数据同步,可以确保Kafka集群的可用性和可靠性。

(1)企业存在多个数据中心,为了防止其中一个数据中心出现问题,导致业务不可用,会将集群数据同步备份在多个不同的数据中心。

(2)当今很多企业将业务迁移上云,迁移过程中需要确保线下集群和云上集群的数据同步,保证业务的连续性。

 

方案架构

MirrorMarker是Kafka官方仓库提供的用于Kafka各集群间消息同步的工具,使用MirrorMaker可以实现将源集群中的数据镜像复制到目标集群中。MirrorMaker本质上也是生产消费消息,首先从源集群中消费数据,然后将消费的数据生产到目标集群。

其原理如下图所示:

主要功能

同步Topic数据到目标集群,并保持分区信息

同步Topic配置到目标集群,与源集群保持一致

同步Topic ACL到目标集群,与源集群保持一致(没有WRITE权限)

自动感知新的Topic和分区并同步到目标集群

同步组Offset到目标集群

基于Connect实现,特性:高可用、水平扩展

指定Topic、Group同步规则

自定义目标集群Topic名称规则

 

支持模式

双活:A->B, B->A

主备:A->B

聚合(多对一):A->K, B->K, C->K

分发(一对多):K->A, K->B, K->C

转发:A->B, B->C, C->D

 

使用MirrorMarker

(1)启动两套 Kafka 集群

单节点的伪集群,监听端口分别是 9092 和 9093。

 

(2)配置MirrorMarker

进入kafka安装目录,修改“config/connect-mirror-maker.properties”配置文件,在配置文件中指定源集群和目标集群的IP地址和端口以及其他配置。

# 指定两个集群
clusters = A, B
A.bootstrap.servers = A_host1:A_port, A_host2:A_port, A_host3:B_port
B.bootstrap.servers = B_host1:B_port, B_host2:B_port, B_host3:B_port

# 指定数据同步方向,可以单向同步也可互相同步
A->B.enabled = true

# 指定同步的Topic,支持正则匹配,默认复制全部Topic,如:"foo-.*"
A->B.topics = .*

# 打开以下两个配置则表示A、B两个集群互相复制同步
#B->A.enabled = true
#B->A.topics = .*

# 设置副本个数,如果是要同步多个Topic且副本数各不相同,建议先创建同名同本数的Topic再启动MirrorMaker
replication.factor=3

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# 测试环境可以为1,生产环境建议以下配置大于1,比如设为3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# 测试环境可以为1,生产环境建议以下配置大于1,比如设为3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

 

(3)启动MirrorMarker

在kafka安装目录下,启动MirrorMaker,进行数据同步。

./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties

 

(4)同步验证

假设在源集群上创建了一个 4 分区的主题 test,随后使用 kafka-producer-perf-test 脚本模拟发送了 500 万条消息。现在,使用下面两条命令来查询,目标 Kafka 集群上是否存在名为 test 的主题,并且成功地镜像了这些消息。

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --topic test --time -2
test:0:0

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --topic test --time -1
test:0:5000000

-1 和 -2 分别表示获取某分区最新的位移和最早的位移,这两个位移值的差值就是这个分区当前的消息数。在这个例子中,差值是 500 万条。这说明主题 test 当前共写入了 500 万条消息。换句话说,MirrorMaker 已经成功地把这 500 万条消息同步到了目标集群上。

 

实现分析

主要有三个子任务,同步数据任务、同步Offset任务、心跳任务。

 

(1)同步数据任务

多实例,按topic partition分配任务,同步topic partition数据,并产生上下游offset对应关系

 

MirrorSourceConnector 初始化工作,同步创建Topic以及配置、ACL,分配任务信息

 

start(config):

创建OffsetSyncsTopic,单分区、Compact(在源集群创建) "mm2-offset-syncs." + targetClusterAlias() + ".internal"

加载源集群、目标集群Topic Partiton信息,Config: topics、topics.exclude

创建目标集群Topic,如果不存在就创建,存在保持分区数一致(目标集群Topic分区数不少于源集群)

刷新目标集群Topic Partiton信息

同步Topic ACL到目标集群(定时任务,默认间隔10min),这里不同步对Topic的WRITE权限,也就是目标集群Topic一般用户不具备写权限只有MM可以写入(这里考虑切换的时候需要关注)

同步Topic配置到目标集群(定时任务,默认间隔10min),配置过滤,Config: config.properties.exclude

刷新源集群Topic Partiton信息并同步到目标集群(定时任务,默认间隔10min),如果分区信息发生变化请求重新进行任务配置requestTaskReconfiguration()

 

taskConfigs(maxTasks):

connect框架方法,分配任务配置,按task数轮训分配topic partiton任务,每个任务消费被分配到的topic partiton进行数据同步

分配每一个任务的配置信息(按分区分配任务)

 

MirrorSourceTask 同步数据任务,产生offset映射关系

poll():

拉取源集群Topic数据

转换目标topic名称返回数据

数据由connect框架写入目标topic

commitRecord(SourceRecord record, RecordMetadata metadata):

记录Topic partition上下游offset映射关系,写入topicPartition -> upstreamOffset, downstreamOffset映射关系到OffsetSyncsTopic里面

 

(2)同步Offset任务

多实例,按group分配任务,产生group + topic partition -> upstreamOffset, downstreamOffset的checkpoint数据,并提交下游group的offset

 

MirrorCheckpointConnector

start(config):

创建CheckpointsTopic,单分区、Compact(在目标集群创建) sourceClusterAlias() + ".checkpoints.internal"

加载所有需要同步的组,Config: groups、groups.exclude

刷新所有需要同步的组,(定时任务,默认间隔10min),如果组信息发生变化请求重新进行任务配置requestTaskReconfiguration()

taskConfigs(maxTasks):

connect框架方法,分配任务配置,按task数进行group分配,每个任务同步被分配到的group进行offset同步

分配每一个任务的配置信息(按group分配任务)

 

MirrorCheckpointTask

start(config):

刷新需要同步的group在目标集群为空闲状态的group信息以及提交的offset信息(定时任务,默认间隔60s)

同步group offset到目标集群(定时任务,默认间隔60s),将最新的checkpoint数据提交到目标集群空闲的group中

poll():

connect框架方法

checkpoint默认间隔时间60s

不断消费OffsetSyncsTopic,直到发送checkpoint时间为止,这个topic保存的是topicPartition -> upstreamOffset, downstreamOffset映射关系

查询分配到的group的提交offset信息,生产checkpoint数据Checkpoint(group, targetTopicPartition, upstreamOffset, downstreamOffset, metadata)

downstreamOffset计算方法(核心逻辑) long upstreamStep = upstreamOffset - offsetSync.upstreamOffset(); downstreamOffset = offsetSync.downstreamOffset() + upstreamStep;

checkpoint数据由connect框架写入目标topic

 

(3)心跳任务

单实例,产生心跳数据

 

MirrorHeartbeatConnector

创建HeartbeatsTopic,单分区、Compact(在目标集群创建) heartbeats

分配任务配置,只有一个任务

 

MirrorHeartbeatTask

poll():connect框架方法

默认每秒发送一条心跳数据,Heartbeat(sourceClusterAlias, targetClusterAlias, timestamp)

 

文章来自个人专栏
云组件
16 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0