数据在RocketMQ中是分布式存储的。生产者发送消息时,先从NameServer获取到路由信息,然后根据一定算法将消息发送到某个Master-Broker中。但是,Topic是一个逻辑概念,对于某个Topic来说,属于它的消息分布在不同的Broker上,那如何决定消息到底分布在哪个Broker上呢?现在我们先了解一下MessageQueue、消息发送和消息存储。
一、MessageQueue
我们在创建Topic时,需要指定一个很关键的参数——MessageQueue,比如下图,我们可以在RocketMQ的可视化工作台里去创建一个名为order_topic
的Topic,指定包含4个MessageQueue:
那么Topic、MessageQueue、Broker之间到底是什么关系?
可以先简单的认为:Topic是消息的逻辑分类,消息保存在MessageQueue中,MessageQueue分布在Master-Broker上,Slave-Broker从Master-Broker同步数据。
事实上,MessageQueue本质就是一个数据分片的机制。比如order_topic一共有1万条消息,那么可以大致认为每个MessageQueue保存2500条消息。但是,这不是绝对的,需要根据Producer写入消息的策略来定,可能有的MessageQueue中消息多些,有的少些。我们先暂且认为消息是在MessageQueue上平均分配的,然后MessageQueue也可能平均分布在Master-Broker上,如下图:
二、消息发送
2.1 消息发送策略
我们之前说过,Producer会从NameServer拉取路由信息,那么Producer就知道了Broker集群的情况:每个Master-Broker分布着哪些Topic?每个Topic有多少MessageQueue?这些MessageQueue分布在哪些Master-Broker上?
这样的话,Producer就可以按照一定策略将消息写入到Master-Broker中,比如对于order_topic
,Producer现在知道了它有4个MessageQueue,均匀分布在两个Master-Broker上,那么Producer就可能采取Round Robin策略均匀的order_topic类型的消息写入到各个MessageQueue中。
2.2 消息发送容错
那如果某个Master-Broker故障了怎么办?RocketMQ虽然有Dledger机制可以实现故障自动转移,但是主从切换需要时间,在故障的这段时间里Producer访问那个故障的Master-Broker都会失败:
RocketMQ的Producer中有个开关,叫做sendLatencyFaultEnable
,一旦打开了这个开关,就会开启生产者的自动容错机制,比如Producer访问一个Master-Broker发现网络延迟有500ms,之后一直延迟,那么就会自动回避访问这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker了。
这样就可以避免一个Master-Broker故障后,短时间内Producer频繁的发送消息到这个故障的Broker上去,出现大量写入异常。
三、消息存储
RocketMQ中最核心的一个环节就是Broker中的消息数据存储,也就是所谓的消息持久化。
3.1 CommitLog
生产者发送消息到Broker后,Master-Broker会将消息写入磁盘上的一个日志文件——CommitLog,按照顺序写入文件末尾,CommitLog中包含了各种各样不通类型的Topic对应的消息内容,如下图:
CommitLog文件每个限定最大1GB,Master-Broker收到消息之后就将内容直接追加到文件末尾,如果一个CommitLog写满了1GB,就会创建一个新的CommitLog文件。
Broker以顺序的方式将消息写入CommitLog磁盘文件,也就是每次写入就是在文件末尾追加一条数据就可以了,对文件进行顺序写的性能要比随机写的性能高很多。
3.2 ConsumeQueue
我们之前说过,消息是保存在MessageQueue中的,那这个CommitLog和MessageQueue是什么关系呢?事实上,对于每一个Topic,它在某个Broker所在的机器上都会有一些MessageQueue,每一个MessageQueue又会有很多ConsumeQueue文件,这些ConsumeQueue文件里存储的是一条消息对应在CommitLog文件中的offset偏移量。
举个例子,假设对于order_topic这个Topic,它在Broker集群中一共有4个MessageQueue:queue1、queue2、queue3、queue3,均匀分布在两个Master-Broker中,Producer选择queue1这个MessageQueue发送了一条“消息A”。那么:
- 首先Master-Broker接收到消息A后,将其内容顺序写入自己机器上的CommitLog文件末尾;
- 然后,这个Master-Broker会将消息A在CommitLog文件中的物理位置——offset,写入queue1对应的ConsumeQueue文件末尾;
整个过程如下图所示:
实际上,ConsumeQueue中存储的不仅仅只是消息在CommitLog中的offset偏移量,还包含了消息长度、tag hashcode等信息,一条数据是20个字节,每个ConsumeQueue文件能保存30万条数据,所以每个ConsumeQueue文件的大小约为5.72MB。
我们可以在Master-Broker所在机器的如下目录中找到ConsumeQueue文件:
- $HOME/store/consumequeue/{topic}/{queueId}/{fileName}
上述示例中,我们的Tpoic是order_topic
,queueId是queue1,comsumequeue的名称是comsumequeue1,所以可以在如下路径找到ConsumeQueue文件:
- $HOME/store/consumequeue/order_topic/queue1/comsumequeue1