应用场景
在以下场景,使用MirrorMaker进行不同集群间的数据同步,可以确保Kafka集群的可用性和可靠性。
- 备份和容灾:企业存在多个数据中心,为了防止其中一个数据中心出现问题,导致业务不可用,会将集群数据同步备份在多个不同的数据中心。
- 集群迁移:当今很多企业将业务迁移上云,迁移过程中需要确保线下集群和云上集群的数据同步,保证业务的连续性。
方案架构
使用MirrorMaker可以实现将源集群中的数据镜像复制到目标集群中。其原理如图1所示,MirrorMaker本质上也是生产消费消息,首先从源集群中消费数据,然后将消费的数据生产到目标集群。如果您需要了解更多关于MirrorMaker的信息,请参见Mirroring data between clusters。
图 MirrorMaker 原理图
约束与限制
- 源集群中节点的IP地址和端口号不能和目标集群中节点的IP地址和端口号相同,否则会导致数据在Topic内无限循环复制。
- 使用MirrorMaker同步数据,至少需要有两个或以上集群,不可在单个集群内部使用MirrorMaker,否则会导致数据在Topic内无限循环复制。
实施步骤
1、 购买一台弹性云主机,确保弹性云主机与源集群、目标集群网络互通。具体购买操作,请参考购买弹性云主机。
2、 登录弹性云主机,安装Java JDK,并配置JAVA_HOME与PATH环境变量,使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。
export JAVA_HOME=/opt/java/jdk1.8.0_151
export PATH=**JAVA_HOME/bin:**PATH
执行source .bash_profile命令使修改生效。
说明弹性云主机默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本。
3、 下载Kafka 2.4.3及以上版本的二进制软件包。
wget https://downloads.apache.org/kafka/ Kafka版本 /二进制软件包
例如:下载Kafka 2.7.0版本的二进制软件包。
wget https://downloads.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz
4、 解压二进制软件包。
tar -zxvf 二进制软件包
例如:解压“kafka_2.12-2.7.0.tgz”。
tar -zxvf kafka_2.12-2.7.0.tgz
5、 进入二进制软件包目录,修改“config”目录下的“connect-mirror-maker.properties”的配置文件,在配置文件中指定源集群和目标集群的IP地址和端口以及其他配置。
# 指定两个集群
clusters = A, B
A.bootstrap.servers = A_host1:A_port, A_host2:A_port, A_host3:B_port
B.bootstrap.servers = B_host1:B_port, B_host2:B_port, B_host3:B_port
# 指定数据同步方向,可以单向同步也可互相同步
A->B.enabled = true
# 指定同步的Topic,支持正则匹配,默认复制全部Topic,如:"foo-.*"
A->B.topics = .*
# 打开以下两个配置则表示A、B两个集群互相复制同步
#B->A.enabled = true
#B->A.topics = .*
# 设置副本个数,如果是要同步多个Topic且副本数各不相同,建议先创建同名同本数的Topic再启动MirrorMaker
replication.factor=3
############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# 测试环境可以为1,生产环境建议以下配置大于1,比如设为3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# 测试环境可以为1,生产环境建议以下配置大于1,比如设为3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
6、 在二进制软件包目录下,启动MirrorMaker,进行数据同步。
./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
7、 (可选)MirrorMaker开启后,如果在源集群上新建了Topic,如需对此Topic进行数据同步,则需重启MirrorMaker,重启步骤参考6。也可配置自动同步新增Topic,按需增加如表1所示配置后,无需重启MirrorMaker,即可周期性同步新增Topic。其中,“refresh.topics.interval.seconds”为必选,其他参数根据实际情况选择。
表 MirrorMaker配置参数
参数名 | 默认值 | 说明 |
---|---|---|
sync.topic.configs.enabled | true | 是否监控源集群的配置更改 |
sync.topic.acls.enabled | true | 是否监控源集群ACL的更改 |
emit.heartbeats.enabled | true | 连接器应定期发出心跳 |
emit.heartbeats.interval.seconds | 5秒 | 心跳频率 |
emit.checkpoints.enabled | true | 连接器应定期发出消费端偏移量信息 |
emit.checkpoints.interval.seconds | 5秒 | 检查点的频率 |
refresh.topics.enabled | true | 连接器应定期检查新主题 |
refresh.topics.interval.seconds | 5秒 | 检查源群集中是否有新主题的频率 |
refresh.groups.enabled | true | 连接器应定期检查新的消费组 |
refresh.groups.interval.seconds | 5秒 | 检查源集群新的消费组频率 |
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | 使用LegacyReplicationPolicy模仿旧版MirrorMaker |
heartbeats.topic.retention.ms | 1天 | 首次创建心跳主题时使用 |
checkpoints.topic.retention.ms | 1天 | 首次创建检查点主题时使用 |
offset.syncs.topic.retention.ms | max long | 首次创建偏移同步主题时使用 |
验证数据是否同步
1、 在目标集群中查看Topic列表,确认是否有源集群Topic。
说明目标集群中的Topic名称和源集群相比,多了前缀(如A.),这属于正常情况,是MirrorMaker 2为了防止Topic循环备份进行的设置。
2、 在源集群生产并消费消息,在目标集群查看消费进度,确认数据是否已从源集群同步到了目标集群。
如果目标集群为Kafka实例的话,在分布式消息服务Kafka控制台的“消费组管理 > 消费进度”中,查看消费进度。