消息队列的优先级
消息队列规范中描述的优先级是指在一个消息队列中,每条消息都有不同的优先级,一般用整数来描述,优先级高的消息先投递,如果消息完全在一个内存队列中,那么在投递前可以按照优先级排序,令优先级高的先投递。
对于优先级问题,可以归纳为2类:
- 只要达到优先级目的即可,不是严格意义上的优先级,通常将优先级划分为高、中、低,或者再多几个级别。每个优先级可以用不同的topic表示,发消息时,指定不同的topic来表示优先级,这种方式可以解决绝大部分的优先级问题,但是对业务的优先级精确性做了妥协。
- 严格的优先级,优先级用整数表示,例如0 ~ 65535,这种优先级问题一般使用不同topic解决就非常不合适。如果要让MQ解决此问题,会对MQ的性能造成非常大的影响。这里要确保一点,业务上是否确实需要这种严格的优先级,如果将优先级压缩成几个,对业务的影响有多大?
RocketMQ的优先级
由于RocketMQ所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此RocketMQ没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列, 将不同优先级发送到不同队列即可。
由于RocketMQ所有消息都是持久化的( 持久化到磁盘,每个partition只存index,导致后面严格排序代价很大,很大),所以如果按照优先级来排序,开销会非常大,因此 RocketMQ 没有特 意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级 的队列, 将不同优先级发送到不同队列即可。
RocketMQ中消息优先级
RocketMQ 并不遵循任何规范,但参考了各种规范的设计思想。考虑到持久化的消息按照优先级排序开销大,RocketMQ 没有特意支持消息优先级。查看了代码,在Message的API中,的确没有提供和Priority(优先级)有关的方法。
RocketMQ两种变通的方案
划分优先级
使用消息队列来表示不同的优先级:单独配置一个优先级高的队列,和一个普通优先级的队列, 将不同优先级发送到不同队列即可;
使用Topic来表示不同的优先级:每个优先级可以用不同的topic表示,发消息时,指定不同的 topic 来表示优先级。
有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ是个先入先出的队列,不支持消息级别或者Topic级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决,下面列举三种优先级相关需求的具体处理方法。
Topic级别进行划分优先级
多个不同的消息类型使用同一个topic时,由于某一种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的Topic,其他类型消息在另外一个Topic,应用程序创建两个Consumer,分别订阅不同的Topic,这样就可以了。
Queue级别进行划分优先级
情况和第一种情况类似,但是不用创建大量的 Topic。举个实际应用场景: 一个订单处理系统,接收从100家快递门店过来的请求,把这些请求通过Producer写入RocketMQ;订单处理程序通过Consumer 从队列里读取消息并处理,每天最多处理1万单 。
如果这100个快递门店中某几个门店订单量大增,比如门店一接了个大客户,一个上午就发出 2万单消息请求,这样其他的99家门店可能被迫等待门店一的2万单处理完,也就是两天后订单才能被处理,显然很不公平 。
这时可以创建一个Topic, 设置Topic的MessageQueue数量超过100个,Producer根据订单的门店号,把每个门店的订单写入一个MessageQueue。
DefaultMQPushConsumer默认是采用循环的方式逐个读取一个Topic的所有 MessageQueue,这样如果某家门店订单量大增,这家门店对应的MessageQueue消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。
DefaultMQPushConsumer默认的pullBatchSize是32,也就是每次从某个MessageQueue读取消息的时候,最多可以读32个 。在上面的场景中,为了更加公平,可以把pullBatchSize设置成1。这样会导致效率和性能下降。
org.apache.rocketmq.client.producer.MessageQueueSelector
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
我们需要通过自己去实现优先级队列的选择器进行相关的数据控制信息。
以下是我写的一个相关的实现机制:
@Data
public class MessageSelectorArg{
private int priorityLevel;
private Object entity;
}
public class PriorityMessageSelector implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
MessageSelectorArg msgArg = (MessageSelectorArg) arg;
// 其中 MQClientInstance是通过producer.getDefaultMQProducerImpl().getmQClientFactory();
MQClientInstance mqClientInstance = msgArg.getMqClientInstance();
int priorityLevel = msgArg.getPriorityLevel();
//选取最空闲的队列
MessageQueue queue = customSelect(mqs, priorityLevel, mqClientInstance);
if (null != queue) {
return queue;
}
//默认采用hash
return hashQueue(mqs, System.currentTimeMillis());
}
private MessageQueue customSelect(List<MessageQueue> mqs, int priorityLevel, MQClientInstance mqClientInstance) {
MessageQueue messageQueue = mqs.stream().filter(param->param.getQueueId().equals(priorityLevel)).findFirst().get();
if (null != messageQueue && mqs.contains(messageQueue)) {
return messageQueue;
}
}
}
强制划分优先级
TypeA、TypeB、TypeC三类消息 。 TypeA处于第一优先级,要确保只要有TypeA消息,必须优先处理; TypeB处于第二优先级; TypeC处于第三优先级 。
对这种要求,或者逻辑更复杂的要求,就要用户自己编码实现优先级控制,如果上述的三类消息在一个 Topic 里,可以使用PullConsumer,自主控制MessageQueue的遍历,以及消息的读取;如果上述三类消息在三个Topic下,需要启动三个Consumer, 实现逻辑控制三个Consumer的消费 。
原理通我们上面类似,大家可以自己联想分析一下哈。
最后的疑问!
由于是并发消费,例thread-1线程在消费msg1,thread-2在消费消息msg2,thread3消费消息msg3,此时如何thread3先消费完msg3,但thread1,thread2还未处理完msg1、msg2,那thread-1是向Broker反馈msg3的偏移量?