searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

RocketMQ的死信队列功能

2023-09-18 01:34:47
10
0

1.  原理说明

RocketMQ 的订阅消费的消息有一个确认的机制。 如果消息有限时间内(默认是15分钟)没有签收成功,或者业务层主动去决定晚点再消费,消息会发到重试队列中。  一定的时间间隔后,会被重新消费到。 如果多次(默认是16次)都无法成功消费的消息后,消息会被发到死信队列中。死信队列的消息不会再去重新消费, 需要用户额外去处理才行。

 

2. 订阅组配置

订阅组的核心配置如下:

public class SubscriptionGroupConfig {
    private int retryQueueNums = 1;
    private int retryMaxTimes = 16;
}

 

 核心属性是最大重试次数。 其次是对应重试队列的队列数

 

3. 消息流程图:

 

订阅组有一个对应的重试队列主题。如果订阅失败了,会先把消息写到延时主题上,再投递到到重试队列里面。

订阅组会定时去查一下重试队列的主题,看看是否有消息可以重试。

 如果重试失败的次数大于retryMaxTimes或大于客户端自定义的次数时,消息会直接发到死信队列上,后续则无法直接消费的。

 

注意: 最佳实践强调,消费组实例要确保订阅关系一致。主要也是因为重试主题的这个机制。如果订阅关系不一致,则重试主题里面消息可能保存了各种订阅关系的主题。消费者实例是随机去重试消费这些的消息的,如果订阅关系不一致,是可能导致消费到没有订阅的消息的。

 

4. 名字规范

重试主题的名字规范: %RETRY%{订阅组名字}  。

死信队列主题的名字规范: %DLQ%{订阅组名字}

如果订阅组的名字是sub , 则重试主题的名字是: %RETRY%sub, 死信队列的主题名是:%DLQ%sub.

 

5. 测试的代码例子

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setNamesrvAddr(nameServer);
consumer.setConsumerGroup("test_group_1");
//客户端设置最大的重试次数。没配则使用服务端的参数
consumer.setMaxReconsumeTimes(1);
//订阅某个主题
consumer.subscribe(TOPIC_NAME, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//业务逻辑,
//消息待重试 , 成功则返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
});
consumer.start();

 

6. 死信队列的消息处理

对应死信队列的消息,一般来说,可以采取以下两种的处理方式:

1.   通过PULL消费的方法,可以查询到这些这些消息,后续怎么处理,需要结合具体的业务逻辑

2.  如果业务系统已经完成了幂等性等处理,可以重发这些死信消息。

 

 

 

0条评论
0 / 1000
叶****伟
6文章数
0粉丝数
叶****伟
6 文章 | 0 粉丝
叶****伟
6文章数
0粉丝数
叶****伟
6 文章 | 0 粉丝
原创

RocketMQ的死信队列功能

2023-09-18 01:34:47
10
0

1.  原理说明

RocketMQ 的订阅消费的消息有一个确认的机制。 如果消息有限时间内(默认是15分钟)没有签收成功,或者业务层主动去决定晚点再消费,消息会发到重试队列中。  一定的时间间隔后,会被重新消费到。 如果多次(默认是16次)都无法成功消费的消息后,消息会被发到死信队列中。死信队列的消息不会再去重新消费, 需要用户额外去处理才行。

 

2. 订阅组配置

订阅组的核心配置如下:

public class SubscriptionGroupConfig {
    private int retryQueueNums = 1;
    private int retryMaxTimes = 16;
}

 

 核心属性是最大重试次数。 其次是对应重试队列的队列数

 

3. 消息流程图:

 

订阅组有一个对应的重试队列主题。如果订阅失败了,会先把消息写到延时主题上,再投递到到重试队列里面。

订阅组会定时去查一下重试队列的主题,看看是否有消息可以重试。

 如果重试失败的次数大于retryMaxTimes或大于客户端自定义的次数时,消息会直接发到死信队列上,后续则无法直接消费的。

 

注意: 最佳实践强调,消费组实例要确保订阅关系一致。主要也是因为重试主题的这个机制。如果订阅关系不一致,则重试主题里面消息可能保存了各种订阅关系的主题。消费者实例是随机去重试消费这些的消息的,如果订阅关系不一致,是可能导致消费到没有订阅的消息的。

 

4. 名字规范

重试主题的名字规范: %RETRY%{订阅组名字}  。

死信队列主题的名字规范: %DLQ%{订阅组名字}

如果订阅组的名字是sub , 则重试主题的名字是: %RETRY%sub, 死信队列的主题名是:%DLQ%sub.

 

5. 测试的代码例子

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setNamesrvAddr(nameServer);
consumer.setConsumerGroup("test_group_1");
//客户端设置最大的重试次数。没配则使用服务端的参数
consumer.setMaxReconsumeTimes(1);
//订阅某个主题
consumer.subscribe(TOPIC_NAME, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//业务逻辑,
//消息待重试 , 成功则返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
});
consumer.start();

 

6. 死信队列的消息处理

对应死信队列的消息,一般来说,可以采取以下两种的处理方式:

1.   通过PULL消费的方法,可以查询到这些这些消息,后续怎么处理,需要结合具体的业务逻辑

2.  如果业务系统已经完成了幂等性等处理,可以重发这些死信消息。

 

 

 

文章来自个人专栏
中间件随笔
6 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0