目前提供Kafka专享版实例的服务,Kafka专享版实例采用物理隔离的方式部署,租户独占Kafka实例。创建Kafka专享版实例之后,使用开源Kafka客户端向Kafka专享版实例生产消息和消费消息。
本章节介绍如何使用开源的Kafka客户端访问未开启SASL的Kafka专享实例方法。
多语言客户端使用请参考Kafka官网:
https://cwiki.apache.org/confluence/display/KAFKA/Clients
说明:Kafka服务器允许客户端单IP连接的个数为200个,如果超过了,会出现连接失败问题。
已配置正确的安全组。
访问未开启SASL的Kafka专享实例时,支持VPC内访问。实例需要配置正确的安全组规则,具体安全组配置要求,请参考下表。
已获取连接Kafka专享版实例的地址。
使用VPC内访问,实例端口为9092,实例连接地址获取如下图。
获取VPC内访问Kafka专享实例的连接地址(实例未开启SASL)
Kafka专享实例已创建Topic,否则请提前创建Topic。
已下载Kafka命令行工具1.1.0版本或者Kafka命令行工具2.3.0版本,确保Kafka实例版本与命令行工具版本相同。
已在Kafka命令行工具的使用环境中安装Java Development Kit 1.8.111或以上版本,并完成环境变量配置。
以下操作命令以Linux系统为例进行说明:
解压Kafka命令行工具。
进入文件压缩包所在目录,然后执行以下命令解压文件。
tar -zxf [kafka_tar]
其中,[kafka_tar]表示命令行工具的压缩包名称。
例如:
tar -zxf kafka_2.11-1.1.0.tgz
进入Kafka命令行工具的“/bin”目录下。
注意,Windows系统下需要进入“/bin/windows”目录下。
执行如下命令进行生产消息。
./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称}
参数说明如下:
连接地址:从前提条件中获取的连接地址。
Topic名称:Kafka实例下创建的Topic名称。
以获取的Kafka实例连接地址为“10.3.196.45:9092,10.78.42.127:9092,10.4.49.103:9092”为例。执行完命令后输入内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。
[root@ecs-kafka bin]# ./kafka-console-producer.sh --broker-list 10.3.196.45:9092,10.78.42.127:9092,10.4.49.103:9092 --topic topic-demo
>Hello
>DMS
>Kafka!
>^C[root@ecs-kafka bin]#
如需停止生产使用Ctrl+C命令退出。
执行如下命令消费消息。
./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --group ${消费组名称} --from-beginning
参数说明如下:
连接地址:从前提条件中获取的连接地址。
Topic名称:Kafka实例下创建的Topic名称。
消费组名称:根据您的业务需求,设定消费组名称。
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 10.3.196.45:9092,10.78.42.127:9092,10.4.49.103:9092 --topic topic-demo --group order-test --from-beginning
Kafka!
DMS
Hello
^CProcessed a total of 3 messages
[root@ecs-kafka bin]#
如需停止消费使用Ctrl+C命令退出。