消息的签收机制说明
消息消费成功后,我们在客户端签收后,消息就从MQ服务器里面删除了若消息没有消费成功,我们让他回到MQ里面,让别人再次重试消费。
自动签收
消息只要被客户端接收到,无论你客户端发生了什么,我们服务器都不管你了,直接把消息删除了,这是它是默认的行为。
手动签收
创建项目 springboot-rabbitmq,创建方式和之前的方式一样依赖也是。
修改application.yml配置文件:
server
port8080
spring
application
name Springboot-RabbitMQ
rabbitmq
username user
password123456
host139.196.183.130
port5672
virtual-host v-it6666
# NONE 值是禁用发布确认模式,是默认值
# CORRELATED 值是发布消息成功到交换机后会触发回调方法
publisher-confirm-type correlated
# 这个是老版本的用法
# publisher-confirms: true
# 消息由交换机到达队列失败时触发
publisher-returnstrue
listener
simple
# 自动签收,这个是默认行为
# acknowledge-mode: auto
# 手动签收
acknowledge-mode manual
direct
# 设置直连交换机的签收类型
acknowledge-mode manual
消息投递的 ID 说明
获取投递的 ID
/**
* @author BNTang
*/
public class MessageReceive {
(bindings = {
(
value = ,
key = {"error"},
exchange = (value = "directs", type = ExchangeTypes.DIRECT)
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
// 消息投递ID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// messageId 就是消息的唯一的标识,自己定义
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消费者收到消息 → 消息对象:" + message);
System.out.println("消费者收到消息 → 内容为:" + content);
System.out.println("消费者收到消息 → 信道:" + channel);
System.out.println("消息投递ID → :" + deliveryTag);
System.out.println("消息自定义ID → :" + messageId);
channel.basicAck(deliveryTag, false);
}
}
basicAck方法参数的解释如下:
- deliveryTag:消息投递ID,要签收的投递ID。
- multiple:是否批量签收。
投递 ID 存在的问题及消息永久 ID 设置的问题
什么能代表消息的唯一的标识,显然投送的 ID 不行,因为一个消息可能会有多个投送的 ID,我们就需要给消息一个唯一的值,这个伴随消息终身,不会变化!我们需要发送消息时,给消息设置一个 ID,然后保证该 ID 唯一就可以了,如下所示!
void sendMsg() throws IOException {
for (int i = 0; i < 5; i++) {
this.rabbitTemplate.convertAndSend("directs", "error", "我是一个测试消息" + i,
message -> {
String messageId = UUID.randomUUID().toString().replace("-", "");
// 自己给消息设置自定义的ID
message.getMessageProperties().setMessageId(messageId);
return message;
});
System.out.println("消息发送成功");
System.in.read();
}
}
关于批量签收消息
若我们此时签收了编号为4的消息,但是前面的0,1,2,3 都没有签收,则MQ若是批量的签收,它会把0,1,2,3 都签收,因为MQ认为,比他晚投递的已经签收,前面的肯定已经消费成功了。
生产者
static int a = 1;
public void sendMessage() throws IOException {
for (int i = 0; i <= 3; i++) {
this.rabbitTemplate.convertAndSend("directs", "error", "ABC - " + i, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
// 自己给消息设置自定义的ID
message.getMessageProperties().setMessageId((a++) + "");
return message;
}
});
}
System.out.println("消息发送成功");
System.in.read();
}
消费者
/**
* @author BNTang
*/
public class MessageReceive {
(bindings = {
(
value = ("queue"),
key = {"error"},
exchange = (value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息投递ID → :" + deliveryTag);
System.out.println("消息自定义ID → :" + messageId);
if (content.equals("ABC - 3")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息签收成功 → 内容为:" + content);
}
}
}
可以发现只签收了ABC - 3 但是队列里面没有消息了,说明前面的12都被批量签收了。
不签收
当我们认为消息不合格时,或不是我们要的消息时,我们可以选择不签收它。
生产者
public void sendMessage() throws IOException {
this.rabbitTemplate.convertAndSend("directs", "error", "1234567", new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
String messageId = UUID.randomUUID().toString().replace("-", "");
// 自己给消息设置自定义的ID
message.getMessageProperties().setMessageId(messageId);
return message;
}
});
System.out.println("消息发送成功");
System.in.read();
}
消费者
/**
* @author BNTang
*/
public class MessageReceive {
(bindings = {
(
value = ("queue"),
key = {"error"},
exchange = (value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息投递ID → :" + deliveryTag);
System.out.println("消息自定义ID → :" + messageId);
if (content.equals("1234567")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息签收成功");
} else {
// 如果不是 1234567 就决绝签收
channel.basicNack(deliveryTag, false, true);
System.out.println("消息被决绝签收");
}
}
}
如上的代码测试方式你先发送一个消息,消息内容为 1234567 这是正常的情况,然后在发送一个 123456 就会发现效果,消息消费死循环了。
我们选择不签收,其实是为了保护消息,当消费消息发生异常时,我们可以把消息放在队列里面,让它重新投递,重新让别人消费!而不是丢了它!
解决不签收消息的死循环
不签收,并且让它回到队列里面,想法很好,但是很容易造成死循环,因为没有任何人能消费她! 我们设计一个机制,当一个消息被消费3次还没有消费成功,我们就直接把它记录下来,人工处理! 消息消费3次(消息的标识,消息的计数)我们引入Redis,使用Redis计数,若超过3次,直接拒绝消息,并且不回到队列里面。
引入 Redis 依赖,并使用 Docker 运行 Redis,Redis 依赖如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Docker 运行 Redis 命令脚本如下所示,当然也可以使用本地的Redis图方便,我这里是有机子我就用我的机子了:
docker run -d --name myredis -p 6390:6379 redis --requirepass "1234"
修改消费者的配置文件
server
port8002
spring
application
name consumer
rabbitmq
host139.196.183.130
port5672
username user
password123456
virtual-host v-it6666
# Redis的配置
redis
host139.196.183.130
port6390
password1234
改造消费者,改造之后的代码如下:
/**
* @author BNTang
*/
public class MessageReceive {
private StringRedisTemplate redisTemplate;
/**
* 消息的前缀
*/
private String MESSAGE = "message:";
(bindings = {
(
value = ("queue"),
key = {"error"},
exchange = (value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息投递ID → :" + deliveryTag);
System.out.println("消息自定义ID → :" + messageId);
if (content.equals("1234567")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息签收成功");
} else {
String count = this.redisTemplate.opsForValue().get(MESSAGE + messageId);
if (count != null && Long.valueOf(count) >= 3) {
channel.basicNack(deliveryTag, false, false);
System.out.println("该消息消费【3】次都失败,我们记录它,人工处理" + content);
} else {
// 如果不是 1234567 就决绝签收
// 处理业务逻辑【可能逻辑处理的出现了问题啥的】
channel.basicNack(deliveryTag, false, true);
System.out.println("消息被决绝签收");
// 因为拒绝了,我们把消息ID放到Redis里面
this.redisTemplate.opsForValue().increment(MESSAGE + messageId);
}
}
}
}
如上basicNack方法参数的解释如下所示:
- deliveryTag:消息的投递ID,要签收的投递ID是多少
- multiple:是否批量签收
- requeue:true,代表决绝签收,并把消息重新放回队列里面,false,直接拒绝签收
测试注意,因为统计计数时,消息的次数,是通过消息的 ID 来计数的,我们在发送消息时,要设置消息的头: