本章节介绍如何创建Kafka数据复制的Smart Connect任务,通过Smart Connect任务可以在两个Kafka实例之间,实现数据的单向或双向复制。
源Kafka实例中的数据会实时同步到目标Kafka实例中。
约束与限制
- 一个实例最多创建18个Smart Connect任务。
- 使用Kafka数据复制时,两个Kafka实例间只能通过内网连接。如果两个Kafka实例处于不同的VPC中,请先打通网络。
- Smart Connect任务创建成功后,不支持修改任务参数。
- 确保目标Kafka实例Topic的“批处理消息最大值”大于等于524288字节,否则会导致数据无法同步。如果目标Kafka实例没有创建Topic,在数据同步时会自动创建Topic,此Topic的“批处理消息最大值”和源Kafka实例Topic相同,此时需要确保源Kafka实例Topic的“批处理消息最大值”大于等于524288字节。修改“批处理消息最大值”的方法请参考修改Kafka Topic配置。
前提条件
- 已开启Smart Connect。
- 已创建Kafka实例,且实例状态为“运行中”。
配置Kafka间的数据复制
1、登录管理控制台。
2、在管理控制台左上角单击,选择Kafka实例所在的区域。
3、在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入分布式消息服务Kafka专享版页面。
4、单击Kafka实例名称,进入实例详情页面。
5、在左侧导航栏单击“Smart Connect”,进入Smart Connect任务列表页面。
6、单击“创建Smart Connect任务”,进入“创建smart connect”页面。
7、在“connect任务名称”中,输入Smart Connect任务的名称,用于区分不同的Smart Connect任务。任务名称需要符合命名规则:长度为4~64个字符,只能由英文字母、数字、中划线、下划线组成。
8、在“预置类型”中,选择“Kafka数据复制”。
9、在“当前kafka”区域,设置实例别名。实例别名需要符合命名规则:长度为1~20个字符,只能由英文字母、数字、中划线、下划线组成。
实例别名用于以下两个场景中:
- 开启“重命名Topic”,且“同步方式”为“推送”/“双向”时,当前Kafka实例的别名作为前缀添加到对端Kafka实例的Topic名称前,形成Topic新的名称。例如当前Kafka实例别名为A,对端Kafka实例的Topic名称为test,重命名后的Topic为A.test。
- Kafka数据复制的Smart Connect任务创建成功后,当前Kafka实例会自动创建“mm2-offset-syncs. 对端Kafka实例别名 .internal”的Topic。如果Smart Connect任务开启了“同步消费进度”功能,且“同步方式”为“拉取”/“双向”时,当前Kafka实例还会自动创建“ 对端Kafka实例别名 .checkpoints.internal”的Topic。这两个Topic用于存储内部数据,如果删除,会导致同步数据失败。
10、在“对端kafka”区域,设置以下参数。
表1-1 对端Kafka实例参数说明
参数 参数说明 实例别名 设置实例别名,实例别名需要符合命名规则:长度为1~20个字符,只能由英文字母、数字、中划线、下划线组成。
实例别名用于以下两个场景中:
开启“重命名Topic”,且“同步方式”为“拉取”/“双向”时,对端Kafka实例的别名作为前缀添加到当前Kafka实例的Topic名称前,形成Topic新的名称。例如对端Kafka实例别名为B,当前Kafka实例的Topic名称为test01,重命名后的Topic为B.test01。
Kafka数据复制的Smart Connect任务创建成功后,如果Smart Connect任务开启了“同步消费进度”功能,且“同步方式”为“推送”/“双向”时,对端Kafka实例会自动创建“当前Kafka实例别名.checkpoints.internal”的Topic。此Topic用于存储内部数据,如果删除,会导致同步数据失败。
配置类型 支持以下两种配置类型:
Kafka地址:输入Kafka实例的连接信息。对端Kafka实例和当前Kafka实例处于不同的VPC下时,请选择此配置类型。
实例名称:选择已创建的Kafka实例。对端Kafka实例和当前Kafka实例处于相同的VPC下时,建议选择此配置类型。
实例名称 “配置类型”为“实例名称”,且对端Kafka实例和当前Kafka实例处于相同的VPC下时,需要设置。
在下拉列表中,选择已创建的Kafka实例。
Kafka地址 “配置类型”为“Kafka地址”时,需要设置。 输入Kafka实例的连接地址和端口号。
使用Kafka数据复制时,两个Kafka实例间只能通过内网连接。如果两个Kafka实例处于不同的VPC中,请先打通网络。
认证方式 支持以下认证方式:
SASL_SSL:表示实例已开启SASL_SSL认证,客户端连接Kafka实例时采用SASL认证,数据通过SSL证书进行加密传输。
SASL_PLAINTEXT:表示实例开启SASL_PLAINTEXT认证,客户端连接Kafka实例时采用SASL认证,数据通过明文传输。
PLAINTEXT:表示实例未开启认证。
认证机制 “认证方式”为“SASL_SSL”/“SASL_PLAINTEXT”时,需要设置。
SCRAM-SHA-512:采用哈希算法对用户名与密码生成凭证,进行身份校验的安全认证机制,比PLAIN机制安全性更高。
PLAIN:一种简单的用户名密码校验机制。
用户名 “认证方式”为“SASL_SSL”/“SASL_PLAINTEXT”时,需要设置。
首次开启密文接入时设置的用户名,或者创建用户时设置的用户名。
密码 “认证方式”为“SASL_SSL”/“SASL_PLAINTEXT”时,需要设置。
首次开启密文接入时设置的密码,或者创建用户时设置的密码。
Smart Connect任务创建成功后,如果您修改了对端实例的认证方式、认证机制或者密码,会导致同步任务失败。 您需要删除当前Smart Connect任务,然后重新创建新的Smart Connect任务。
11、在“规则配置”区域,设置以下参数。
表1-2 复制数据规则参数说明
参数 参数说明 同步方式 支持以下三种同步方式:
拉取:把对端Kafka实例数据复制到当前Kafka实例中。
推送:把当前Kafka实例数据复制到对端Kafka实例中。
双向:两端Kafka实例数据进行双向复制。
Topics 设置需要进行数据复制的Topic。
正则表达式:通过正则表达式匹配Topic。
输入/选择:输入Topic名称,如果需要输入多个Topic名称,先输入一个Topic名称,按“Enter”,然后再输入下一个,按“Enter”,依次输入。或者在下拉列表中,选择Topic。最多输入/选择20个Topic。
Topic名称以internal结尾时(例如:topic.internal),此Topic的数据不会被同步。
任务数 数据复制的任务数。默认值为2,建议保持默认值。 如果“同步方式”为“双向”,实际任务数=设置的任务数*2。
重命名
Topic
在目标Topic名称前添加源端Kafka实例的别名,形成目标Topic新的名称。例如源端实例别名为A,目标Topic名称为test,重命名后的目标Topic为A.test。 两端实例数据双向复制时,开启“重命名Topic”,防止循环复制。
添加来源header 目标Topic接收复制的消息,此消息header中包含消息来源。 两端实例数据双向复制时,默认开启“添加来源header”,防止循环复制。
同步消费进度 开启“同步消费进度”后,将消费者消费进度同步到目标Kafka实例。
开启“同步消费进度”后,您需要注意以下几点:
源端Kafka实例和目标端Kafka实例不能同时消费,否则会导致同步的消费进度异常。
同步消费进度的频率为每分钟一次,因此会导致目标端的消费进度可能会略小于源端,造成部分消息被重复消费,所以需要消费者客户端业务逻辑兼容重复消费的场景。
从源端同步的offset与目标端的offset并非一致关系,而是映射关系,如果消费进度由消费者客户端自行维护,消费者客户端从消费源端Kafka实例变为消费目标端Kafka实例后,不向目标端Kafka实例获取消费进度,可能会导致offset错误或消费进度重置。
副本数 在对端实例中自动创建Topic时,指定Topic的副本数,此参数值不能超过对端实例的代理数。
如果对端实例中设置了“default.replication.factor”,此参数的优先级高于“default.replication.factor”。
启动偏移量 支持两种偏移量:
最早:最小偏移量,即获取最早的数据。
最新:最大偏移量,即获取最新的数据。
压缩算法 复制消息所使用的压缩算法。
topic映射 通过Topic映射,您可以自定义目标端Topic名称。
最多新增20个Topic映射。不能同时设置“重命名Topic”和“topic映射”。
配置复制数据规则时需要注意以下几点:
- 创建双向数据复制任务时,为了防止循环复制,控制台限定必须开启“重命名Topic”或者“添加来源header”。如果您在两个实例间,对同一个Topic分别创建拉取和推送的任务,即形成双向数据复制,且两个任务都未开启“重命名Topic”和“添加来源header”,此时会导致数据循环复制。
- 如果创建两个或以上配置完全相同的任务,即重复创建任务,且任务已开启“同步消费进度”,此时会导致数据重复复制,且目标Topic消费进度异常。
12、(可选)在页面右下角单击“开始检测”,测试两端Kafka实例的连通性。
显示“连通性检测成功”时,表示两端Kafka实例可以正常连接。
13、单击“立即创建”,跳转到Smart Connect任务列表页面,页面右上角显示“创建xxx任务成功”。
Kafka数据复制的Smart Connect任务创建成功后,Kafka会自动创建以下Topic。
- 当前Kafka实例会自动创建“mm2-offset-syncs. 对端Kafka实例别名 .internal”的Topic。如果Smart Connect任务开启了“同步消费进度”功能,且“同步方式”为“拉取”/“双向”时,当前Kafka实例还会自动创建“ 对端Kafka实例别名 .checkpoints.internal”的Topic。这两个Topic用于存储内部数据,如果删除,会导致同步数据失败。
- 如果Smart Connect任务开启了“同步消费进度”功能,“同步方式”为“推送”/“双向”时,对端Kafka实例会自动创建“ 当前Kafka实例别名 .checkpoints.internal”的Topic。此Topic用于存储内部数据,如果删除,会导致同步数据失败。
----结束