本文主要介绍消息队列 Kafka 发布者的最佳实践,从而帮助您更好的使用该产品。
文中的最佳实践基于消息队列 Kafka 的 Java 客户端;对于其它语言的客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。
Kafka 发送示例代码片段
Key 和 Value
Kafka 0.10.0.0 的消息字段只有两个:Key 和 Value。Key 是消息的标识,Value 即消息内容。为了便于追踪,重要消息最好都设置一个唯一的 Key。通过 Key 追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。
失败重试
在分布式环境下,由于网络等原因,偶尔的发送失败是常见的。导致这种失败的原因有可能是消息已经发送成功,但是 Ack 失败,也有可能是确实没发送成功。
消息队列 Kafka 是 VIP 网络架构,会主动掐掉空闲连接(30 秒没活动),也就是说,不是一直活跃的客户端会经常收到 “connection rest by peer” 这样的错误,因此建议都考虑重试消息发送。
异步发送
发送接口是异步的;如果你想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。
线程安全
Producer 是线程安全的,且可以往任何 Topic 发送消息。通常情况下,一个应用对应一个 Producer 就足够了。
Acks
Acks的说明如下:
- acks=0,表示无需服务端的 Response,性能较高,丢数据风险较大;
- acks=1,服务端主节点写成功即返回 Response,性能中等,丢数据风险中等,主节点宕机可能导致数据丢失;
- acks=all,服务端主节点写成功,且备节点同步成功,才返回 Response,性能较差,数据较为安全,主节点和备节点都宕机才会导致数据丢失。
一般建议选择 acks=1,重要的服务可以设置 acks=all。
Batch
Batch 的基本思路是:把消息缓存在内存中,并进行打包发送。Kafka 通过 Batch 来提高吞吐,但同时也会增加延迟,生产时应该对两者予以权衡。
在构建 Producer 时,需要考虑以下两个参数:
- batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)达到这个数值时,就会触发一次网络请求,然后客户端把消息真正发往服务器;
- linger.ms : 每条消息待在缓存中的最长时间。若超过这个时间,就会忽略 batch.size 的限制,然后客户端立即把消息发往服务器。
由此可见,Kafka 客户端什么时候把消息真正发往服务器,是通过上面两个参数共同决定的。batch.size 有助于提高吞吐,linger.ms有助于控制延迟。您可以根据具体业务需求进行调整。
OOM
结合 Kafka 的 Batch 设计思路,Kafka 会缓存消息并打包发送,如果缓存太多,则有可能造成 OOM(Out of Memory)。
- buffer.memory : 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略 batch.size 和 linger.ms 的限制。
- buffer.memory 的默认数值是 32 MB,对于单个 Producer 来说,可以保证足够的性能。需要注意的是,如果你在同一个 JVM 中启动多个 Producer,那么每个 Producer 都有可能占用 32 MB 缓存空间,此时便有可能触发 OOM。
在生产时,一般没有必要启动多个 Producer;如果特殊情况需要,则需要考虑buffer.memory的大小,避免触发 OOM。