一. 整体架构
系统架构
topic:
一个topic对应多个队列,队列均匀的分散到broker上;
队列:
一个队列只能被一个消费者消费;
nameServer:
保存topic中队列与broker的映射关系,集群模式,节点之间毫无关系,会与所有的broker进行连接。
broker:
分布式架构,主从模式,主从保障单broker的高可用;写master,读可slaver。broker所有节点建立长连接。一个broker上的所有的topic共享一个commitlog文件,物理角度,再由topic下多个队列保存消息索引以及消费者消费的offset。
consumer:
与任一nameServer建立长连接,定期获取topic对应的队列列表信息(包含brokerName)和broker映射列表信息(brokerName以及主从地址),与broker的master或者slaver建立长连接,定期向broker发送心跳;从broker获取该topic下consumerGroup的机器列表(连接topic下任一broker的主或从,broker的信息是通过相互通信获取还是其他途径暂时没有做深入了解),有状态,任一消费者挂了,会重新负载,与broker重新建立连接。
消费消息时,会通过当前负载到的messageQueue查询brokerAddr(可能是master可能是slaver),建立连接获取消息(已经建立的连接,只有在broker不可用时断掉)。
producer:
与任一nameServer建立长连接,定期获取topic信息,与broker的master建立长连接,定时向master发布心跳,无状态。
存储架构
物理:
逻辑:
二.消费端架构与调度链路
如下为调度消费架构图:
调用链路:
- metaq在consumer启动的流程中,会初始化rebalance,拿到本机需要消费的队列信息(com.alibaba.rocketmq.client.impl.consumer.RebalancePushImpl#dispatchPullRequest);
- 随后会将队列(pullRequest)加入到一个阻塞队列中,一个唯一线程数的线程池从阻塞队列读取队列请求进行执行(com.alibaba.rocketmq.client.impl.consumer.PullMessageService)。
- 核心执行方法如下:com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
- com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
- 从队列中拉取到数据之后,会加入到processQueue,再加入执行线程池。这里加入到processQueue的目标之一是为了做缓存校验(缓存数据过多的情况下,先暂停拉取数据),因为后续加入的执行线程池是阻塞队列,其容量无限大。
processQueue的作用:
客户端拉取队列的数据都会在processQueue中进行维护(可以理解成broker在客户端的一个映射),消费完成之后移除,拉取之后都会先判断当前队列中数据大小(数量阈值1000,内容阈值100*1024*1024)是否超过设定值,超过则过一会再拉取,并且在消费者初始化时,还会起一个定时任务ConsumeMessageConcurrentlyService#start,每15分钟执行一次清理,将拉取到的15分钟还未消费的数据给清理掉。
这里有个问题:每15分钟清理一次,只清理了processQueue中的数据,线程池阻塞队列数据并没有清理,当15分钟过后再次拉取数据放入阻塞队列,阻塞队列会越不越大,从而导致内存溢出,这里已经确认是个bug,因此在消费rt过慢的情况下,需要考虑消费限速。
三.负载均衡
负载均衡时机
消费端负载均衡
在启动时,会拉取从nameServer拉取所有的breker和消费者信息,进行负载均衡计算;之后队列和消费者变更应该是nameServer主动通知所有消费者更新负载均衡;
注意:广播模式下,消费端不会进行负载均衡,会进行全部消费。
发送端负载均衡
发送端负载均衡是通过轮询所有的队列来均衡的进行消费发送。
DefaultMQProducerImpl#sendDefaultImpl -> MQFaultStrategy#selectOneMessageQueue
负载均衡策略(AllocateMessageQueueStrategy)
注意:可以通过diamond配置来选择策略。
默认:com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely
- AllocateMessageQueueAveragely 平均分配
即:按顺对每个消费者进行分配。举个例子:7个队列(abcdefg),5个消费者,则分配2(ab)、2(cd)、1(e)、1(f)、1(g);
部分源码如下:
Collections.sort(mqAll);
Collections.sort(cidAll);
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
- AllocateMessageQueueAveragelyByCircle 循环平均分配
即:轮询对每个消费者进行分配。举个例子:7个队列(abcdefg),5个消费者,则分配2(af)、2(bg)、1(c)、1(d)、1(e);
部分源码如下:
Collections.sort(mqAll);
Collections.sort(cidAll);
int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {
if (i % cidAll.size() == index) {
result.add(mqAll.get(i));
}
}
- AllocateMessageQueueByConfig 配置分配
略
- AllocateMessageQueueByMachineRoom 机房分配
略
- AllocateMessageQueueConsistentHash 一致性hash分配
略
四. 高可用/可靠性/一致性实现
高可用如何实现?
broker:
主从模式,任一master挂了,nameServer感知到了,会将该master从路由列表中移除,不会马上通知消费者和生产者。
生产者发送消息选择messageQueue时,会判断该queue的brokerName是否是可用的,不可用,则选择其他的。
消费者在拉取消息时,会实时查询当前messageQueue对应所在的brokerAddr。本地缓存Map<brokerName,Map<brokerId,address>>。如果pull失败,后续会重试,且consumer会定时更新本地的路由信息。
nameServer:
集群模式,任一节点和所有broker连接,互相直接不通信,无状态,任一down掉不影响。
producer:
集群模式,任一节点从nameServer获取队列发送,无状态,任一down掉不影响。
consumer:
有状态,任一consumer 泵机,停止像broker发送心跳后,broker会将该consumer从维护的topic对应的group中移除,消费者会重新负载均衡(这里触发机制暂时还没有作深入了解)。
一致性如何实现?
由于消息发送是到master,消费是可master和slaver,master和slaver的异步同步可能存在延迟,所以是保障最终一致性。
可靠性如何?
master断电泵机,可能会导致少量数据丢失,因为是异步刷盘(刷盘策略可配置),不是完全存入磁盘;
如果是磁盘损坏,该机器上所有数据丢失,但是异步同步的模式下,百分之99不会丢失,,需要强制不丢失的情况下,可以采用双写模式,即写入需要master和slaver都成功的场景下才返回成功,但是会影响性能。
集群消费失败可以重试,但是广播模式下不可重试。
五. 消费核心逻辑
- 设置消费线程数;
- 设置拉取间隔(pullInterval)和批次拉取数量(pullBatchSize)
- 自定义实现;
六. 消费吞吐
通过消费者端维护processQueue(本地映射),批拉取数据,提高吞吐。
七. 消息的顺序性?
broker端所有的队列存在一个commitlog里面,顺序写入。随机读取
单队列消费和写入都是顺序操作的。如果要保证顺序性,需要保障只有一个队列。即可做到先进先出。
八. 消费端核心链路——源码跟踪
- 启动逻辑
RebalanceService.run->mqClientInstance.doRebalance->DefaultMQPushConsumerImpl.doRebalance->rebalanceImpl.doRebalance->rebalanceImpl.rebalanceByTopic->rebalanceImpl.updateProcessQueueTableInRebalance[新增topic拉取消息的触发点,对于已有topic如果没有发生MessageQueue变化不会触发拉取操作]->rebalanceImpl.computePullFromWhere->rebalanceImpl.dispatchPullRequest->defaultMQPushConsumerImpl.executePullRequestImmediately->PullMessageService.executePullRequestImmediately。
- 消费逻辑:
pullRequestQueue.take()->PullMessageService.pullMessage->DefaultMQPushConsumerImpl.pullMessage->PullCallback.onSuccess->ConsumeMessageConcurrentlyService.submitConsumeRequest->consumeExecutor.submit->ConsumeRequest.run->listener.consumeMessage[业务代码]->ConsumeMessageConcurrentlyService.processConsumeResult
- 位点维护逻辑
1. 启动加载
RebalanceService.run->mqClientInstance.doRebalance->DefaultMQPushConsumerImpl.doRebalance->rebalanceImpl.doRebalance->rebalanceImpl.rebalanceByTopic->rebalanceImpl.updateProcessQueueTableInRebalance[新增topic拉取消息的触发点,对于已有topic如果没有发生MessageQueue变化不会触发拉取操作]->rebalanceImpl.computePullFromWhere->offsetStore.readOffset
2. 定时更新
MQClientInstance.persistAllConsumerOffset->offsetStore.persistAll->RemoteBrokerOffsetStore.updateConsumeOffsetToBroker