searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享

Kafka如何保证消息顺序性的实际应用

2023-11-03 06:27:33
14
0
  • kafka的设计

Kafka将消息以topic为单位进行归纳将向Kafka topic发布消息的程序成为producers,将预订topics并消费消息的程序成为consumer,Kafka通常以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker,broker中分不同的topic,每个topic有不同的partition。Kafka 支持多个消费者从一个单独的消息流中读取数据,并且消费者之间互不影响。这与其他队列系统不同,其他队列系统一旦被客户端读取,其他客户端就不能再读取它。并且多个消费者可以组成一个消费者组,他们共享一个消息流,并保证消费者组对每个给定的消息只消费一次。

 

  • kafka如何保证消息的顺序性

首先kafka默认存储和消费消息是不保证顺序性的,因为存储在同一个topic下的数据可能在不同的分区中,每个分区都有一个按照顺序的offset,消费者关联了多个分区,每个分区维护了自己的offset,故不能保证消息的顺序性。

如上所述,消费存储在不同partition不能保证,可以只提供一个分区,或者相同的业务只在一个分区进行存储和消费,一个分区的偏移量是顺序性的 ,存储在一个partition就可以保证producer的消息在kafka中存储就是顺序的;

所以,producer端如下提供两种方式:

  • producer发送消息时指定分区号

设置分区参数,把有顺序要求的消息放在同一个分区即可

kafkaTemplate.send("cnc.vpc.port", 0, "<network-id>", "<value>");

  • 相同的业务key

key决定消息存储在哪个分区,默认情况下,Kafka会计算key的hashcode值,根据hash值找到不同分区进行存储。如果要求有顺序性,可以设置同一个key,同一个key的hash值是一样的,最终会存储到同一个分区中。

kafkaTemplate.send("cnc.vpc.port", "<network-id>", "<value>");

如我们创建的port业务是vpc粒度的,不同的业务不同的topic,我们发送kafka消息时提供network_id的key,还有在消息header中增加timestamp时间戳,保证当前业务在分区的顺序是可以保证的。

 

对于消费端而言,也需要分情况讨论。
1、如果是单线程消费,那么所消费到的所有消息都是顺序的,不需要做什么额外的处理,但是这种消费方式往往消费速率跟不上,导致消息积压。
2、如果是多线程消费,比如经典的拿到消息之后丢入线程池,这种方式呢显然就无法保证消息的有序性了。那么我们就需要思考一下如何让一个分区的消息只被一个线程消费呢?一种简单的实现方式就是使用内存队列 —— 将消费出来的消息,根据一些策略丢入对应的内存队列,队列的下游再用单线程的方式从队列中拉取数据进行消费。最基本的策略有:

基于 record.partition() 拿到对应的分区 ID,然后丢入对应的内存队列。

基于 record.key() 拿到上游设置的 key,进行 hash 后丢入对应的内存队列。

0条评论
0 / 1000
taketomoon
2文章数
0粉丝数
taketomoon
2 文章 | 0 粉丝
taketomoon
2文章数
0粉丝数
taketomoon
2 文章 | 0 粉丝

Kafka如何保证消息顺序性的实际应用

2023-11-03 06:27:33
14
0
  • kafka的设计

Kafka将消息以topic为单位进行归纳将向Kafka topic发布消息的程序成为producers,将预订topics并消费消息的程序成为consumer,Kafka通常以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker,broker中分不同的topic,每个topic有不同的partition。Kafka 支持多个消费者从一个单独的消息流中读取数据,并且消费者之间互不影响。这与其他队列系统不同,其他队列系统一旦被客户端读取,其他客户端就不能再读取它。并且多个消费者可以组成一个消费者组,他们共享一个消息流,并保证消费者组对每个给定的消息只消费一次。

 

  • kafka如何保证消息的顺序性

首先kafka默认存储和消费消息是不保证顺序性的,因为存储在同一个topic下的数据可能在不同的分区中,每个分区都有一个按照顺序的offset,消费者关联了多个分区,每个分区维护了自己的offset,故不能保证消息的顺序性。

如上所述,消费存储在不同partition不能保证,可以只提供一个分区,或者相同的业务只在一个分区进行存储和消费,一个分区的偏移量是顺序性的 ,存储在一个partition就可以保证producer的消息在kafka中存储就是顺序的;

所以,producer端如下提供两种方式:

  • producer发送消息时指定分区号

设置分区参数,把有顺序要求的消息放在同一个分区即可

kafkaTemplate.send("cnc.vpc.port", 0, "<network-id>", "<value>");

  • 相同的业务key

key决定消息存储在哪个分区,默认情况下,Kafka会计算key的hashcode值,根据hash值找到不同分区进行存储。如果要求有顺序性,可以设置同一个key,同一个key的hash值是一样的,最终会存储到同一个分区中。

kafkaTemplate.send("cnc.vpc.port", "<network-id>", "<value>");

如我们创建的port业务是vpc粒度的,不同的业务不同的topic,我们发送kafka消息时提供network_id的key,还有在消息header中增加timestamp时间戳,保证当前业务在分区的顺序是可以保证的。

 

对于消费端而言,也需要分情况讨论。
1、如果是单线程消费,那么所消费到的所有消息都是顺序的,不需要做什么额外的处理,但是这种消费方式往往消费速率跟不上,导致消息积压。
2、如果是多线程消费,比如经典的拿到消息之后丢入线程池,这种方式呢显然就无法保证消息的有序性了。那么我们就需要思考一下如何让一个分区的消息只被一个线程消费呢?一种简单的实现方式就是使用内存队列 —— 将消费出来的消息,根据一些策略丢入对应的内存队列,队列的下游再用单线程的方式从队列中拉取数据进行消费。最基本的策略有:

基于 record.partition() 拿到对应的分区 ID,然后丢入对应的内存队列。

基于 record.key() 拿到上游设置的 key,进行 hash 后丢入对应的内存队列。

文章来自个人专栏
kafka消息顺序性实际应用
1 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0