searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Kafka单机测试入门介绍

2024-06-07 03:15:10
7
0

1 kafka简介

Kafka 是一个分布式的、可扩展的、高吞吐量的消息流平台。它的工作原理可以简单地描述为以下几个关键组件和概念:

(1)主题(Topic):主题是 Kafka 中消息的逻辑分类,它是消息发布和订阅的基本单位。生产者将消息发送到一个或多个主题,而消费者从主题订阅消息。

(2)分区(Partition):每个主题可以被分成多个分区,每个分区是一个独立的、有序的消息日志。分区允许 Kafka 对数据进行水平扩展,每个分区可以在不同的服务器上进行复制和处理。

(3)生产者(Producer):生产者是负责向 Kafka 主题发送消息的应用程序或组件。生产者可以选择将消息发送到特定分区,或者让 Kafka 根据一定的策略自动分配分区。

(4)消费者(Consumer):消费者是从 Kafka 主题接收和处理消息的应用程序或组件。每个消费者都属于一个消费者组(Consumer Group),每个分区只能由消费者组中的一个消费者进行消费。消费者可以从分区的特定偏移量(Offset)处读取消息,并可以控制自己的消费速度。

(5)代理(Broker):代理是 Kafka 集群中的一个节点,负责存储和处理消息。每个分区在多个代理上进行复制以实现容错性和可用性。代理是分布式系统的核心组件,负责处理消息的存储、复制和转发。

(6)ZooKeeper:ZooKeeper 是 Kafka 使用的协调服务,在 Kafka 中起到重要的角色。它用于协调代理的选举、分区分配、消费者组的协调和偏移量的管理等。

 

Kafka 的工作流程如下:

step1:生产者将消息发送到 Kafka 的主题。

step2:将消息按照分区规则存储在各个代理上的分区中。

Step3: 消费者加入消费者组,并从分区中获取消息进行消费。

Step4:消费者可以控制自己的消费速度和从特定偏移量开始消费。

Step5:Kafka 通过分区的复制和容错机制确保高可用性和数据冗余。

Step6:Kafka 还提供了一些高级特性,如流处理、Exactly-Once 语义、消息回溯等。

