1 消息可靠性
消息从发送,到消费者接收,会经理多个过程,其中的每一步都可能导致消息丢失.
#### 2 常见的丢失原因
- 发送时丢失:
生产者发送的消息未送达exchange
消息到达exchange后未到达queue - MQ宕机,queue将消息丢失
consumer接收到消息后未消费就宕机
2.2 RabbitMQ分别给出了解决方案:
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
3 生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
- publisher-confirm-type 消息确认类型,
- simple:同步等待confirm结果,直到超时
- correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
- publisher-returns 交换机发送到队列中,失败时触发
- template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
然后配置 RabbitTemplate
@Slf4j
@Configuration
public class RabbitCommonConfig {
@Bean
public RabbitTemplate fastjsonRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置强制回调
rabbitTemplate.setMandatory(true);
//
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
log.info("对应数据 {}",correlationData);
log.info("确认情况 {}",b);
log.info("原因 {}",s);
}
});
//发送失败时的回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("确认情况 {}",returnedMessage.toString());
//日志记录 重试
}
});
return rabbitTemplate;
}
}
完毕