1 kafka简介
Kafka 是一个分布式的、可扩展的、高吞吐量的消息流平台。它的工作原理可以简单地描述为以下几个关键组件和概念:
(1)主题(Topic):主题是 Kafka 中消息的逻辑分类,它是消息发布和订阅的基本单位。生产者将消息发送到一个或多个主题,而消费者从主题订阅消息。
(2)分区(Partition):每个主题可以被分成多个分区,每个分区是一个独立的、有序的消息日志。分区允许 Kafka 对数据进行水平扩展,每个分区可以在不同的服务器上进行复制和处理。
(3)生产者(Producer):生产者是负责向 Kafka 主题发送消息的应用程序或组件。生产者可以选择将消息发送到特定分区,或者让 Kafka 根据一定的策略自动分配分区。
(4)消费者(Consumer):消费者是从 Kafka 主题接收和处理消息的应用程序或组件。每个消费者都属于一个消费者组(Consumer Group),每个分区只能由消费者组中的一个消费者进行消费。消费者可以从分区的特定偏移量(Offset)处读取消息,并可以控制自己的消费速度。
(5)代理(Broker):代理是 Kafka 集群中的一个节点,负责存储和处理消息。每个分区在多个代理上进行复制以实现容错性和可用性。代理是分布式系统的核心组件,负责处理消息的存储、复制和转发。
(6)ZooKeeper:ZooKeeper 是 Kafka 使用的协调服务,在 Kafka 中起到重要的角色。它用于协调代理的选举、分区分配、消费者组的协调和偏移量的管理等。
Kafka 的工作流程如下:
step1:生产者将消息发送到 Kafka 的主题。
step2:将消息按照分区规则存储在各个代理上的分区中。
Step3: 消费者加入消费者组,并从分区中获取消息进行消费。
Step4:消费者可以控制自己的消费速度和从特定偏移量开始消费。
Step5:Kafka 通过分区的复制和容错机制确保高可用性和数据冗余。
Step6:Kafka 还提供了一些高级特性,如流处理、Exactly-Once 语义、消息回溯等。
这只是对 Kafka 工作原理的简要概述,Kafka 还有很多功能和细节可以进一步探索和了解。如果需要更深入的了解,建议参考 Kafka 的官方文档(https://kafka.apache.org/documentation/#introduction)和相关资料。
Kafka的应用场景包括但不限于:
日志收集与聚合:Kafka适用于大规模的日志数据流的实时收集、处理和存储。
实时流处理:Kafka提供了流式数据处理的基础设施,可以进行流计算、ETL(Extract, Transform, Load)等任务。
消息队列:Kafka可用作可靠的消息队列,用于支持异步通信和解耦系统组件之间的消息传递。
事件驱动架构:Kafka可以作为事件驱动架构的核心组件,支持事件驱动的微服务架构或流媒体处理。
总之,Kafka是一个高性能、可靠、分布式的流处理平台和消息队列,用于处理大规模的实时数据流和构建实时数据流应用程序。
2 测试
2.1 单机功能测试(主题,分区都在同一个broker,一个生产者,一个消费者)
2.1.1部署方案
Kafka和zookeeper(kafka自带)部署在host 235上,一个kafka实例地址配置网卡ip:默认端口,一个zookeeper实例配置网卡ip:默认端口。
2.1.2 配置方法:
Step1:解压kafka安装包
在linux指定目录下解压kafka安装包
Step2:安装java-jdk
确认测试机器是否安装java jdk,可通过java -version 查看,如未安装,需要安装java jdk,yum -y list *java*可看一下当前yum源有没有可以下载的java-jdk包,有的话可直接安装,没有的话需要下载安装包安装。
Step3:解压kafka压缩包,并将安装位置写入环境变量。
举例如下:
Vi /etc/profile
export KAFKA_HOME=/root/kafka/kafka_2.13-3.6.0
export PATH=$PATH:$KAFKA_HOME/bin
Source /etc/profile
Step4: 创建log路径
创建一个log路径,用于存储kafka运行过程中的日志文件(后面做压力测试占用的日志空间比较大,日志空间如果不足,会导致kafka程序运行异常)
Step5:编辑kafka_2.13-3.6.0/config/server.properties
修改以下字段:
broker.id=0 #每个broker实例唯一的标识id,不能重复
listeners=PLAINTEXT://20.11.3.3:9092 #监听地址,选择了一张弹性网卡来通信
log.dirs=/mnt/kafka #自定义的log路径
zookeeper.connect=20.11.2.13:2181 #连接的zookeeper服务器,选择了服务器上另一个弹性网卡
Step6:配置zookeeper服务器实例,在这次测试中,用的是kafka自带的zookeeper服务,也可以下载一个zookeeper安装包,单独安装。
用kafka压缩包里自带的zookeeper
首先创建两个zookeeper数据路径
mkdir xxxxxx/zookeeper/data -p
mkdir xxxxxx/zookeeper/datalog -p
Step7: 编辑kafka_2.13-3.6.0/config/zookeeper.properties
修改以下内容:
dataDir=/root/kafka/zookeeper/data #自定义的data路径
dataLogDir=/root/kafka/zookeeper/datalog #自定义的datalog路径,事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
server.1=20.11.2.13:2888:3888 ##集群服务器配置,数字1/2/3需要与myid文件一致。右边两个端口,2888表示数据同步和通信端口;
3888表示选举端口
Step8:写myid文件
echo "1" > xxxxxx/zookeeper/data/myid
2.1.3 功能验证
测试过程:
Step1:开启kafka和zookeeper实例
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
Step2:检查zookeeper实例是否正常运行
zookeeper-shell.sh 20.11.2.13:2181
Step3:创建主题
kafka-topics.sh --create --bootstrap-server 20.11.1.39:9092 --replication-factor 1 --partitions 3 --topic TestTopicOne
Step4:查看主题是否设置成功
kafka-topics.sh --bootstrap-server 20.11.1.39:9092 --list
Step5:查看主题描述是否和预期一致
kafka-topics.sh --bootstrap-server 20.11.1.39:9092 --describe --topic TestTopicOne
Step6: 给创建的主题的三个分区的其中一个分区,创建flower(创建3个副本)
Step7:生产者生产消息
bin/kafka-console-producer.sh --broker-list 20.11.1.39:9092 --topic TestTopicOne
简单发布订阅模式下:生产者生产消息之后,可以看到对应主题的各个分区有对应的消息量偏移:
Step8:消费者消费消息
bin/kafka-console-consumer.sh --bootstrap-server 20.11.1.39:9092 --topic test --from-beginning(从最开始的偏移量开始消费)
kafka-console-consumer.sh --bootstrap-server 20.11.1.39:9092 --topic TestTopicOne --offset latest --partition 0(指定主题分区,从最新的偏移量开始消费)
简单发布订阅模式下:生产者向指定主题生产消息后,消费者从指定分区最新偏移消费消息。
Step9:关闭kafka和zookeeper实例
zookeeper-server-stop.sh
kafka-server-stop.sh
命令:
查看一个主题的各个分区的消息偏移量:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server 20.11.1.39:9092 --topic TestTopicOne --time -1
查看一个分区文件的消息内容:
kafka-dump-log.sh --files 00000000000000000000.log --print-data-log
增加分区到4个:
kafka-topics.sh --bootstrap-server 20.11.1.39:9092 --alter --topic TestTopicOne --partitions 4