1 概述
RocketMQ服务端通过CommitLog存储消息数据,并使用ConsumeQueue来保存每条消息在CommitLog中的物理偏移量。然而通过对ConsumeQueue和CommitLog整体遍历寻找消息的方式无疑非常的低效。因此,RocketMQ设计IndexFile支持通过唯一Key和时间范围来快速查询检索消息。
2 IndexFile设计原理
2.1 IndexFile文件结构
图1 IndexFile文件结构
- Index Header结构各字段说明:
字段 |
说明 |
beginTimestamp |
第一个索引消息落在Broker的时间戳 |
endTimestamp |
最后一个索引消息落在Broker的时间戳 |
beginPhyOffset |
第一个索引消息在commitlog的偏移量 |
endPhyOffset |
最后一个索引消息在commitlog的偏移量 |
hashSlotCount |
构建索引占用的槽位数 |
indexCount |
构建的索引个数 |
- SlotTable结构
SlotTable里面的每一项保存的是这个topic-key是第几个索引。根据topic-key的Hash值除以500W取余得到这个Slot Table的序号,然后将此索引的顺序个数存入SlotTable中。
SlotTable的绝对位置absSlotPos的计算公式如下:40+keyHash%(500W)*4
- Index Linked List的字段说明:
字段 |
说明 |
keyHash |
topic-key(key是消息的key)的Hash值 |
phyOffset |
commitLog真实的物理位移 |
timeOffset |
时间位移,消息的存储时间与Index Header中beginTimestamp的时间差 |
slotValue |
当topic-key(key是消息的key)的Hash值取500W的余之后得到的Slot Table的slot位置中已经有值了(即Hash值取余后在Slot Table中有冲突时),则会用最新的Index值覆盖,并且将上一个值写入最新Index的slotValue中,从而形成了一个链表的结构。 |
Index Linked List的位置absIndexPos的计算公式: 40+ 500W*4+index的顺序数*40
2.2 构建Index索引
服务端在处理消息生成索引时,调用putKey方法创建索引,该方法中的入参key为topic-key值;phyOffset为物理偏移量。
1.首先根据key的Hash值计算出absSlotPos值;
2.根据absSlotPos值作为index文件的读取开始偏移量读取4个字节的值,即为了避免KEY值的hash冲突,将之前的key值的索引顺序数给冲突了,故先从slot Table中的取当前存储的索引顺序数,若该值小于零或者大于当前的索引总数(IndexHeader的indexCount值)则视为无效,即置为0;否则取出该位置的值,放入当前写入索引消息的Index Linked的slotValue字段中;
3.计算当前存时间距离第一个索引消息落在Broker的时间戳beginTimestamp的差值,放入当前写入索引消息的Index Linked的timeOffset字段中;
4.计算absIndexPos值,然后根据数据结构上值写入Index Linked中;
5.将索引总数写入slot Table的absSlotPos位置;
6.若为第一个索引,则更新IndexHeader的beginTimestamp和beginPhyOffset字段;
7.更新IndexHeader的endTimestamp和endPhyOffset字段;
8.将IndexHeader的hashSlotCount和indexCount字段值加1。
2.3 通过Topic-Key查询某个时间范围内消息列表
服务端首先要检查开始时间、结束时间是否落在该Index文件中,比较IndexHeader的beginTimestamp和EndTimestamp,如果开始时间begin和结束时间end有一部分落在了Index内,则返回对应结果。
服务端Index的主要查找逻辑是由selectPhyOffset方法实现。参数key值是topic-key(消息的key)的值。具体的查找物理偏离量列表逻辑和步骤如下:
1. 计算key值的hashcode值;然后除以500W取余,得slotPos值;
2. 计算absSlotPos=40+slotPos*4;然后从index中以absSlotPos偏移量读取4个字节的整数值,即为该索引的顺序数index;
3. 计算absIndexPos=40+ 500W*4+index的顺序数*20;
4. 以absIndexPos为开始偏移量从index中读取后面20个字节的消息单元数据。
5. 检查读取到的数据中keyHash值是否等于请求参数key值的hash值,存储时间是否在请求时间范围内,若是在存入物理偏移量列表中;
6. 然后用读取数据中的slotValue值重新计算absIndexPos;并重新第4/5/6步的操作。这就是说在此次该key值时,Hash值有冲突,在Index Linked List中形成了链表,该链表是由slotValue值连接各个消息单元的。
2.4 IndexService服务
IndexService是索引的主处理线程,在启动Broker时启动该线程服务。该服务主要有两个功能:
1)定时的创建消息的索引;
2)为应用提供访问index索引文件的接口。
图2 创建索引主流程
- 创建消息的索引过程分析
在将消息写入commitlog中之后,会创建DispatchRequest请求,并将请求放入IndexService.requestQueue队列中,由IndexService线程每隔3秒检测该队列中的请求信息。若存在请求信息,则执行构建Index。具体逻辑如下:
首先,获取Index文件的对象IndexFile。
1)从IndexFile列表中获取最后一个IndexFile对象;若该对象对应的Index文件没有写满,即IndexHeader的indexCount不大于2000W;则直接返回该对象;
2)若获得的该对象为空或者已经写满,则创建新的IndexFile对象,即新的Index文件,若是因为写满了而创建,则在创建新Index文件时将该写满的Index文件的endPhyOffset和endTimestamp值初始化给新Index文件中IndexHeader的beginPhyOffset和beginTimestamp。
3)启一个线程,调用IndexFile对象的fush将上一个写满的Index文件持久化到磁盘物理文件中;然后更新StoreCheckpoint.IndexMsgTimestamp为该写满的Index文件中IndexHeader的endTimestamp;
其次,遍历requestQueue队列中的请求消息。将每个请求消息的commitlogOffset值与获取的IndexFile文件的endPhyOffset进行比较,若小于endPhyOffset值,则直接忽略该条请求信息;对于消息类型为Prepared和RollBack的也直接忽略掉。
最后,对于一个topic可以有多个key值,每个key值以空格分隔,遍历每个key值,将topic-key值作为putKey方法的入参key值,将该topic的物理偏移量存入Index文件中,若存入失败则再次获取IndexFile对象重复调用putKey方法。
- 查找topic-key的物理偏移量offset
执行queryOffset方法进行查询,每个topic和key值最多获取的物理偏移量不得超过64个。该方法查找的物理偏移量列表被封装在QueryOffsetResult对象中。在该方法中,从IndexFile列表的最后一个对象开始往前遍历每个IndexFile对象。主要步骤如下:
1)调用InfexFile对象的isTimeMacted检查开始时间、结束时间是否有一部分落在该Index文件中IndexHeader的beginTimestamp和EndTimestamp之间,若是则调用IndexFile的selectPhyOffset方法获取物理偏移量列表;
2)检查IndexFile的beginTimestamp是否小于入参begin,若是则不用在往前寻找了,直接返回;
3)若已经找到的物理偏移量个数已经大于了64,则直接返回结果。
3 总结
分布式消息RocketMQ通过IndexFile这种巧妙的存储结构设计,大大提升了消息按Key和时间范围查询检索效率。