前提介绍
在RocketMQ中一般有两种获取消息的方式,一个是拉(pull,消费者主动去broker拉取),一个是推(push,主动推送给消费者),在上一章节中已经介绍到了相关的Push操作,接下来的章节会介绍Pull操作方式的消费机制体系。
DefaultMQPullConsumer
DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。下面来介绍一下DefaultMQPullConsumer的内部原理。
总体流程执行
DefaultMQPullConsumer使用例子
public class MQPullConsumer { private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName"); consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876"); consumer.start(); // 从指定topic中拉取所有消息队列 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic"); for(MessageQueue mq:mqs){ try { // 获取消息的offset,指定从store中获取 long offset = consumer.fetchConsumeOffset(mq,true); while(true){ PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); putMessageQueueOffset(mq,pullResult.getNextBeginOffset()); switch(pullResult.getPullStatus()){ case FOUND: List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.println(new String(m.getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; } } } catch (Exception e) { e.printStackTrace(); } } consumer.shutdown(); } // 保存上次消费的消息下标 private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) { OFFSE_TABLE.put(mq, nextBeginOffset); } // 获取上次消费的消息的下标 private static Long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if(offset != null){ return offset; } return 0l; } }
消费者启动:consumer.start();
获取主题下所有的消息队列:这里是根据topic从nameserver获取的这里我们可以修改为从其他位置获取队列信息
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest"); //遍历队列 for(MessageQueue mq:mqs){ try { //获取当前队列的消费位移,第二个参数表示位移是从本地内存获取,还是从broker获取,true表示从broker获取 long offset = consumer.fetchConsumeOffset(mq,true); while(true){ //第二个参数表示可以消费哪些tag的消息 //第三个参数表示从哪个位移开始消费消息 //第四个参数表示一次最大拉多少个消息 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); }
DefaultMQPullConsumer的总体流程
启动DefaultMQPullConsumer是通过调用start()方法完成的
DefaultMQPullConsumer拉取源码分析
分析下DefaultMQPullConsumer拉取消息的流程
consumer.fetchSubscribeMessageQueues("order-topic")
从指定topic中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
核心源码分析
fetchSubscribeMessageQueues()
通过调用fetchSubscribeMessageQueues()方法可以获取指定topic(GET_ROUTEINTO_BY_TOPIC)的读队列信息。它通过向nameserver发送GetRouteInfoRequest请求,请求内容为GET_ROUTEINTO_BY_TOPIC,nameserver将主题下的读队列个数发送给消费者,然后消费者使用如下代码创建出与读队列个数相同的MessageQueue对象。
每个MessageQueue对象里面记录了topic、broker名和读队列号。最后fetchSubscribeMessageQueues()将MessageQueue对象集合返回给调用者。
向NameServer发送请求获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象。
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { try { TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); if (topicRouteData != null) { // 2、遍历topicRouteData Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData); if (!mqList.isEmpty()) { return mqList; } else { throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null); } } } catch (Exception e) { throw new MQClientException( "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), e); } throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); }
遍历过程TopicRouteData
遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限, 若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合; 最后返回给MessageQueue集合
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) { Set<MessageQueue> mqList = new HashSet<MessageQueue>(); List<QueueData> qds = route.getQueueDatas(); for (QueueData qd : qds) { if (PermName.isReadable(qd.getPerm())) { for (int i = 0; i < qd.getReadQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); mqList.add(mq); } } } return mqList; }
consumer.fetchConsumeOffset
通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置。
DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)
fetchConsumeOffset()有两个入参,第一个参数表示队列,第二个参数表示是否从broker获取该队列的消费位移,true表示从broker获取,false表示从本地记录获取,如果本地获取不到再从broker获取。 这里说的从本地获取是指从RemoteBrokerOffsetStore.offsetTable属性中获取,该属性记录了每个队列的消费位移。当从broker获取位移后会更新offsetTable。
pullBlockIfNotFound拉取信息
rocketmq提供了多个拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。两者的区别是如果队列中没有消息,两个方法的超时时间是不同的,pullBlockIfNotFound会等待30s返回一个空结果,pull是等待10s返回空结果。
不过pull方法的入参可以调整超时时间,而pullBlockIfNotFound则需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis参数。不过两个方法调用的底层逻辑都是一样的,都是调用DefaultMQPullConsumerImpl.pullSyncImpl()方法获取消息。下面分析一下pullSyncImpl()方法。
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); }
获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用pullSyncImpl,可以获取相关的结果数据。
参数1:消息队列(通过调用消费者的fetchSubscibeMessageQueue(topic)可以得到相应topic的所需要消息队列) ;
参数2:需要过滤用的表达式 ;
参数3:偏移量即消费队列的进度 ;
参数4:一次取消息的最大值 ;
DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
DefaultMQPullConsumerImpl.pullSyncImpl的实现过程
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.isRunning(); //检查入参是否合法 if (null == mq) { throw new MQClientException("mq is null", null); } if (offset < 0) { throw new MQClientException("offset < 0", null); } if (maxNums <= 0) { throw new MQClientException("maxNums <= 0", null); } //更新再平衡服务的数据,因为再平衡服务不起作用,所以更新数据没有效果 this.subscriptionAutomatically(mq.getTopic()); int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); //计算超时时间,如果调用的是pullBlockIfNotFound方法,block参数就是true,否则就是false long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); //调用PullAPIWrapper从broker拉取消息, //pullKernelImpl方法里面构建PullMessageRequest请求对象 PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq,//队列 subscriptionData.getSubString(),//消息的过滤规则 subscriptionData.getExpressionType(), isTagType ? 0L : subscriptionData.getSubVersion(), offset,//拉取消息的位移 maxNums,//建议broker一次性返回最大消息个数,默认是32个 sysFlag, 0,//设置的提交位移,可以看到永远都是0,所以broker无法记录有效位移,需要程序自己记录控制提交位移 this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis,//超时时间 CommunicationMode.SYNC, null//回调逻辑为null ); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); //If namespace is not null , reset Topic without namespace. this.resetTopic(pullResult.getMsgFoundList()); if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = null; consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(mq); consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); consumeMessageContext.setSuccess(false); this.executeHookBefore(consumeMessageContext); consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); consumeMessageContext.setSuccess(true); this.executeHookAfter(consumeMessageContext); } return pullResult; }
检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>变量中,若不在则以consumerGroup、topic、subExpression为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法构造SubscriptionData对象保存到RebalanceImpl.subscriptionInner变量中,其中 subExpression="*" this.subscriptionAutomatically(mq.getTopic()); // 构建标志位,逻辑或运算|= int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
SubscriptionData subscriptionData; try { //以请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回 subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); } long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; // 从broker中拉取消息 PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), 0L, offset, maxNums, sysFlag, 0, this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis, CommunicationMode.SYNC, null ); // 对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中 this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = null; consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(mq); consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); consumeMessageContext.setSuccess(false); this.executeHookBefore(consumeMessageContext); consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); consumeMessageContext.setSuccess(true); this.executeHookAfter(consumeMessageContext); } return pullResult; }
Push和Pull的操作对比
push-优点:及时性、服务端统一处理实现方便
push-缺点:容易造成堆积、负载性能不可控
pull-优点:获得消息状态方便、负载均衡性能可控
pull-缺点:及时性差
使用DefaultMQPullConsumer拉取消息,发送到broker的提交位移永远都是0,所以broker无法记录有效位移,需要程序自己记录和控制提交位移。