说明:在MQ中,当一个队列中的消息出现以下情况时,就成为了死信(Dead Letter);
-
被消费者拒绝消费或者声明失败,并且requeue设置为false,即不再重新入队列;
-
队列中的消息存满,消息无法再入队列;
-
消息过期
此时,可以通过指定死信交换机,把这些消息路由到一个专门存放死信的队列中,以消息超时过期为例:
死信交换机&延迟队列
设置一个有超时的队列,该队列中的消息超过10秒未被消费成为死信,并把消息由指定的死信交换机路由到指定的队列;
/**
* 超时队列
*/
@Configuration
public class DelayConfig {
/**
* 创建延迟交换机
* @return
*/
@Bean
public DirectExchange delayExchange(){
return new DirectExchange("delay.direct",true,false);
}
/**
* 创建延迟队列,该队列的消息需要延迟10秒,超时未消费后路由到死信交换机(death.delay.direct)那里去
* @return
*/
@Bean
public Queue delayQueue(){
return QueueBuilder.durable("delay.queue")
.ttl(10000)
.deadLetterExchange("death.delay.direct")
.build();
}
/**
* 绑定
* @return
*/
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay");
}
}
接收死信交换机路由过来的消息
/**
* 接收死信交换机(death.delay.direct)路由过来队列(death.delay.queue)消息,路由关键字是delay
* @param deathLetterMessage
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name ="death.delay.queue", durable = "true"),
exchange = @Exchange(name = "death.delay.direct"),
key = "delay"
))
public void getDeathLetterMessage(String deathLetterMessage){
System.out.println("deathLetterMessage = " + deathLetterMessage);
}
发送一条消息
/**
* 发送消息到延迟队列中
*/
@Test
public void testDelayQueue(){
String message = "hello, delay queue";
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
}
延迟10秒后,被转发到死信交换机,路由到存放死信的队列并打印
另外,如果给队列设置了超时,同时也给消息设置了超时,执行效果是,队列超时和消息超时,哪个短以哪个为准;
/**
* 发送消息到延迟队列中
*/
@Test
public void testDelayQueue(){
// 设置消息的超时为5000ms,即5秒
Message message = MessageBuilder.withBody("hello , delay queue".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
}
延迟五秒,打印消息内容
插件使用
因为延迟队列的使用场景非常多,所以RabbitMQ的官方也推出了一个插件,实现延迟队列的效果。
第一步:上传到服务器
输入下面的命令,数据卷名改成自己创建mq容器时,挂载的数据卷名
docker volume inspect 数据卷名
找到容器挂载的数据卷路径
将下载后的插件复制到这里
第二步:安装插件
安装插件需要进入到MQ容器中,输入下面的命令
docker exec -it 容器名 bash
输入下面的命令,安装插件
rabbitmq-plugins enable 插件名
安装成功
第三步:使用
使用插件,不需要额外指定死信交换机,可直接在原队列、原消息上设置
在队列上设置,使用delayed属性
/**
* 使用插件实现延迟队列
* @param delayMessage
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name ="plugins.delay.queue", durable = "true"),
exchange = @Exchange(name = "plugins.delay.direct", delayed = "true"),
key = "plugins"
))
public void getDelayMessage(String delayMessage){
System.out.println("delayMessage = " + delayMessage);
}
如果使用@Bean注解方式创建,使用delayed()
@Bean
public DirectExchange delayedExchange(){
return ExchangeBuilder
.directExchange("plugins.delay.direct")
.delayed()
.durable(true)
.build();
}
消息设置,使用setHeader(“x-delay”, duration)方法
/**
* 发送消息到延迟队列中
*/
@Test
public void testPluginsDelayQueue(){
// 设置消息的超时为5000ms,即5秒
Message message = MessageBuilder.withBody("hello , plugins delay queue".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay", 5000)
.build();
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("plugins.delay.direct","plugins",message,correlationData);
}
延迟5秒,接收到消息
总结
延迟队列可通过死信交换机和插件的方式实现,可应用于订单未支付,超时失效、预约等场景