这只是对 Kafka 工作原理的简要概述,Kafka 还有很多功能和细节可以进一步探索和了解。如果需要更深入的了解,建议参考 Kafka 的官方文档(https://kafka.apache.org/documentation/#introduction)和相关资料。

Kafka的应用场景包括但不限于:

日志收集与聚合:Kafka适用于大规模的日志数据流的实时收集、处理和存储。

实时流处理:Kafka提供了流式数据处理的基础设施,可以进行流计算、ETL(Extract, Transform, Load)等任务。

消息队列:Kafka可用作可靠的消息队列,用于支持异步通信和解耦系统组件之间的消息传递。

事件驱动架构:Kafka可以作为事件驱动架构的核心组件,支持事件驱动的微服务架构或流媒体处理。

总之,Kafka是一个高性能、可靠、分布式的流处理平台和消息队列,用于处理大规模的实时数据流和构建实时数据流应用程序。

2 测试

2.1 单机功能测试(主题,分区都在同一个broker,一个生产者,一个消费者)

2.1.1部署方案

Kafka和zookeeper(kafka自带)部署在host 235上,一个kafka实例地址配置网卡ip:默认端口,一个zookeeper实例配置网卡ip:默认端口。

2.1.2 配置方法:

Step1:解压kafka安装包

在linux指定目录下解压kafka安装包

Step2:安装java-jdk

确认测试机器是否安装java jdk,可通过java -version 查看,如未安装,需要安装java jdk,yum -y list *java*可看一下当前yum源有没有可以下载的java-jdk包,有的话可直接安装,没有的话需要下载安装包安装。

Step3:解压kafka压缩包,并将安装位置写入环境变量。

举例如下:

Vi /etc/profile

export KAFKA_HOME=/root/kafka/kafka_2.13-3.6.0

export PATH=$PATH:$KAFKA_HOME/bin

Source /etc/profile

Step4: 创建log路径

创建一个log路径,用于存储kafka运行过程中的日志文件(后面做压力测试占用的日志空间比较大,日志空间如果不足,会导致kafka程序运行异常)

Step5:编辑kafka_2.13-3.6.0/config/server.properties

修改以下字段:

broker.id=0   #每个broker实例唯一的标识id,不能重复

listeners=PLAINTEXT://20.11.3.3:9092  #监听地址,选择了一张弹性网卡来通信

log.dirs=/mnt/kafka  #自定义的log路径

zookeeper.connect=20.11.2.13:2181  #连接的zookeeper服务器,选择了服务器上另一个弹性网卡

Step6:配置zookeeper服务器实例,在这次测试中,用的是kafka自带的zookeeper服务,也可以下载一个zookeeper安装包,单独安装。

用kafka压缩包里自带的zookeeper

首先创建两个zookeeper数据路径

mkdir xxxxxx/zookeeper/data -p

mkdir xxxxxx/zookeeper/datalog -p

Step7: 编辑kafka_2.13-3.6.0/config/zookeeper.properties

修改以下内容:

dataDir=/root/kafka/zookeeper/data  #自定义的data路径

dataLogDir=/root/kafka/zookeeper/datalog   #自定义的datalog路径,事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多

server.1=20.11.2.13:2888:3888  ##集群服务器配置,数字1/2/3需要与myid文件一致。右边两个端口,2888表示数据同步和通信端口;

3888表示选举端口

Step8:写myid文件

echo "1" > xxxxxx/zookeeper/data/myid

2.1.3 功能验证

测试过程:

Step1:开启kafka和zookeeper实例

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

bin/kafka-server-start.sh -daemon config/server.properties

Step2:检查zookeeper实例是否正常运行

zookeeper-shell.sh 20.11.2.13:2181

Step3:创建主题

kafka-topics.sh --create --bootstrap-server 20.11.1.39:9092 --replication-factor 1 --partitions 3 --topic TestTopicOne

Step4:查看主题是否设置成功

kafka-topics.sh  --bootstrap-server 20.11.1.39:9092 --list

Step5:查看主题描述是否和预期一致

kafka-topics.sh  --bootstrap-server 20.11.1.39:9092 --describe --topic TestTopicOne

Step6: 给创建的主题的三个分区的其中一个分区,创建flower(创建3个副本)

Step7:生产者生产消息

bin/kafka-console-producer.sh --broker-list 20.11.1.39:9092 --topic TestTopicOne

简单发布订阅模式下:生产者生产消息之后,可以看到对应主题的各个分区有对应的消息量偏移:

Step8:消费者消费消息

bin/kafka-console-consumer.sh --bootstrap-server 20.11.1.39:9092 --topic test --from-beginning(从最开始的偏移量开始消费)

kafka-console-consumer.sh --bootstrap-server 20.11.1.39:9092 --topic TestTopicOne --offset latest --partition 0(指定主题分区,从最新的偏移量开始消费)

简单发布订阅模式下:生产者向指定主题生产消息后,消费者从指定分区最新偏移消费消息。

Step9:关闭kafka和zookeeper实例

zookeeper-server-stop.sh

kafka-server-stop.sh

 

 

命令:

查看一个主题的各个分区的消息偏移量:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server 20.11.1.39:9092 --topic TestTopicOne --time -1

查看一个分区文件的消息内容:

kafka-dump-log.sh --files 00000000000000000000.log --print-data-log

增加分区到4个:

kafka-topics.sh --bootstrap-server 20.11.1.39:9092 --alter --topic TestTopicOne --partitions 4

 

0条评论
0 / 1000
l****n
3文章数
0粉丝数
l****n
3 文章 | 0 粉丝
l****n
3文章数
0粉丝数
l****n
3 文章 | 0 粉丝
原创

Kafka单机测试入门介绍

2024-06-07 03:15:10
7
0

1 kafka简介

Kafka 是一个分布式的、可扩展的、高吞吐量的消息流平台。它的工作原理可以简单地描述为以下几个关键组件和概念:

(1)主题(Topic):主题是 Kafka 中消息的逻辑分类,它是消息发布和订阅的基本单位。生产者将消息发送到一个或多个主题,而消费者从主题订阅消息。

(2)分区(Partition):每个主题可以被分成多个分区,每个分区是一个独立的、有序的消息日志。分区允许 Kafka 对数据进行水平扩展,每个分区可以在不同的服务器上进行复制和处理。

(3)生产者(Producer):生产者是负责向 Kafka 主题发送消息的应用程序或组件。生产者可以选择将消息发送到特定分区,或者让 Kafka 根据一定的策略自动分配分区。

(4)消费者(Consumer):消费者是从 Kafka 主题接收和处理消息的应用程序或组件。每个消费者都属于一个消费者组(Consumer Group),每个分区只能由消费者组中的一个消费者进行消费。消费者可以从分区的特定偏移量(Offset)处读取消息,并可以控制自己的消费速度。

(5)代理(Broker):代理是 Kafka 集群中的一个节点,负责存储和处理消息。每个分区在多个代理上进行复制以实现容错性和可用性。代理是分布式系统的核心组件,负责处理消息的存储、复制和转发。

(6)ZooKeeper:ZooKeeper 是 Kafka 使用的协调服务,在 Kafka 中起到重要的角色。它用于协调代理的选举、分区分配、消费者组的协调和偏移量的管理等。

 

Kafka 的工作流程如下:

step1:生产者将消息发送到 Kafka 的主题。

step2:将消息按照分区规则存储在各个代理上的分区中。

Step3: 消费者加入消费者组,并从分区中获取消息进行消费。

Step4:消费者可以控制自己的消费速度和从特定偏移量开始消费。

Step5:Kafka 通过分区的复制和容错机制确保高可用性和数据冗余。

Step6:Kafka 还提供了一些高级特性,如流处理、Exactly-Once 语义、消息回溯等。

这只是对 Kafka 工作原理的简要概述,Kafka 还有很多功能和细节可以进一步探索和了解。如果需要更深入的了解,建议参考 Kafka 的官方文档(https://kafka.apache.org/documentation/#introduction)和相关资料。

Kafka的应用场景包括但不限于:

日志收集与聚合:Kafka适用于大规模的日志数据流的实时收集、处理和存储。

实时流处理:Kafka提供了流式数据处理的基础设施,可以进行流计算、ETL(Extract, Transform, Load)等任务。

消息队列:Kafka可用作可靠的消息队列,用于支持异步通信和解耦系统组件之间的消息传递。

事件驱动架构:Kafka可以作为事件驱动架构的核心组件,支持事件驱动的微服务架构或流媒体处理。

总之,Kafka是一个高性能、可靠、分布式的流处理平台和消息队列,用于处理大规模的实时数据流和构建实时数据流应用程序。

2 测试

2.1 单机功能测试(主题,分区都在同一个broker,一个生产者,一个消费者)

2.1.1部署方案

Kafka和zookeeper(kafka自带)部署在host 235上,一个kafka实例地址配置网卡ip:默认端口,一个zookeeper实例配置网卡ip:默认端口。

2.1.2 配置方法:

Step1:解压kafka安装包

在linux指定目录下解压kafka安装包

Step2:安装java-jdk

确认测试机器是否安装java jdk,可通过java -version 查看,如未安装,需要安装java jdk,yum -y list *java*可看一下当前yum源有没有可以下载的java-jdk包,有的话可直接安装,没有的话需要下载安装包安装。

Step3:解压kafka压缩包,并将安装位置写入环境变量。

举例如下:

Vi /etc/profile

export KAFKA_HOME=/root/kafka/kafka_2.13-3.6.0

export PATH=$PATH:$KAFKA_HOME/bin

Source /etc/profile

Step4: 创建log路径

创建一个log路径,用于存储kafka运行过程中的日志文件(后面做压力测试占用的日志空间比较大,日志空间如果不足,会导致kafka程序运行异常)

Step5:编辑kafka_2.13-3.6.0/config/server.properties

修改以下字段:

broker.id=0   #每个broker实例唯一的标识id,不能重复

listeners=PLAINTEXT://20.11.3.3:9092  #监听地址,选择了一张弹性网卡来通信

log.dirs=/mnt/kafka  #自定义的log路径

zookeeper.connect=20.11.2.13:2181  #连接的zookeeper服务器,选择了服务器上另一个弹性网卡

Step6:配置zookeeper服务器实例,在这次测试中,用的是kafka自带的zookeeper服务,也可以下载一个zookeeper安装包,单独安装。

用kafka压缩包里自带的zookeeper

首先创建两个zookeeper数据路径

mkdir xxxxxx/zookeeper/data -p

mkdir xxxxxx/zookeeper/datalog -p

Step7: 编辑kafka_2.13-3.6.0/config/zookeeper.properties

修改以下内容:

dataDir=/root/kafka/zookeeper/data  #自定义的data路径

dataLogDir=/root/kafka/zookeeper/datalog   #自定义的datalog路径,事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多

server.1=20.11.2.13:2888:3888  ##集群服务器配置,数字1/2/3需要与myid文件一致。右边两个端口,2888表示数据同步和通信端口;

3888表示选举端口

Step8:写myid文件

echo "1" > xxxxxx/zookeeper/data/myid

2.1.3 功能验证

测试过程:

Step1:开启kafka和zookeeper实例

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

bin/kafka-server-start.sh -daemon config/server.properties

Step2:检查zookeeper实例是否正常运行

zookeeper-shell.sh 20.11.2.13:2181

Step3:创建主题

kafka-topics.sh --create --bootstrap-server 20.11.1.39:9092 --replication-factor 1 --partitions 3 --topic TestTopicOne

Step4:查看主题是否设置成功

kafka-topics.sh  --bootstrap-server 20.11.1.39:9092 --list

Step5:查看主题描述是否和预期一致

kafka-topics.sh  --bootstrap-server 20.11.1.39:9092 --describe --topic TestTopicOne

Step6: 给创建的主题的三个分区的其中一个分区,创建flower(创建3个副本)

Step7:生产者生产消息

bin/kafka-console-producer.sh --broker-list 20.11.1.39:9092 --topic TestTopicOne

简单发布订阅模式下:生产者生产消息之后,可以看到对应主题的各个分区有对应的消息量偏移:

Step8:消费者消费消息

bin/kafka-console-consumer.sh --bootstrap-server 20.11.1.39:9092 --topic test --from-beginning(从最开始的偏移量开始消费)

kafka-console-consumer.sh --bootstrap-server 20.11.1.39:9092 --topic TestTopicOne --offset latest --partition 0(指定主题分区,从最新的偏移量开始消费)

简单发布订阅模式下:生产者向指定主题生产消息后,消费者从指定分区最新偏移消费消息。

Step9:关闭kafka和zookeeper实例

zookeeper-server-stop.sh

kafka-server-stop.sh

 

 

命令:

查看一个主题的各个分区的消息偏移量:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server 20.11.1.39:9092 --topic TestTopicOne --time -1

查看一个分区文件的消息内容:

kafka-dump-log.sh --files 00000000000000000000.log --print-data-log

增加分区到4个:

kafka-topics.sh --bootstrap-server 20.11.1.39:9092 --alter --topic TestTopicOne --partitions 4

 

文章来自个人专栏
网络学习
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0