使用场景
生产者在发布消息后,怎么确认消息已正确发布到服务器端?服务器端又怎么确认消息已成功被消费?RabbitMQ提供的消息确认机制可以解决此问题。
在使用RabbitMQ时,生产者确认和消费者确认对于确保数据可靠性至关重要。如果连接失败,传输中的消息可能会丢失,需要重新传输。确认机制可以让服务器和客户端知道何时重新传输消息。客户端可以在收到消息时确认消息,也可以在客户端完全处理完消息后确认。生产者确认会影响性能,如果需要很高的吞吐量,应禁用生产者确认。注意,不使用生产者确认会导致可靠性下降。
更多关于消息确认机制的说明,请参考Consumer Acknowledgements and Publisher Confirms。
生产者确认
生产者确认,即服务端在收到来自生产者的消息时进行确认。
以下示例演示在Java客户端配置生产者确认:
try {
channel.confirmSelect() ; //将信道置为publisher confirm模式
//之后正常发送消息
channel . basicPublish( "exchange " , " routingKey" , null , "publisher confirm test " .getBytes());
if (!channel.waitForConfirms()) {
System.out.println( "send message failed " ) ;
// do something else....
}
} catch (InterruptedException e) {
e.printStackTrace() ;
}
调用channel .waitForConfirms方法之后,会等待服务端确认,这是一种同步等待的方式,会对性能产生影响。如果生产者要满足at least once,就必须使用同步等待方式。
消费者确认
消费者确认是指服务端通过确认消息是否成功被消费者接收,来判断是否删除队列中的此消息。
消费者确认对数据可靠性十分重要,接收重要消息的消费应用程序在未处理完消息前不应确认消息,以便消费者有足够的时间处理消息,无需担心消息处理过程中由于消费者进程异常(如工作程序崩溃、重启等)导致消息丢失。
消费者确认在客户端上配置,通过配置basicConsume方法启用确认。在channel中启用消费者确认适用于大多数场景。
以下示例演示在Java客户端配置消费者确认(使用Channel#basicAck设置basic.ack为肯定):
// this example assumes an existing channel instanceboolean autoAck = false;
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
});
未确认的消息缓存在内存中,如果未确认的消息过多,会导致内存使用率过高,此时可以在客户端配置预取值来限制消费者预取的消息数量,具体方法请参见预取值。