本文将通过概念功能特性、原理,关键配置、最佳实践四个点层层递进的介绍,为大家梳理kafka的最佳使用方式及其原因。
概念和功能特性
我们了解一个产品的时候,首先要知道这个产品是为了解决什么问题出现的
Kafka首先是一个消息组件,消息组件一开始是为了解决系统间繁杂的调用,简化开发工作量而诞生的
比如订单状态的变更,风控通知等,如果每个触发点都需要去调用接口,那么接口调用方将在每增加一个通知方的时候增加一个系统调用,增加开发的人力和沟通成本,降低开发效率。
消息组件只需要发送一次通知,关心它的系统去监听这个通知就可以了。
在市面上已经有了一些消息组件的情况下,Kafka为什么又出现了呢?因为kafka一开始是为了解决用户日志信息的简单快速的记录,给日志打点的前端调用方和后端存储方,提供统一的交互口,避免了不同前端系统维护不同后端系统的繁杂操作。
并且,由于日志系统对速度要求高,对事务性要求低,当时市面上的消息组件在速度和并发上并不能达到要求,zeromq这种速度符合要求的又功能太简单,需要二次开发。因而kafka就诞生了
虽然是为日志消息开发出来的,但是经过一段时间的发展之后,kafka不仅仅能够作为日志消息组件,还能够作为一个通用的消息和流组件,下面将从产品目标、同类产品、功能特性三方面介绍kafka的概念和功能特性。
同类产品
ZeroMQ:本身就不是一个消息队列服务器,更像是一组底层网络通讯库,对原有的Socket API上加上一层封装而已。
DDMQ、TubeMQ、QMQ都是后起之秀,基本上可以看做是在ActiveMQ、RabbitMQ、Kafka基础或者思想上面的改造版本
这里我重点给大家对比下ActiveMQ、RabbitMQ、RocketMQ、Kafka四个消息组件
这里大家可以看到RocketMQ、kafka的性能比ActiveMQ、RabbitMQ的性能高一个数量级
这里大家可以看到RocketMQ、ActiveMQ、Pulsar支持2pc事务,rabbitMQ不支持事务
Kafka的事务是,通过幂等性来保障消息的exactly-once的事务,不能用来消息发送,业务处理在一个事务里面处理
我们平时选择开源产品的时候,不仅仅需要关注它的性能,也需要关注它的开源相关的点
可以看到在社区活跃度和产品成熟度上面ActiveMQ、RabbitMQ、Kafka、Pulsar都比RocketMQ要高
概念和功能特性
除了前面介绍的消息组件需要具备的通用性功能,Kafka还有什么特别的功能特性呢?
功能特性
- 消费者主动拉对应序列位置的数据
- 优点:消息队列背压、削峰填谷。
- 缺点: 消费者就要循环检测, 使用空轮询的繁忙检测方式等候数据到来。
- 空轮询:我们可以设置拉请求的参数,允许消费者请求在“长轮询”时阻塞,直到数据到达
- 消费者所属的消费者组有一个独立的brocker维护消费的最新位置
- 三种消息的发送保障机制:
- 最多一次(处理之前就提交位移信息)
- 至少一次(处理之后再交位移信息)
- 有且仅有一次(处理之后再交位移信息,并且定义业务的id来判断是否处理过)。
- 提供消息备份机制:
- 一条消息只有在它在所有的同步副本集的日志分区都已经提交了, 才被当作是”已提交”. 只有已经提交的消息才会分发给消费者。
- 生产者可以在延迟和持久性中, 决定是否等待消息提交. 这个在生产者中的反馈配置项中可以设置。
- 只要同步的复制集中有一个节点是一直都是存活的,kafka就可保证已经提交的数据不会丢失,
原理
Kafka是怎么做到以上这些功能的呢?以及怎么配置可以让kafka更符合我们的使用场景呢,如果无法衡量,就无法改善,所以想知道这些,我们就需要了解这个产品背后的设计原理。
下面我将从kafka整体架构、消息发送、消息消费、失效检测四个部分来介绍它的原理
分布式特性
- 0.9版本之前zookeeper协调管理消费组。
- 每个 消费者被创建时会触发消费组所有消费者的重新调整。
- 容易存在脑裂问题、羊群效应,调整结果不可控。
- 0.9版本后使用kafka的brocker作为coordinater管理消费组。
因为Kafka 的 Consumer Rebalance 的控制策略是由每一个 消费者 通过在 Zookeeper 上注册 Watch 完成的。每个 Consumer 只负责调整自己所消费的 Partition,为了保证整个消费组的一致性,当一个消费者触发了 Rebalance 时,该 Consumer Group 内的其它所有其它 Consumer 也应该同时触发 Rebalance。
该方式有如下缺陷:
Herd effect任何 Broker 或者 Consumer 的增减都会触发所有的 Consumer 的 Rebalance
Split Brain每个 Consumer 分别单独通过 Zookeeper 判断哪些 Broker 和 Consumer 宕机了,那么不同 Consumer在同一时刻从Zookeeper“看”到的View 就可能不一样,这是由Zookeeper 的特性决定的,这就会造成不正确的Reblance 尝试。
调整结果不可控所有的 Consumer 都并不知道其它 Consumer 的 Rebalance 是否成功,这可能会导致 Kafka工作在一个不正确的状态。
分布式特性
我们讲分布式的时候必定会带有两个概念:主节点、副本节点,如图所示。
kafka还带了一个isr节点
- 主节点: 是负责给定分区的所有读取和写入的节点。从isr节点选择。
- 副本: 是为该分区复制日志的节点列表,无论它们是引导者还是当前处于活动状态。
- Isr: 是同步副本的集合。是副本列表的子集,当前仍处于活动状态并追随领导者。
- 节点必须能够和zookeeper机器或者coordinate建立心跳信号。
- 不能落下太远,replica.lag.time.max.ms。
- 一个复制节点重新加入到ISR集合前, 必须完全同步上。
- 主节点宕机的时候会从ISR中选择一个作为leader。
分区中的所有副本统称为AR(Assigned Repllicas)。所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。消息会先发送到leader,然后follower副本才能从leader中拉取消息进行同步,同步期间内follower副本相对于leader而言会有一定程度的滞后。前面所说的“一定程度”是指可以忍受的滞后范围,这个范围可以通过参数进行配置。与leader副本同步滞后过多的副本,组成OSR(Out-Sync Relipcas),由此可见:AR=ISR+OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR集合为空。
- 为什么kafka需要用到isr而不是像其他的分布式系统一样通过全局的机器的多数投票来保证主节点可用呢?
全局的多数投票策略:一般需要一半以上的机器存活来保证业务。要容忍n个机器错误,就需要2n+1个机器,宽带和存储存在浪费,适用于数据实时性和一致性要求高的场景(尤其是那些非大量存储的场景)。
Kafka使用isr的投票策略,只需要存在一定数量的isr机器就可以确保主节点选举和查询,
Elasticsearch也有类似的策略,不过一般设置成n+1为了防止脑裂的发生。
- 为什么kafka就不怕脑裂呢?
分布式选举一般有两种模式
- 有中心协调节点,
- 优点:选举结果统一,不会出现羊群效应和脑裂问题。
- 缺点:需要给中心节点做高可用。
- 无中心协调节点、比如:elasticsearch。
0.9版本之后的kafka是有中心协调节点。
- 那么在主节点和ISR中的所有副本集都宕机的时候我们怎么处理?
- 使用一个潜在的非一致性的副本
- 如果我们更期望是不可用状态而不是不一致状态时, 这个特性可以通过配置unclean.leader.election.enable来禁用。
消息发送
知道了整体架构之后,我们来看看分别看看消息发送和消息消费具体是怎么做的,为什么能够这么快
先看消息发送部分
这里大家就可以知道为什么消息发送能够有这么快的速度了,因为都是往文件尾部顺序的写入数据,复杂度为O(1)
- 文件顺序存储(partation内部有序、不保证全局有序),保证顺序读写的高效性。
- 生产者可以指定发送消息到具体分区。
- 该功能可以用来保证属于类别id消息的顺序性。
数据存储都有一个共性问题:数据写进去之后,什么时候可以被消费者读取到,这里面有个水位的概念
HW俗称高水位,是HighWatermark的缩写,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。
为什么要这样做呢?
这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。
消息消费
消息消费有三个问题要解决:
1、怎么能做到迅速的读取数据?
2、怎么知道要消费的分区?
3、怎么避免和同一个组的其他消费者消费同一个分区呢?
首先我们知道消息组件的一个很大的特点是及时消息消费,它所消费的消息都是顺序性消费,并且基本都是最新的数据,所以就能够用到操作系统的page cache、sendfile(zero-copy)(使消费者消费消息的速度达到网络连接的速度)
sendfile: 数据只被拷贝到页缓存一次, 并被多次消费, 而不是缓存到(用户空间的)内存中, 然后在每次消费时拷贝到系统内核空间中. 这可以使消费者消费消息的速度达到网络连接的速度
传统数据读取: 4次拷贝还有2次系统调用,1、操作系统从磁盘读取数据到系统内核空间的缓存页中。2、应用从内核空间读取数据到用户空间缓冲区中。3、应用从把数据写回到内核空间的socket缓冲区中。4、系统拷贝socket缓冲区的数据到网卡缓冲区, 然后由网卡发送数据到网络中。
那么消费者怎么知道他要消费的分区呢?怎么避免和同一个组的其他消费者消费同一个分区呢?
当消费者想要加入组时,它将向组协调器发送JoinGroup请求。加入该小组的第一个消费者成为小组负责人。负责人从组协调器接收组中所有消费者的列表(这将包括最近发送心跳并因此被认为是活动的所有消费者),并负责为每个消费者分配分区的子集。它使用PartitionAssignor的实现来决定哪个分区应由哪个使用者处理。
那么消费者知道他要消费的分区,怎么知道应该从哪一个偏移量去消费呢?
- 最后提交偏移量:消费者向服务器提交的已成功处理的消息的偏移量。
- 首次创建使用消费者组时,将根据auto.offset.reset配置设置所定义的策略来设置初始偏移量。
- 消费者崩溃后新的消费者从该位置开始消费。
- 日志结束偏移量:写入日志的最后一条消息的偏移量。
- 高水位标记:已成功复制到所有isr的最后一条消息的偏移量。消费者只能读到最高水印。
- 这样可以防止使用者读取未复制的数据,这些数据以后可能会丢失。
- 0.9版本之前偏移量是使用zookeeper管理消费组记录的。
- 0.9版本之后偏移量是使用kafka的brocker作为协调器管理消费组记录的。
分布式系统最核心的一个点就是节点的管控,尤其是节点的活性处理,那么kafka是怎么样进行失效检测的呢?
消息消费
1,一个分区只能被同一个消费组的一个消费者消费。
2,同一个分组的消费者的数量大于分区数量的时候,有消费者是空闲状态。
3,不同消费组的消费者可以消费同一个分区。
失效检测
由Kafka内置实现了失败检测和Rebalance(ZKRebalancerListener),但是它存在羊群效应和脑裂的问题,客户端代码实现低级API也不能解决这个问题。如果将失败探测和Rebalance的逻辑放到一个高可用的中心Coordinator,这两个问题即可解决。同时还可大大减少Zookeeper的负载,有利于Kafka Broker的扩展(Broker也会作为协调节点的角色存在)。
0.9版本将消费者的管理从zookeeper转到了kafka的brocker。
0.10.1版本将心跳的触发由poll转为单独的心跳线程。
- 调用poll()之间的最大延迟:max.poll.interval.ms,
- 控制拉取下来的记录的最大处理时间。
- 0.10.1版本之前,如果真实处理时间大于session.timeout.ms,则会触发服务端协调器对于该消费组中其他消费者与分区之间对应关系的重新平衡。
- 0.10.1版本之后,kafka引入了单独的心跳线程。