一、说在开始
对于Kafka,纯属小白一枚,本文以初学者的视角,记录学习过程。
二、缘起
2021年8月,接到一个项目,关于发送短信的,考虑到业务层与短信网关之间吞吐量的不匹配,大佬们决定引入Kafka,交给我的任务是:将业务层的短信内容推送Kafka。
那么,先抛开具体任务不看,我首先的疑问是:
- Kafka是什么?
- Kafka是如何工作的?
那么,先百度一下吧
三、Kafka是什么
Kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。
那么,根据这个定义,衍生出4个子问题:
- 什么是消息队列
- 什么是分布式消息队列
- 什么是基于发布/订阅模式的分布式消息队列
- 消息队列的应用场景
1、什么是消息队列
消息队列是一种进程间通信或同一进程的不同线程间的通信方式。它是一种异步通讯的中间件。可以将消息队列理解成邮局,发件人(消息的生产者)将信件(消息)送到邮局,然后由邮局负责把信件送给收件人(消息的消费者)。
2、什么是分布式消息队列
分布式消息队列,根据定义,可以理解为,不同应用之间的通信方式。常见的分布式消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等。
3、什么是基于发布/订阅模式的分布式消息队列
消息队列的两种模式:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)
(1)点对点:Queue(队列),不可重复消费
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再存储已经被消费的消息,所以即使queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费,不可能消费到已经被消费的消息。
(2)发布/订阅:Topic(话题),可以重复消费
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
发布/订阅模式下,当发布者消息量很大时,单个订阅者会出现处理能力不足的情况。因此,在实际场景中,将有多个订阅者节点组成一个订阅组,负载均衡的消费topic消息,即分组订阅。这样订阅者很容易实现消费能力的线性扩展。分组订阅可以看成是一个topic下有多个queue,每个queue是点对点的方式,queue之间是发布/订阅方式。
4、消息队列的应用场景
消息队列的应用场景主要有:缓冲/削峰,解耦和异步通信
(1)缓冲/削峰
在实际的应用系统中,如果数据生产端的数据产生速率与数据处理端的数据处理速率相当或小于,则系统可以正常运行,不会有很大的压力。反之,当系统上了一个促销活动,前端用户猛增,数据率随之增加数倍,甚至数十倍,使得服务端处理速率落后于数据产生的速率,久而久之造成数据积压,最终导致系统的崩溃。在数据生产端和处理端之间使用消息队列就可以解决这种问题。此时,消息队列就发挥了不同系统之间数据的缓冲和削峰的作用。数据生产端将数据发送到消息队列,然后随即返回响应。数据处理端则根据自己的处理速度从消息队列中拉取数据并处理。
(2)解耦
消息队列的两侧可以连接不同的系统,允许独立的拓展和修改各自的处理过程,只需确保使用相同的接口约束。
(3)异步通信
将处理的数据写入到消息队列中,并立即返回处理结果,队列中数据由另一个线程拉取出来做响应的处理,如:用户注册时获取验证码短信的场景。
接下来,继续来看,这个名叫Kafka的消息队列具体是怎么工作的呢?
四、Kafka是如何工作的
首先,简单介绍一下Kafka中的几个概念:
- producer:消息生产者,就是向broker发送消息的客户端
- consumer:消息消费者,就是从broker拉取数据的客户端
- consumer group:消费者组,由多个消费者consumer组成。消费者组内每个消费者负责消费不同的分区,一个分区只能由同一个消费者组内的一个消费者消费;消费者组之间相互独立,互不影响。所有的消费者都属于某个消费者组,即消费者组是一个逻辑上的订阅者。
- broker:一台服务器就是一个broker,一个集群由多个broker组成,一个broker可以有多个topic。
- topic:可以理解为一个队列,所有的生产者和消费者都是面向topic的。
- partition:分区,一个topic可以分为多个partition,每个partition都是有序的,即消息发送到队列的顺序跟消费时拉取到的顺序是一致的。
- replication:副本,一个topic对应的分区partition可以有多个副本,多个副本中只有一个为leader,其余的为follower。为了保证数据的高可用性,leader和follower会尽量均匀的分布在各个broker中,避免了leader所在的服务器宕机而导致topic不可用的问题。
- leader:多个副本的主副本,生产者发送的数据和消费者消费的数据都是通过leader进行处理的。
- follower:多个副本中除了leader副本,其余的均为follower副本,也即从副本。从副本不会和生产者和消费者提供服务,而是实时同步主副本的数据。当主副本宕机后,通过一定算法选举出新的从副本成为主副本,继续为生产者和消费者提供服务。
- zookeeper:负责统筹管理,记录有哪些brokers、谁是leader、辅助选举leader等。
那么,根据这个架构,衍生出N个子问题:
- kafka消息获取方式
- kafka文件存储方式
- 如何保证数据高可靠、不丢失
- 如何保证数据不重复
- 如何处理消息堆积
- ……
1、kafka消息获取方式
(1)生产者
producer采用push(推)模式向broker中写入数据。
pull (拉)模式需要kafka集群事先知晓producer的信息,即producer需要先注册后才能使用。当有生产者实例宕机了,可能会存在空等。
push(推)模式的优点是生产者有数据就塞给消息队列,不用管其他的事情,只用专注于生产数据。
(2)消费者
consumer采用pull(拉) 模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
pull (拉)模式则可以根据consumer的消费能力以适当的速率消费消息。不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。 针对这一点,kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
那么消费者是如何知道生产者发送了数据呢?换一句话来说就是,消费者什么时候pull数据呢? 其实生产者产生的数据消费者是不知道的,consumer采用轮询的方式定期去broker中进行数据检索,如果有数据就用来消费,如果没有就继续轮询等待。
2、kafka文件存储方式
(1)存储机制
Kafka中topic是逻辑概念,但是分区partition是物理概念,生产者发送的数据都是存储在partition中的,每个partition中都有一个log文件,生产者生产的数据都会追加到该log文件的末端。
为防止log文件过大导致数据检索过慢,kafka将log文件进行切片,每个片称为segment,每个segment中包括'.index'文件,'.log'文件和'.timeindex'等文件。
- log文件:保存实际数据的文件。
- index文件:偏移量索引文件,文件名为segment第一个数据offset值。
- timeindex文件:时间戳索引文件,文件名为segment第一条数据的offset值。
(2)清除策略
Kafka提供的文件删除策略有delete和compact两种。
- delete策略:将过期数据删除
- cleanup.plocy = delete 所有数据启用删除策略
- 基于时间删除,默认打开。以segment中所有记录中最后记录的时间戳作为该文件的时间戳。
- 基于大小删除,默认关闭。超过设置的所有日志总大小,删除最早的segment。
- cleanup.plocy = delete 所有数据启用删除策略
- compact日志压缩:对于相同key的不同value值,只保留最后一个value值。
3、如何保证数据高可靠、不丢失
数据的可靠性是指producer发送数据到kafka,收到kafka应答后,该数据能成功落盘,那么这次发送是可靠的。但是,为了适应不用的应用场景以及实现高可用,kafka会将数据备份到不同的副本当中,在数据同步的过程中如果出现的故障,那么就有可能出现数据丢失、重复的情况。想要弄清楚kafka数据可靠性,就必须先要了解kafka中ACK的应答原理。
ACK应答是指在leader分区接收到生产者的数据后,何时对生产者做出应答的策略。ACK可选的值有0,1,-1三个,可以在生产者的配置项 acks 中设置,ACK设置不同,对生产者做出应答的时机也不同。
- ACK=0:可靠性级别最低。leader收到生产者数据后不需要等数据落盘,立即对生产者做出应答。生产者收到应答后认为leader已成功接收数据,因此不需要再发当前数据了。但是,如果leader在将内存中的数据落盘时突然出现故障,那么这条数据因为没有保存到磁盘中而导致数据的丢失。
- ACK=1:可靠性级别较高。leader收到生产者的数据并将数据落盘后,对生产者做出应答。生产者收到应答后继续发送其他数据。如果leader做出应答并且follower未同步到该数据时,leader出现故障,kafka会重新在follower中选出新的leader,而新的leader没有同步到数据,生产者也不会再发该数据,因此导致该数据的丢失。
- ACK=-1(all):可靠性级别最高,kafka的acks默认值。leader收到数据并落盘,并且确认所有follower收到数据后再给生产者应答。此时,所有分区副本都有该数据了,即使任意分区出现故障,数据仍然是完整的。
4、如何保证数据不重复
在使用kafka时为了保证数据的高可靠性,我们一般都会将应答级别设置为-1(all),即leader会等follower均收到数据后再应答。如果leader在收到所有的follower的确认后发生故障,此时所有分区副本都已收到数据,但是生产者没有收到应答,认为leader没有收到生产者的发送请求,于是尝试重新发送。由于leader发生故障,kafka重新选举leader,生产者将数据再一次发送到新的leader上,造成数据重复。
kafka引入幂等性和事务,解决数据重复的问题。
- 幂等性:是指无论producer发送多少条重复的数据,broker端都只会持久化一条数据,保证数据不重复。为此,Kafka引入两个变量:
- Producer ID(PID):每个Producer在初始化的时候会被分配一个唯一的PID,对用户是不可见的
- Sequence Number:对于每个PID、每个分区Partition,都对应一个从0开始单调递增的Sequence Number。
- 实现逻辑:Broker缓存Sequence Number。对于接收的每条消息,如果其序号比Broker缓存的序号大1,则接受它,否则将其丢弃。这样就实现了数据不重复。
- 限制:引入Sequence Number实现幂等只针对<PID,分区>,也就是说,kafka幂等只保证单个生产者单分区的幂等。
- 事务:幂等性并不能跨多个分区运行,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在不一致的情况。kafka的事务需要跟幂等性配合使用,开启事务就必须开始幂等性。
5、如何处理消息堆积
消息堆积是指broker中保存大量未消费的消息。若长时间未消费且触发了删除策略,那么这部分数据就会丢失。可能造成数据积压的原因有消费者的处理能力不足,分区数较少。应对策略,可以适当增加消费者数量分区数量,使分区数量等于消费者数量,也可以把消费者每次拉取数据的大小适当上调。
至此,对于Kafka的工作流程有了基本的认识。Kafka涉及的内容还有很多,值得进一步深入学习。
接下来,按照官网的教程,开启实操模式
五、Kafka服务端
1、下载、解压Kafka
$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0
2、启动Kafka(要求Java 8+)
启动ZooKeeper
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
启动KafkaServer
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
3、创建topic
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
4、生产消息
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
5、消费消息
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
至此,通过命令行,启动了一个单点的Kafka服务。对于本地开发,自己搞个单点服务,足矣~至于测试、生产的服务,还得请高人运维。
接下来,结合开发语言,来看看在node中该如何使用Kafka
六、Kafka客户端
1、安装Kafkajs
npm install kafkajs
2、创建客户端实例
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
})
3、生产消息
const producer = kafka.producer()
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
await producer.disconnect()
4、消费消息
至此,终于可以在代码里将Kafka跑起来了。
七、说在最后
初识Kafka,从一脸懵圈儿,到似懂非懂,记录一下学习过程,道阻且长,行则将至...