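1. Environment Preparation
1.1 Environment
This article sets up Kafka on VMware virtual machines running CentOS 8, with a K8s cluster already built using kubeadm. The K8s nodes are as follows:
Server | IP address |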
master | 192.168.31.80 |
node1 | 192.168.31.8 |
node2 | 192.168.31.9 |
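For how to build the K8s cluster, see my article "Deploying a K8s Cluster with kubeadm" (《kubeadm部署k8s集群》).
1.2 Installation Overview
This article shows how to deploy a Kafka cluster on K8s so that the Kafka service is reachable both from inside the K8s cluster and from outside it. The ZooKeeper cluster is deployed as a StatefulSet, and the Kafka cluster as Services plus Deployments.
2. Create NFS Storage
NFS storage gives Kafka and ZooKeeper stable backend storage: when a Kafka or ZooKeeper Pod restarts after a failure or is rescheduled to another node, it can still get its original data.
2.1 Install NFS
I chose to create the NFS storage on the master node. First install NFS with the following command: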
yum -y install nfs-utils rpcbind
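2.2 Create the NFS shared directories
mkdir -p /var/nfs/kafka/pv{1..3}
mkdir -p /var/nfs/zookeeper/pv{1..3}
2.3 Edit the configuration file
Open /etc/exports and add one entry per shared directory:
vim /etc/exports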
/var/nfs/kafka/pv1 *(rw,sync,no_root_squash)
/var/nfs/kafka/pv2 *(rw,sync,no_root_squash)
/var/nfs/kafka/pv3 *(rw,sync,no_root_squash)
/var/nfs/zookeeper/pv1 *(rw,sync,no_root_squash)
/var/nfs/zookeeper/pv2 *(rw,sync,no_root_squash)
/var/nfs/zookeeper/pv3 *(rw,sync,no_root_squash)
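The six entries above follow one pattern, so they can also be generated instead of typed by hand. The following is a minimal sketch, assuming the directory layout from section 2.2; the function name gen_exports is made up for this illustration.

```shell
#!/bin/sh
# Print one /etc/exports entry per PV directory.
# $1: base directory, $2: number of PV directories per component.
gen_exports() {
    base=$1
    count=$2
    for component in kafka zookeeper; do
        i=1
        while [ "$i" -le "$count" ]; do
            printf '%s/%s/pv%d *(rw,sync,no_root_squash)\n' "$base" "$component" "$i"
            i=$((i + 1))
        done
    done
}

# Preview the entries first. After reviewing them, they could be appended with:
#   gen_exports /var/nfs 3 >> /etc/exports
gen_exports /var/nfs 3
```

Note that `*` exports each directory to every host and `no_root_squash` lets remote root write as root. That is convenient in a lab like this one, but you would probably want to restrict the client range elsewhere.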
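2.4 Apply the configuration
exportfs -r
2.5 List all shared directories
exportfs -v
2.6 Start the NFS services and enable them at boot
systemctl start nfs-server
systemctl enable nfs-server
systemctl start rpcbind
systemctl enable rpcbind
2.7 Install nfs-utils on the other nodes
yum -y install nfs-utils
2.8 View the NFS shares exported by the master node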
showmount -e 192.168.31.80
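If you would rather not compare the showmount listing by eye, a small filter can check it. This is only a sketch: check_exports is a hypothetical helper, and it assumes the usual `showmount -e` layout in which each line starts with the exported path followed by whitespace and the client list.

```shell
#!/bin/sh
# Read `showmount -e` output on stdin and report any expected export that is missing.
# Returns 0 when all six directories are listed, 1 otherwise.
check_exports() {
    listing=$(cat)
    missing=0
    for component in kafka zookeeper; do
        for n in 1 2 3; do
            path="/var/nfs/$component/pv$n"
            # The path must be the first field of some line.
            if ! printf '%s\n' "$listing" | grep -q "^${path}[[:space:]]"; then
                echo "missing export: $path"
                missing=1
            fi
        done
    done
    return "$missing"
}

# Usage on a worker node (not executed here):
#   showmount -e 192.168.31.80 | check_exports && echo "all exports visible"
```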
3. Create the ZooKeeper Cluster
3.1 Create the namespace
kubectl create ns kafka-cluster
3.2 Create the ZooKeeper PVs
cat > zookeeper-pv.yaml <<'EOF'
# PersistentVolumes are cluster-scoped, so metadata.namespace is not set.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk01
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 192.168.31.80 # change to your NFS server address
    path: "/var/nfs/zookeeper/pv1" # change to your PV directory
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk02
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 192.168.31.80
    path: "/var/nfs/zookeeper/pv2"
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk03
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 192.168.31.80
    path: "/var/nfs/zookeeper/pv3"
  persistentVolumeReclaimPolicy: Recycle
EOF
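The three PV definitions differ only in their index, so they can be rendered from a template. The sketch below is one way to do that and not part of the original setup: render_zk_pv, render_all_zk_pvs and the NFS_SERVER variable are names invented here. It leaves out metadata.namespace because PersistentVolumes are cluster-scoped.

```shell
#!/bin/sh
# Print the manifest for one ZooKeeper PV; $1 is the PV index (1, 2 or 3).
# NFS_SERVER is an assumed override; it defaults to the master node's address.
render_zk_pv() {
    cat <<EOF
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk0$1
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: ${NFS_SERVER:-192.168.31.80}
    path: "/var/nfs/zookeeper/pv$1"
  persistentVolumeReclaimPolicy: Recycle
EOF
}

# Print all three manifests as one multi-document YAML stream.
render_all_zk_pvs() {
    for n in 1 2 3; do
        if [ "$n" -gt 1 ]; then
            echo "---"
        fi
        render_zk_pv "$n"
    done
}
```

The stream can be written to a file with `render_all_zk_pvs > zookeeper-pv.yaml`, or piped straight into `kubectl apply -f -`.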
3.3 Create the PVs
kubectl apply -f zookeeper-pv.yaml
3.4 Check that the PVs were created
kubectl get pv
3.5 Create the ZooKeeper Services
cat > zookeeper-service.yaml <<'EOF'
apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  namespace: kafka-cluster
  labels:
    app: zk
spec:
  selector:
    app: zk
  clusterIP: None
  ports:
    - name: server
      port: 2888
    - name: leader-election
      port: 3888
---
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  namespace: kafka-cluster
  labels:
    app: zk
spec:
  selector:
    app: zk
  type: NodePort
  ports:
    - name: client
      port: 2181
      nodePort: 31811
EOF
3.6 Create the Services
kubectl apply -f zookeeper-service.yaml
3.7 Create the ZooKeeper StatefulSet
cat > zookeeper-statefulset.yaml <<'EOF'
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
  namespace: kafka-cluster
spec:
  serviceName: "zk-hs"
  replicas: 3 # by default is 1
  selector:
    matchLabels:
      app: zk # has to match .spec.template.metadata.labels
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk # has to match .spec.selector.matchLabels
    spec:
      containers:
        - name: zk
          imagePullPolicy: Always
          image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          command:
            - sh
            - -c
            - "start-zookeeper \
              --servers=3 \
              --data_dir=/var/lib/zookeeper/data \
              --data_log_dir=/var/lib/zookeeper/data/log \
              --conf_dir=/opt/zookeeper/conf \
              --client_port=2181 \
              --election_port=3888 \
              --server_port=2888 \
              --tick_time=2000 \
              --init_limit=10 \
              --sync_limit=5 \
              --heap=4G \
              --max_client_cnxns=60 \
              --snap_retain_count=3 \
              --purge_interval=12 \
              --max_session_timeout=40000 \
              --min_session_timeout=4000 \
              --log_level=INFO"
          readinessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          livenessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
    - metadata:
        name: datadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "anything"
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 1Gi
EOF
3.8 Create the StatefulSet
kubectl apply -f zookeeper-statefulset.yaml
3.9 Check that everything was created
kubectl get pod -n kafka-cluster -o wide
kubectl get service -n kafka-cluster -o wide
kubectl get StatefulSet -n kafka-cluster -o wide
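Because the StatefulSet zk is governed by the headless Service zk-hs, each Pod gets a stable DNS name of the form zk-N.zk-hs.<namespace>.svc.cluster.local. The sketch below builds the ZooKeeper connection string from those names. The function name zk_connect is hypothetical, and cluster.local is assumed to be the cluster domain (the Kubernetes default).

```shell
#!/bin/sh
# Build a ZooKeeper connection string from the StatefulSet's stable DNS names.
# $1: replica count, $2: namespace, $3: client port (optional, defaults to 2181).
zk_connect() {
    replicas=$1
    namespace=$2
    port=${3:-2181}
    result=""
    i=0
    while [ "$i" -lt "$replicas" ]; do
        host="zk-$i.zk-hs.$namespace.svc.cluster.local:$port"
        # Prefix a comma only when the list is already non-empty.
        result="${result:+$result,}$host"
        i=$((i + 1))
    done
    printf '%s\n' "$result"
}

zk_connect 3 kafka-cluster
```

For three replicas in kafka-cluster, this prints the same value that the Kafka brokers in this article use for KAFKA_ZOOKEEPER_CONNECT.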
4. Create the Kafka Cluster
4.1 Create the Kafka Services
cat > kafka-service.yaml <<'EOF'
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-1
  namespace: kafka-cluster
  labels:
    app: kafka-service-1
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-1
      targetPort: 9092
      nodePort: 30901
      protocol: TCP
  selector:
    app: kafka-1
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-2
  namespace: kafka-cluster
  labels:
    app: kafka-service-2
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-2
      targetPort: 9092
      nodePort: 30902
      protocol: TCP
  selector:
    app: kafka-2
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-3
  namespace: kafka-cluster
  labels:
    app: kafka-service-3
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-3
      targetPort: 9092
      nodePort: 30903
      protocol: TCP
  selector:
    app: kafka-3
EOF
4.2 Create the Services
kubectl apply -f kafka-service.yaml
4.3 Check that the Services were created
kubectl get service -n kafka-cluster -o wide
4.4 Create the Kafka Deployments
cat > kafka-deployment.yaml <<'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-1
  namespace: kafka-cluster
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-1
  template:
    metadata:
      labels:
        app: kafka-1
    spec:
      containers:
        - name: kafka-1
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.kafka-cluster.svc.cluster.local:2181,zk-1.zk-hs.kafka-cluster.svc.cluster.local:2181,zk-2.zk-hs.kafka-cluster.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CREATE_TOPICS
              value: mytopic:2:1
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30901"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 192.168.31.80
            path: "/var/nfs/kafka/pv1"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-2
  namespace: kafka-cluster
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-2
  template:
    metadata:
      labels:
        app: kafka-2
    spec:
      containers:
        - name: kafka-2
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.kafka-cluster.svc.cluster.local:2181,zk-1.zk-hs.kafka-cluster.svc.cluster.local:2181,zk-2.zk-hs.kafka-cluster.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "2"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30902"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 192.168.31.80
            path: "/var/nfs/kafka/pv2"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-3
  namespace: kafka-cluster
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-3
  template:
    metadata:
      labels:
        app: kafka-3
    spec:
      containers:
        - name: kafka-3
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.kafka-cluster.svc.cluster.local:2181,zk-1.zk-hs.kafka-cluster.svc.cluster.local:2181,zk-2.zk-hs.kafka-cluster.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "3"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30903"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 192.168.31.80
            path: "/var/nfs/kafka/pv3"
EOF
4.5 Create the Deployments
kubectl apply -f kafka-deployment.yaml
4.6 Check that the Pods were created
kubectl get pod -n kafka-cluster -o wide
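Each broker advertises the IP of the node it runs on (status.hostIP) together with its own NodePort (30901 to 30903). A client outside the cluster can therefore bootstrap through any node IP on those ports, provided it can reach the node IPs directly. The sketch below assembles such a bootstrap list. external_bootstrap is a made-up helper, and using node1's address is just an example.

```shell
#!/bin/sh
# Print a bootstrap-server list for clients outside the cluster.
# $1 is the IP of any cluster node; the ports are the nodePort values
# of kafka-service-1..3 defined in this article.
external_bootstrap() {
    node_ip=$1
    list=""
    for port in 30901 30902 30903; do
        list="${list:+$list,}$node_ip:$port"
    done
    printf '%s\n' "$list"
}

external_bootstrap 192.168.31.8
# prints 192.168.31.8:30901,192.168.31.8:30902,192.168.31.8:30903
```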
5. Testing
5.1 Open a shell in a Kafka container from the K8s web UI
5.2 Create a topic
kafka-topics.sh --create --topic test_topic --zookeeper zk-0.zk-hs.kafka-cluster.svc.cluster.local:2181,zk-1.zk-hs.kafka-cluster.svc.cluster.local:2181,zk-2.zk-hs.kafka-cluster.svc.cluster.local:2181 --partitions 1 --replication-factor 1
5.3 Open a new page in the K8s web UI, enter a container, and start a producer
kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic
5.4 Open a new page in the K8s web UI, enter a container, and start a consumer
kafka-console-consumer.sh --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic
Type "kafka" in the producer and send it. The consumer receives "kafka", which shows that our ZooKeeper cluster and Kafka cluster are working. This concludes the tutorial!
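For a non-interactive version of the same check, the producer and consumer commands can be wrapped in kubectl exec. This is a rough sketch under a few assumptions: the helper names and the DRY_RUN switch are invented here, the Kafka scripts are on the PATH inside the container (as the commands above already assume), and the consumer flags --from-beginning, --max-messages and --timeout-ms are used so that it exits on its own.

```shell
#!/bin/sh
NAMESPACE=kafka-cluster
BROKERS=kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092

# Run a command in the Pod of kafka-deployment-1, or only print it
# when DRY_RUN=1 so the script can be reviewed without a cluster.
in_broker() {
    if [ "${DRY_RUN:-0}" = "1" ]; then
        echo "kubectl exec -i -n $NAMESPACE deploy/kafka-deployment-1 -- $*"
    else
        kubectl exec -i -n "$NAMESPACE" deploy/kafka-deployment-1 -- "$@"
    fi
}

# Send one message ($2) to the topic ($1).
produce_one() {
    printf '%s\n' "$2" | in_broker kafka-console-producer.sh \
        --broker-list "$BROKERS" --topic "$1"
}

# Read one message from the start of the topic ($1), giving up after 10 seconds.
consume_one() {
    in_broker kafka-console-consumer.sh --bootstrap-server "$BROKERS" \
        --topic "$1" --from-beginning --max-messages 1 --timeout-ms 10000
}

# Usage against a running cluster (not executed here):
#   produce_one test_topic kafka
#   consume_one test_topic
```

Running it first with `DRY_RUN=1` shows the exact kubectl commands that would be issued.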