1. 原理说明
RocketMQ 的订阅消费的消息有一个确认的机制。 如果消息有限时间内(默认是15分钟)没有签收成功,或者业务层主动去决定晚点再消费,消息会发到重试队列中。 一定的时间间隔后,会被重新消费到。 如果多次(默认是16次)都无法成功消费的消息后,消息会被发到死信队列中。死信队列的消息不会再去重新消费, 需要用户额外去处理才行。
2. 订阅组配置
订阅组的核心配置如下:
public class SubscriptionGroupConfig {
private int retryQueueNums = 1;
private int retryMaxTimes = 16;
}
核心属性是最大重试次数。 其次是对应重试队列的队列数
3. 消息流程图:
订阅组有一个对应的重试队列主题。如果订阅失败了,会先把消息写到延时主题上,再投递到到重试队列里面。
订阅组会定时去查一下重试队列的主题,看看是否有消息可以重试。
如果重试失败的次数大于retryMaxTimes或大于客户端自定义的次数时,消息会直接发到死信队列上,后续则无法直接消费的。
注意: 最佳实践强调,消费组实例要确保订阅关系一致。主要也是因为重试主题的这个机制。如果订阅关系不一致,则重试主题里面消息可能保存了各种订阅关系的主题。消费者实例是随机去重试消费这些的消息的,如果订阅关系不一致,是可能导致消费到没有订阅的消息的。
4. 名字规范
重试主题的名字规范: %RETRY%{订阅组名字} 。
死信队列主题的名字规范: %DLQ%{订阅组名字}
如果订阅组的名字是sub , 则重试主题的名字是: %RETRY%sub, 死信队列的主题名是:%DLQ%sub.
5. 测试的代码例子
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setNamesrvAddr(nameServer);
consumer.setConsumerGroup("test_group_1");
//客户端设置最大的重试次数。没配则使用服务端的参数
consumer.setMaxReconsumeTimes(1);
//订阅某个主题
consumer.subscribe(TOPIC_NAME, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//业务逻辑,
//消息待重试 , 成功则返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
});
consumer.start();
6. 死信队列的消息处理
对应死信队列的消息,一般来说,可以采取以下两种的处理方式:
1. 通过PULL消费的方法,可以查询到这些这些消息,后续怎么处理,需要结合具体的业务逻辑
2. 如果业务系统已经完成了幂等性等处理,可以重发这些死信消息。