Topic,即消息主题。创建Kafka实例成功后,如果没有开启“Kafka自动创建Topic”,需要手动创建Topic,然后才能进行生产消息和消费消息。如果实例开启了“Kafka自动创建Topic”,则该操作为可选。
“Kafka自动创建Topic”表示在生产或消费一个未创建的Topic时,会自动创建此Topic,此Topic的默认参数值如下:分区数为3,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。如果在“配置参数”中修改“log.retention.hours”、“default.replication.factor”或“num.partitions”的参数值,此后自动创建的Topic参数值为修改后的参数值。例如:“num.partitions”修改为“5”,自动创建的Topic参数值如下:分区数为5,副本数为3,老化时间为72小时,不开启同步复制和同步落盘。
Kafka实例对Topic的总分区数设置了上限, 当Topic的总分区数达到上限后,用户就无法继续创建Topic 。不同规格配置的Topic总分区数不同,具体请参考Kafka产品规格说明。
本文主要介绍手动创建Topic的操作,有以下几种方式,您可以根据实际情况选择任意一种方式:
- 方式1:在控制台创建
- 方式2:在Kafka Manager创建
- 方式3:在Kafka客户端上创建
说明实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。
方式1:在控制台创建
步骤 1 登录管理控制台。
步骤 2 在管理控制台右上角单击,选择区域。
说明请选择Kafka实例所在的区域。
步骤 3 在管理控制台左上角单击,选择“企业中间件”-“分布式消息服务”-“Kafka专享版”,进入分布式消息服务Kafka专享版页面。
步骤 4 单击Kafka实例的名称,进入实例详情页面。
步骤 5 选择“Topic管理”页签,单击“创建Topic”。
弹出“创建Topic”对话框。
图 创建Topic
步骤 6 填写Topic名称和配置信息。
表 Topic参数说明
参考 说明 Topic名称 系统为您自动生成了Topic名称,您可以根据需要修改。
创建Topic后不能修改名称。
分区数 您可以设置Topic的分区数,分区数越大消费的并发度越大。
该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。
取值范围:1~200
默认值:3
副本数 您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。
该参数设置为1时,表示只有一份数据。
默认值:3
说明实例节点出现故障的情况下,单副本Topic查询消息时可能会报“内部服务错误”,因此不建议使用单副本Topic。
老化时间(小时) 消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。
取值范围:1~720
默认值:72
同步复制 指后端收到生产消息请求并复制给所有副本后,才返回客户端。
开启同步复制后,需要在客户端配置acks=all或者-1,否则无效。
当副本数为1时,不能选择同步复制功能。
同步落盘 同步落盘是指生产的每条消息都会立即写入磁盘。
开启:生产的每条消息都会立即写入磁盘,可靠性更高。
关闭:生产的消息存在内存中,不会立即写入磁盘。
消息时间戳类型 定义消息中的时间戳类型,取值如下:
CreateTime:生产者创建消息的时间。
LogAppendTime:broker将消息写入日志的时间。
批处理消息最大值 Kafka允许的最大批处理大小,如果启用消息压缩,则表示压缩后的最大批处理大小。
如果增加“max.message.bytes”的值,且存在消费者版本早于0.10.2,此时消费者的“fetch size”值也必须增加,以便消费者可以获取增加后的批处理大小。
描述 Topic的描述信息。
步骤 7 配置完成后,单击“确定”,完成创建Topic。
方式2:在Kafka Manager创建
登录Kafka Manager后,在页面顶部选择“Topic > Create”,然后按照界面参数填写即可。
图 在Kafka Manager中创建Topic
说明Topic名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。
方式3:在Kafka客户端上创建
Kafka客户端版本为2.2以上时,支持通过kafka-topics.sh创建Topic,以及管理Topic的各类参数。
说明Topic名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。
- 未开启SASL的Kafka实例,在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令创建Topic。
./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num}
- 已开启SASL的Kafka实例,通过以下步骤创建Topic。
- (可选)如果已经设置了SSL证书配置,请跳过此步骤。否则请执行以下操作。在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,参考步骤3增加SSL证书配置。
- 在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令创建Topic。
./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num} --command-config ./config/ssl-user-config.properties