如果消息重复消费会影响您的业务处理,要对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。
概念
在消息领域,幂等是指Consumer重复消费某条消息时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。
例如,在支付场景下,Consumer消费扣款消息,对一笔订单执行扣款操作,扣款金额为500元。如果因网络不稳定等原因导致扣款消息重复投递,Consumer重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费500元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消息幂等。
适用场景
在互联网应用中,尤其在网络不稳定的情况下,分布式消息服务RabbitMQ的消息有可能会出现重复。如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下:
- 发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时Producer意识到消息发送失败并尝试再次发送消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。
- 投递时消息重复
消息消费的场景下,消息已投递到Consumer并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,分布式消息服务RabbitMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。
- 负载均衡时消息重复(包括但不限于网络抖动、服务端重启以及Consumer应用重启)
当分布式消息服务RabbitMQ的服务端或客户端重启、扩容或缩容时,会触发Rebalance,此时Consumer可能会收到重复消息。
处理方法
以Message ID为幂等键对消息进行幂等处理的步骤如下:
(1)在数据库中创建一张unique key索引为唯一Message ID的表。
(2)在Producer客户端为每条消息设置唯一Message ID。
设置唯一Message ID的示例代码如下:
AMQP.BasicProperties props =newAMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish("ExchangeName","RoutingKey",true, props,("消息发送"+ i).getBytes());
(3)在Consumer客户端根据唯一Message ID对消息进行幂等处理。
根据唯一Message ID进行幂等处理的示例代码如下:
channel.basicConsume(Producer.QueueName, false, "MyConsumerTag",
new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope env,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 1. 获取业务唯一性索引数据。
try{
String messageId = properties.getMessageId();
// Message ID或者其他作为unique key的信息。
// 2. 开启数据库事务。
idempTable.insert(messageId);
// 3. 对接收到的消息,进行业务逻辑处理。
// 4. 提交或回滚事务。// 处理成功,则进行ACK,否则不要进行ACK。
channel.basicAck(env.getDeliveryTag(), false);
}
catch (数据库主键冲突异常 e){
// 重复消息,直接确认掉。
channel.basicAck(env.getDeliveryTag(), false);
}
}
}
);