本文主要介绍在命令行模式下使用SASL证书连接Kafka实例的操作,其中包含内网访问和公网访问两种连接场景。
在使用SASL证书的场景下,通过内网访问和通过公网访问,仅涉及连接IP和端口不一致,其他操作步骤是一样的,内网访问的连接端口为9093,公网访问的连接端口为9095。
文中仅介绍公网访问的连接示例,如果使用内网访问时,替换为相应的连接地址即可。
说明Kafka实例的每个代理允许客户端单IP连接的个数默认为1000个,如果超过了,会出现连接失败问题。您可以通过修改配置参数来修改单IP的连接数。
前提条件
1.已配置正确的安全组,安全组规则请参考准备环境中的安全组规则 。
2.已获取连接Kafka实例的地址。
- 如果是使用内网通过同一个VPC访问,实例端口为9093,实例连接地址获取如下图。
图 使用内网通过同一个VPC访问Kafka实例的连接地址(实例已开启SASL)
- 如果是公网访问,实例端口为9095,实例连接地址获取如下图。
图 公网访问Kafka实例的连接地址(实例已开启SASL)
3.已获取开启的SASL认证机制。
在Kafka实例详情页的“连接信息”区域,查看“开启的SASL认证机制”。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。如果页面未显示“开启的SASL认证机制”,默认使用PLAIN机制。
图 开启的SASL认证机制
4.如果Kafka实例未开启自动创建Topic功能,获取Topic名称。
在实例的Topic管理页签中获取(可选)步骤三:创建Topic中创建的Topic名称。
图 查看Topic名称
5.已购买ECS,并完成JDK安装、环境变量配置以及Kafka开源客户端下载,具体操作请参考准备环境。
配置生产消费配置文件
步骤 1 登录Linux系统的ECS。
步骤 2 在ECS的“/etc/hosts”文件中配置host和IP的映射关系,以便客户端能够快速解析实例的Broker。
其中,IP地址必须为实例连接地址(从前提条件获取的连接地址),host为每个实例主机的名称(您可以自定义主机的名称,但不能重复)。
例如:
10.154.48.120 server01
10.154.48.121 server02
10.154.48.122 server03
步骤 3 下载client.jks证书。在Kafka控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。
解压压缩包,获取压缩包中的客户端证书文件:client.jks。
步骤 4 根据已获取的SASL认证机制,修改Kafka命令行工具配置文件。
- PLAIN机制: 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="**********" \
password="**********";
sasl.mechanism=PLAIN
说明username和password为创建Kafka实例过程中开启SASL_SSL时填入的用户名和密码,或者创建SASL_SSL用户时设置的用户名和密码。
- SCRAM-SHA-512机制: 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="**********" \
password="**********";
sasl.mechanism=SCRAM-SHA-512
说明username和password为创建Kafka实例过程中开启SASL_SSL时填入的用户名和密码,或者创建SASL_SSL用户时设置的用户名和密码。
步骤5 根据安全协议,修改Kafka命令行工具配置文件。
1.SASL_SSL: 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。
security.protocol=SASL_SSL
ssl.truststore.location={ssl_truststore_path}
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=
参数说明:
- ssl.truststore.location配置为client.truststore.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“\”,否则客户端获取证书失败。
- ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka 。
- ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要 保持关闭状态,必须设置为空 。
- SASL_PLAINTEXT: 在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。
security.protocol=SASL_PLAINTEXT
生产消息
进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息。
./kafka-console-producer.sh --broker-list {连接地址} --topic 连接地址−−topic{Topic名称} --producer.config ../config/producer.properties
参数说明如下:
- 连接地址:从前提条件获取的连接地址。
- Topic名称:Kafka实例下创建的Topic名称。
示例如下,“10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095”为Kafka实例连接地址。
执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。
[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095 --topic topic-demo --producer.config ../config/producer.propertiesHello
DMS
Kafka!
^C[root@ecs-kafka bin]#
如需停止生产使用Ctrl+C命令退出。
消费消息
执行如下命令消费消息。
./kafka-console-consumer.sh --bootstrap-server {连接地址} --topic 连接地址−−topic{Topic名称} --group ${消费组名称} --from-beginning --consumer.config ../config/consumer.properties
参数说明如下:
- 连接地址:从前提条件获取的连接地址。
- Topic名称:Kafka实例下创建的Topic名称。
- 消费组名称:根据您的业务需求,设定消费组名称。 如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败 。 消费组名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 10.xxx.xxx.202:9095,10.xxx.xxx.197:9095,10.xxx.xxx.68:9095 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties
Hello
Kafka!
DMS
^CProcessed a total of 3 messages
[root@ecs-kafka bin]#
如需停止消费使用Ctrl+C命令退出。
后续步骤
您可以通过设置监控指标的告警规格,当实例、节点、队列等有异常时,可以及时接收异常信息。