1. 环境
- 操作系统 Linux Centos
- zookeeper版本:6.3
- Kafka版本: 2.11
- kafka建议奇数个服务,本文安装3台服务
- Kakfa开启SASL认证
2. 安装步骤
1. 安装zookeeper
- zookeeper安装包:apache-zookeeper-3.6.3-bin.tar.gz
- 安装包上传
- 解压:tar zxvf apache-zookeeper-3.6.3-bin.tar.gz
- 移动:mv apache-zookeeper-3.6.3-bin /usr/local/zookeeper
- 创建data数据目录:cd /usr/local/zookeeper -> mkdir data
- 配置cfg:cd conf -> cp zoo_sample.cfg zoo.cfg -> vim zoo.cfg
#先把dataDir=/tmp/zookeeper注释,然后将下面四行代码添加到文件末尾
dataDir=/usr/local/zookeeper/data
server.1=111.111.222.185:2888:3888
server.2=111.111.222.186:2888:3888
server.3=111.111.222.187:2888:3888
- 创建myid文件:cd ../data -> touch myid -> echo "1">>myid
- 拷贝:scp -r /usr/local/zookeeper root@111.111.222.186:/usr/local/ -> scp -r /usr/local/zookeeper root@111.111.222.187:/usr/local/
- 分别修改myid文件:cd /usr/local/zookeeper/data -> echo "2">myid
- 分别开启端口:firewall-cmd --zone=public --add-port=2888/tcp --permanent -> firewall-cmd --zone=public --add-port=3888/tcp --permanent -> firewall-cmd --zone=public --add-port=12181/tcp --permanent
- 分别重启防火墙:firewall-cmd --reload
- 分别启动:/usr/local/zookeeper/bin/zkServer.sh start
- 分别查看状态:/usr/local/zookeeper/bin/zkServer.sh status
- 客户端连接:/usr/local/zookeeper/bin/zkCli.sh -server 111.111.222.185:12181
- 注:端口2181改为12181,不使用默认端口。
2. 安装kafka
- kafka安装包:11-2.0.0.tgz
- 安装包上传
- 解压:tar -zxvf kafka_2.11-2.0.0.tgz
- 移动:mv kafka_2.11-2.0.0 /usr/local/kafka
- 配置properties:cd /usr/local/kafka/config/ -> vim server.properties
#Kafka的配置信息就是在server.properties里面配置的
#找到下面两行代码并分别注释
#broker.id=0
#zookeeper.connect=localhost:2181
在文件底部添加如下三个配置:
broker.id=1
zookeeper.connect=111.111.222.185:12181,111.111.222.186:12181,111.111.222.187:12181
listeners = PLAINTEXT://111.111.222.185:19092
注:broker.id:每台机器不能一样。zookeeper.connect:有几台zookeeper服务器,zookeeper.connect就设置为几台,必须全部加进去。listeners:在配置集群时必须设置,不然以后操作会报找不到leader的错误。集群的另外几台服务器,zookeeper.connect的配置跟这里一样,但是broker.id和listeners不能一样。
- 配置properties:vim producer.properties
bootstrap.servers=111.111.222.185:19092,111.111.222.186:19092,111.111.222.187:19092
- 配置properties:vim consumer.properties
bootstrap.servers=111.111.222.185:19092,111.111.222.186:19092,111.111.222.187:19092
group.id=logReportConsumerGroup
- 拷贝:scp -r /usr/local/kafka root@111.111.222.186:/usr/local/ -> scp -r /usr/local/kafka root@111.111.222.187:/usr/local/
- 分别修改id和listeners:cd /usr/local/kafka/config/ -> vim server.properties
#Ctrl+D一直定位到文件最底部,修改broker.id和listeners:
broker.id=2
zookeeper.connect=111.111.222.185:12181,111.111.222.186:12181,111.111.222.187:12181
listeners = PLAINTEXT://111.111.222.186:19092
- 分别开启端口:firewall-cmd --zone=public --add-port=19092/tcp --permanent
- 分别重启防火墙:firewall-cmd --reload
- 分别启动zookeeper:/usr/local/zookeeper/bin/zkServer.sh start
- 分别启动kafka:/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
- 检查是否启动成功:jps
- 创建topic:cd /usr/local/kafka/ -> bin/kafka-topics.sh --create --zookeeper 111.111.222.185:12181 --replication-factor 1 --partitions 1 --topic test
- 查看topic:cd /usr/local/kafka/ -> bin/kafka-topics.sh --list --zookeeper 111.111.222.187:12181 注:此IP可以是集群中的任何一个,集群中的任何一台服务器都可以看到。
- 创建发布:/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 111.111.222.185:19092 --topic test
- 创建消费:/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 111.111.222.186:19092 --topic test --from-beginning
- 注:端口9092改为19092。
3. Kafka的权限认证(配置SASL\PLAIN)
3. 内部客户端工具
- 针对kafka-console-consumer.sh,在/usr/local/kafka/config/consumer.properties中加入如下内容:注意jaas.config参数值末尾要加分号!
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="ismAdmin" password="**********";
- 针对kafka-console-producer.sh,在/usr/local/kafka/config/producer.properties中加入如下内容:注意jaas.config参数值末尾要加分号!
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="ismAdmin" password="***************";
- bin目录下输入命令,启动生产者:加上../config/producer.properties这个参数的目的是设置认证协议,此配置要和Broker端设置的一样。
./kafka-console-producer.sh --broker-list 111.111.222.185:19092 --topic test --producer.config ../config/producer.properties
- bin目录下输入命令,启动消费者:加上../config/consumer.properties这个参数的目的是设置认证协议,此配置要和Broker端设置的一样。
./kafka-console-consumer.sh --bootstrap-server 111.111.222.186:19092 --topic test --from-beginning --consumer.config ../config/consumer.properties
- 针对kafka-consumer-groups.sh,可以单独touch一个配置文件/usr/local/kafka/config/properties,在启动程序时加上一个参数引入此文件即可。注意sasl.jaas.config参数值末尾要加分号!
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="ismAdmin" password="***************";
- bin目录下,启动kafka-consumer-groups.sh:
./kafka-consumer-groups.sh --bootstrap-server 111.111.222.186:19092 --list --command-config ../config/sasl.properties
4. 外部客户端工具
- 创建配置文件conf:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required \
username="ismAdmin" \
password="***************";
};
- Java程序中引入:
System.setProperty("java.security.auth.login.config","/config/kafka_client_jaas.conf"); //配置文件路径
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
5. 服务端配置
- 在kafka/config/properties增加如下内容:
# 配置listener
listeners=SASL_PLAINTEXT://111.111.222.185:19092 advertised.listeners=SASL_PLAINTEXT://111.111.222.185:19092
# 认证方面
# 表示Broker间通信使用SASL
security.inter.broker.protocol=SASL_PLAINTEXT
# 表示开启PLAIN认证机制
sasl.enabled.mechanisms=PLAIN
# 表示Broker间通信也启用PLAIN机制
sasl.mechanism.inter.broker.protocol=PLAIN
# 授权方面
# 设置身份验证使用的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 设置超级账号,如果多个用分号分割
super.users=User:ismAdmin
# 对所有用户topic可见要禁用
allow.everyone.if.no.acl.found=false
auto.create.topics.enable=false
delete.topic.enable=true
# 配置JAAS
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="ismAdmin" \
password="***************" \
user_ismAdmin="***************";
注:
1.配置JAAS的字段是:
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config({listenerName}:就是上面的sasl_plaintext,此和linsteners中配置的协议要一致。{saslMechanism}:就是上面的plain)。
2.此字段值是用来配置broker间通信使用的用户名和密码以及客户端连接时需要的用户名和密码,其中username和password是broker用于初始化连接到其他的broker,kafka用户为broker间的通讯。在一个Kafka集群中,这两个内容要一样,集群中每个Broker去连接其他Broker的时候都使用此username和password来让对方进行认证。而user_NAME语句,是用来配置客户端连接Broker时使用的用户名和密码,即user_后的内容是新建用户的用户名,等号后面是密码,完整语句是user_USERNAME="PASSWORD"。
3.此字段值的末尾必须加分号!
6. 分发配置文件
- 拷贝:scp -r /usr/local/kafka root@111.111.222.186:/usr/local/ -> scp -r /usr/local/kafka root@111.111.222.187:/usr/local/
- 分别修改id和listeners:cd /usr/local/kafka/config/ -> vim server.properties
- 分别启动zookeeper:/usr/local/zookeeper/bin/zkServer.sh start
- 分别启动kafka:/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
- 检查是否启动成功:jps
4. kafka常用命令
- 消费ismsLogQuery主题的信息(带权限认证):/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 111.222.186:19092 --topic ismsLogQuery --from-beginning --consumer.config /usr/local/kafka/config/consumer.properties
- 生产ismsLogQuery主题的信息(带权限认证):/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 111.222.185:19092 --topic ismsLogQuery --producer.config /usr/local/kafka/config/producer.properties
- topic列表查询:/usr/local/kafka/bin/kafka-topics.sh --zookeeper 111.222.185:12181 --list
- 查看所有topic的详细信息:/usr/local/kafka/bin/kafka-topics.sh --zookeeper 111.111.222.185:12181 --describe
- 查询某个topic的详情:/usr/local/kafka/bin/kafka-topics.sh --zookeeper 111.111.222.185:12181 --describe --topic test
- 创建topic:/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 111.111.222.185:12181 --replication-factor 3 --partitions 3 --topic test
- 删除topic:/usr/local/kafka/bin/kafka-topics.sh --zookeeper 111.222.185:12181 --delete --topic test
- 消费者列表查询(带权限认证):/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 111.222.186:19092 --list--command-config /usr/local/kafka/config/sasl.properties
- 显示某个消费组的消费详情(带权限认证):/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 111.222.186:19092 --describe --group my-group--command-config /usr/local/kafka/config/sasl.properties
- 删除消费者组(带权限认证):/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 111.111.222.186:19092 --delete --group my-group--command-config /usr/local/kafka/config/sasl.properties
- 修改指定topic的数据存储时间(两天):/usr/local/kafka/bin/kafka-configs.sh --zookeeper 111.111.222.186:12181 --alter --entity-name ismsLogQuery --entity-type topics --add-config retention.ms=172800000
5. kafka的启动和关闭
- 启动:/usr/local/zookeeper/bin/zkServer.sh start -> /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
- 关闭:/usr/local/kafka/bin/kafka-server-stop.sh -> /usr/local/zookeeper/bin/zkServer.sh stop