RocketMQ 消费自动负载均衡机制
1. 背景介绍
RocketMQ是一款具备低延时、高性能的消息队列产品。 通过namesrv这个功能简单的组件,将松散的不同broker连起来,组成一个分布式的消息队列集群。本文是基于4.9.4版本的RocketMQ进行分析。
1.1. 消费模式
RocketMQ有两种消费模式:集群消费和广播消费
- 集群消费模式: 一个消息只被订阅组内的一个消费者实例消费。不同消费者实例互相分摊消息,实现消费负载均衡
- 广播消费模式: 一个消费会被订阅组内的所有消费者实例消费。
要实现消费的自动负载均衡,只能使用集群消费模式,也是强烈推荐使用的消费模式。
1.2 队列(queue)
消息都存放在主题(topic)上。而一个主题可以有多个队列(queue)。同一个主题,可以分布在不同的broker上。
所以主题的队列数,是主题在不同broker上的队列数总和。
broker往namesrv上注册所有主题的信息。这样客户端通过namesrv可以看到主题的全貌。
而队列的主要作用是解决客户端的并发消费。
2. 消费负载均衡的功能介绍
单个消费进程是有性能的上限的。不同的业务场景,消耗的时间不同。短则毫秒级,长则可能几秒甚至几分钟。 为了提高消费的性能,需要可以并行去消费消息,而且并行消费期间,消费者实例的消费进度和运行情况应该不能互相干扰。
为了解决这个问题,需要实现消费端的自动负载均衡。 上一节有提到,队列正是解决这个问题的关键。
下面的JSON是订阅组sub_1保存topic_1主题的消费进度的文件保存信息。可见,消费进度,其实正是具体每个队列的消息位置(offset)
- rocketmq内部使用JSON格式保存所有的消费进度,一般是放在数据存储目录下config/consumerOffset.json 文件里面
"topic_1@sub_1":{0:181,1:178,2:176,3:183,4:184,5:182,6:177,7:179}
为了不互相干扰也可以并发去消费,客户端需要分配queue的订阅,让每个客户端都去订阅不同的queue,没有重叠,也没有漏了。 达到如下的效果:
- 客户端的数量比队列数多的时候。超出的客户端会无法订阅、消费任何消息。直到正在订阅的客户端停止。
3. 消费负载均衡的原理分析
结合客户端的源代码进行。
其中核心代码如下:
1. RebalanceService.java: 客户端自带的服务,默认每20秒进行一次doRebalance();
2. RebalanceImpl.java: 针对每个订阅组的每个主题,进行重新负载均衡
3. MQClientInstance.java: 客户端对象,默认每30秒从namesrv更新主题的路由信息。
4. AllocateMessageQueueAveragely.java: 负载均衡策略
核心的算法如下:
1. 客户端启动的时候,会从namesrv获取订阅主题的路由信息。后续默认每30秒更新一次
2. 客户端默认每20秒进行重新负载均衡计算。
2.1. 消费的负载均衡计算,本质是将每个订阅组里面的每个主题都进行负载计算
2.2. 单个订阅组的主题的负载计算方法如下: 基于步骤1得到主题的所有队列信息, 以及从服务端获取所有客户端ID列表 。 将两者排序,确保每个客户端都得到相同的结果。 再基于负载均衡策略进行计算。
2.3 负载均衡的策略:默认使用 AllocateMessageQueueAveragely策略,也即是上面例子的介绍按顺序平均分配队列的方式。
* 可以根据需要设置不同的策略,比如一致性HASH等。但要确保每个客户端都要有相同的配置。
下面的相关的代码。
RebalanceService.java:
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return RebalanceService.class.getSimpleName();
}
}
RebalanceImpl.doRebalance(): 对所有订阅组订阅的所有TOPIC进行重新负载计算
RebalanceImpl.rebalanceByTopic(): 对具体每个topic进行重新负载计算
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
MQClientInstance.startScheduledTask():启动一系列的定时任务。其中包括默认每30秒从namesrv获取最新的主题路由信息
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
AllocateMessageQueueAveragely.allocate(): 分配消费负载
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
4. 默认消费的负载均衡的机制分析和问题
分布式消息RocketMQ采取了一种比较松散的架构。 通过namesrv把多个独立运行broker信息汇总在一起。而客户端进行负载均衡的时候,也是独立计算和运行的。 服务端实际没有参与其中。
优点是充分利用了分布式的特征,可以非常线性的增加性能,可以支撑一个非常大规模的集群和客户端数量也没问题。
缺点也有如下几个:
a. 负载均衡的计算并不是同时进行,也没有统一进行分配,都是客户端自行独立计算。因而很容易出现的队列分配重叠情况。
b. 不能马上感知到集群的消费者实例或主题的变化,存在一定的延时和滞后。实际负载是经过一定时间最终达到负载均衡分配而已。
c. 消费者的消费进度是客户端定时上传更新的。对新加入的消费者实例而言,自行计算到分配的队列后,从broker上获取的消费进度,可能也有一定的延时。
d. 消费者实例比队列数多的情况,肯定存在消费者实例无法订阅消费任何消息的情况。为了缓解这个问题,可以一开始创建更多的队列数。 但队列数多了也会占用更多的资源,对性能会有一定的影响。
f. 存在单个队列消息量过多,消费无法负载分摊的问题。
以上的问题a,c都会导致消费重复的问题。 因而要正确的使用RocketMQ,需要确保业务侧实现消息的幂等性处理。
5. 展望未来
既然发现了这么多问题,那是否有一个从根本上解决上述问题的方法或思路呢。 答案在RocketMQ 5.0 引入的新消费接口上,把消费负载分配的逻辑都放到了broker上。有兴趣的可以自行查看RIP 19的描述。