1. 体系结构概述
Kafka主题用于对记录进行组织。记录由生产者生成,由消费者使用。生产者将记录发送到 Kafka 代理,后者存储数据。
主题跨代理对记录进行分区。在使用记录时,每个分区最多可使用一个消费者来实现数据并行处理。
复制用于在节点之间复制分区。这可以防止节点(代理)发生服务中断。将副本组之间的单个分区指定为分区领导者。生产者流量将根据由ZooKeeper管理的状态路由到每个节点的领导者。
2. 调优方案选择
Kafka性能体现在两个主要方面:吞吐量和延迟。吞吐量是指数据的最大处理速率,通常吞吐量越高越好。延迟是指存储或检索数据所花费的时间。通常,延迟越低越好。在吞吐量、延迟和应用基础结构的开销方面找到适当的平衡可能会有难度。
根据追求的是高吞吐量、低延迟还是此两者,性能要求可能符合以下三种常见情况中的一种:
- 高吞吐量,低延迟。此方案要求同时满足高吞吐量和低延迟(大约100毫秒)。服务可用性监视就是这种应用场景的一个例子。
- 高吞吐量,高延迟。此方案要求满足高吞吐量(大约1.5 GBps),但可以容许较高的延迟(< 250 毫秒)。这种应用场景的一个例子是引入遥测数据进行近实时的处理,例如安全与入侵检测应用程序。
- 低吞吐量,低延迟。此方案要求提供低延迟(< 10毫秒)以完成实时处理,但可以容许较低的吞吐量。在线拼写和语法检查就是这种应用场景的一个例子。
3. 生产者配置优化
以下部分重点介绍一些用于优化 Kafka 生产者性能的最重要通用配置属性。
批大小
Kafka生产者将作为一个单元发送的消息组(称为批)汇编到一起,以将其存储在单个存储分区中。批大小表示在传输该组之前必须达到的字节数。增大 batch.size 参数可以提高吞吐量,因为这可以降低网络和 IO 请求的处理开销。负载较轻时,增大批大小可能会增大 Kafka 发送延迟,因为生产者需要等待某个批准备就绪。负载较重时,建议增大批大小以改善吞吐量和延迟。
生产者确认参数
生产者所需的acks 配置确定在将某个写入请求视为已完成之前,分区领先者所需的确认数目。此设置会影响数据可靠性,其值为 0、1 或 -1。值 -1 表示必须收到所有副本的确认,才能将写入请求视为已完成。设置 acks = -1 能够更可靠地保证数据不会丢失,但同时也会导致延迟增大,吞吐量降低。如果应用场景要求提供较高的吞吐量,请尝试设置 acks = 0 或 acks = 1。请记住,不确认所有副本可能会降低数据可靠性。
压缩
可将Kafka生产者配置为先压缩消息,然后再将消息发送到代理。compression.type 设置指定要使用的压缩编解码器。支持的压缩编解码器为“gzip”、“snappy”和“lz4”。如果磁盘容量存在限制,则压缩是有利的做法,应予以考虑。
在 gzip 和 snappy 这两个常用的压缩编解码器中,gzip 的压缩率更高,它可以降低磁盘用量,但代价是使 CPU 负载升高。snappy 编解码器的压缩率更低,但造成的 CPU 开销更低。可以根据代理磁盘或生产者的 CPU 限制来决定使用哪个编解码器。gzip 数据压缩率比 snappy 高 5 倍。
使用数据压缩会增加磁盘中可存储的记录数。如果生产者与代理使用的压缩格式不匹配,则数据压缩也会增大CPU开销。因为数据在发送之前必须经过压缩,并在处理之前经过解压缩。
4. 代理设置
以下部分重点介绍一些用于优化 Kafka 代理性能的最重要设置。
磁盘数目
存储磁盘的 IOPS(每秒输入/输出操作次数)和每秒读/写字节数有限制。创建新分区时,Kafka 会将每个新分区存储在现有分区最少的磁盘上,以便在可用磁盘之间平衡分区的数目。尽管有存储策略进行调节,但在处理每个磁盘上的数百个分区副本时,Kafka 很容易就会使可用磁盘吞吐量达到饱和。此时,需要在吞吐量与成本之间进行取舍。如果应用场景需要更大的吞吐量,请创建一个可为每个代理提供更多托管磁盘的群集。
主题和分区的数目
Kafka生产者将写入主题。Kafka 消费者读取主题。主题与日志相关联,该日志是磁盘上的数据结构。Kafka 将生产者中的记录追加到主题日志的末尾。主题日志包括分散在多个文件之间的多个分区。而这些文件又分散在多个 Kafka 群集节点之间。消费者可以按照自己的节奏从 Kafka 主题中读取内容,并可以在主题日志中选择自己的位置(偏移)。
每个 Kafka 分区是在系统上的一个日志文件,生产者线程可以同时写入到多个日志。同样,由于每个消费者线程从一个分区读取消息,因此也能并行处理从多个分区使用消息的操作。
提高分区密度(每个代理的分区数)会增大与元数据操作以及每个分区领先者及其后继者之间的分区请求/响应相关的开销。即使不存在流动的数据,分区副本也仍会从领先者提取数据,导致需要通过网络额外处理发送和接收请求。
对于 HDInsight 中的 Apache Kafka 群集 2.1 和 2.4 以及更高版本,我们建议最多为每个代理提供 2000 个分区(包括副本)。增加每个代理的分区数会降低吞吐量,并可能导致主题不可用。
副本数
较高的复制因子会导致分区领先者与后继者之间的请求数增加。因而,较高的复制因子会消耗更多的磁盘和CPU来处理额外的请求,并增大写入延迟,降低吞吐量。
我们建议对Azure HDInsight 中的 Kafka 至少使用 3 倍的复制因子。大部分 Azure 区域有三个容错域。在只有两个容错域的区域中,用户应使用 4 倍复制因子。
5. 消费者配置
以下部分重点介绍一些用于优化 Kafka 消费者性能的最重要通用配置属性。
消费者数量
良好的做法是让分区数量等于消费者数量。如果消费者数量小于分区数量,则少数消费者将从多个分区读取数据,从而增大消费者延迟。
如果消费者数量大于分区数量,则会浪费消费者资源,因为这些消费者将处于空闲状态。
避免频繁进行消费者重新平衡
发生分区所有权更改(即消费者横向扩展或纵向缩减)、中介崩溃(因为中介是消费者组的组协调者)、消费者崩溃、添加新主题或添加新分区时,会触发消费者重新平衡。在重新平衡期间,消费者无法使用,因此会增大延迟。
如果消费者可以在session.timeout.ms 内向中介发送检测信号,则认为消费者是存活的。否则,认为消费者已消亡或有故障。这会导致消费者重新平衡。消费者的 session.timeout.ms 越低,我们检测到这些故障的速度就越快。
如果session.timeout.ms 太低,消费者可能会由于某些情况(例如,处理一批消息花费了较长时间,或 JVM GC 暂停时间太长)而遇到重复的不必要重新平衡。如果你的某个消费者花费了过多时间处理消息,你可以通过以下方法解决此问题:使用 max.poll.interval.ms 提高消费者在获取更多记录之前可以保持空闲状态的时长上限,或使用配置参数 max.poll.records 减小返回的最大批大小。
批处理
与生产者一样,可为消费者添加批处理。可以通过更改配置fetch.min.bytes,来配置消费者在每个提取请求中可获取的数据量。该参数定义了消费者提取响应中预期的最小字节数。增大此值会减少对中介发出的提取请求数量,从而降低额外的开销。默认情况下,此值为 1。类似地,还有另一个配置 fetch.max.wait.ms。如果提取请求没有足够的消息(根据 fetch.min.bytes 的大小),它将等待基于配置 fetch.max.wait.ms 的等待时间过期。
注意在少数情况下,当消费者无法处理消息时,它看起来就很缓慢。如果在发生异常后你不提交偏移量,消费者将停滞在无限循环中的特定偏移量而不会前进,从而增大消费者端的滞后时间。
6. Linux OS 优化
以下部分重点介绍一些用于优化linux OS 的最重要通用配置属性。
内存映射
vm.max_map_count 定义了进程可以拥有的最大mmap 数量。默认情况下,在 HDInsight Apache Kafka 群集 Linux VM 上,该值为 65535。
在Kafka 中,每个日志段需要一对 index/timeindex 文件,其中每个文件消耗 1 个 mmap。也就是说,每个日志段使用 2 个 mmap。因此,如果每个分区托管一个日志段,则该分区至少需要 2 个 mmap。每个分区的日志段数取决于段大小、负载强度、保留策略和滚动期,该数量通常大于 1。
Mmap value = 2 ((partition size)/(segment size))*(partitions)
如果所需的mmap 值超过 vm.max_map_count,中介将引发“映射失败”异常。
为避免此异常,请使用以下命令检查VM中 mmap 的大小,并根据需要在每个工作器节点上增大该大小。
# command to find number of index files:
find . -name '*index' | wc -l
# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l
# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>
# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p
注意请注意不要将此值设置得过高,因为这会占用 VM 上的内存。通过设置 MaxDirectMemory 来确定 JVM 可以在内存映射中使用的内存量。默认值为 64MB。有可能会达到此限制。可以通过 Ambari 将 -XX:MaxDirectMemorySize=amount of memory used 添加到 JVM 设置来增大此值。需要知道节点上正在使用的内存量,以及是否有足够的可用 RAM 来支持增大此值。