创建实例时开启SASL_SSL访问,则数据加密传输,安全性更高。
由于安全问题,支持的加密套件为TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256和TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256。
本节介绍如何使用开源的Kafka客户端访问开启SASL的Kafka专享实例的方法。
说明:
使用SASL方式连接Kafka实例时,为了客户端能够快速解析实例的Broker,建议在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系。
其中,IP地址必须为实例连接地址(Broker地址),host为每个实例主机的名称(您可以自定义主机的名称,但不能重复)。
例如:
10.154.48.120 server01
10.154.48.121 server02
10.154.48.122 server03
已配置正确的安全组。
访问开启SASL的Kafka专享实例时,支持VPC内访问。实例需要配置正确的安全组规则,具体安全组配置要求,请参考表3-2。
已获取连接Kafka专享版实例的地址。
使用VPC内访问,实例端口为9093,实例连接地址获取如下图。
获取VPC内访问Kafka专享实例的连接地址(实例已开启SASL)
Kafka专享实例已创建Topic,否则请提前创建Topic。
已下载client.truststore.jks证书。如果没有,在控制台单击Kafka实例名称,进入实例详情页面,在“基本信息 > 高级配置 > Kafka SASL_SSL”所在行,单击 。下载压缩包后解压,获取压缩包中的客户端证书文件:client.truststore.jks。
已下载Kafka命令行工具1.1.0版本或者Kafka命令行工具2.3.0版本,确保Kafka实例版本与命令行工具版本相同。
已在Kafka命令行工具的使用环境中安装Java Development Kit 1.8.111或以上版本,并完成环境变量配置。
以下操作命令以Linux系统为例进行说明。
解压Kafka命令行工具。
进入文件压缩包所在目录,然后执行以下命令解压文件。
tar -zxf [kafka_tar]
其中,[kafka_tar]表示命令行工具的压缩包名称。
例如:
tar -zxf kafka_2.11-1.1.0.tgz
修改Kafka命令行工具配置文件。
在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="**********" \
password="**********";
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
ssl.truststore.location=/opt/kafka_2.11-1.1.0/config/client.truststore.jks
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=
参数说明:
username和password为创建Kafka专享实例过程中开启SASL_SSL时填入的用户名和密码。
ssl.trustore.location配置为client.truststore.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中拷贝路径时的“\”,否则客户端获取证书失败。
ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
进入Kafka命令行工具的“/bin”目录下。
注意,Windows系统下需要进入“/bin/windows”目录下。
执行如下命令进行生产消息。
./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties
参数说明如下:
连接地址:从前提条件获取的连接地址。
Topic名称:Kafka实例下创建的Topic名称。
如下示例,Kafka实例连接地址为“10.xxx.xxx.202:9093,10.xxx.xxx.197:9093,10.xxx.xxx.68:9093”。
执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。
[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 10.xxx.xxx.202:9093,10.xxx.xxx.197:9093,10.xxx.xxx.68:9093 --topic topic-demo --producer.config ../config/producer.properties
>hello
>DMS
>Kafka!
>^C[root@ecs-kafka bin]#
如需停止生产使用Ctrl+C命令退出。
执行如下命令消费消息。
./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --group ${消费组名称} --from-beginning --consumer.config ../config/consumer.properties
参数说明如下:
连接地址:从前提条件获取的连接地址。
Topic名称:Kafka实例下创建的Topic名称。
消费组名称:根据您的业务需求,设定消费组名称。
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 10.xxx.xxx.202:9093,10.xxx.xxx.197:9093,10.xxx.xxx.68:9093 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties
hello
Kafka!
DMS
hello
^CProcessed a total of 4 messages
[root@ecs-kafka bin]#
如需停止消费使用Ctrl+C命令退出。