客户端连接RabbitMQ集群实例时,如果存在消息收发时间间隔大于90秒的场景,请在客户端开启心跳并设置小于90秒的心跳超时时间,防止断连。
什么是心跳
RabbitMQ实例提供了心跳功能,以确保应用程序层及时发现中断的连接和完全无响应的对端。心跳还可以防止某些网络设备在一段时间内由于没有活动而中断TCP连接。开启心跳的方法为在连接上指定心跳超时时间。
心跳超时时间定义了对等TCP连接在多长时间后被服务端和客户端视为关闭。服务端和客户端会对配置的心跳超时时间进行协商,客户端必须配置该值来发送心跳。RabbitMQ官方团队维护的3个客户端(Java、.NET、Erlang语言)的心跳超时时间协商逻辑如下:
- 服务端和客户端设置的心跳超时时间都不为0时,两者间较小的值生效。
- 服务端和客户端任意一端设置的心跳超时时间为0,另一端不为0时,非0的值生效。
- 服务端和客户端的心跳超时时间都设置为0时,表示禁用心跳。
配置心跳超时时间后,RabbitMQ服务端和客户端都会向对方发送AMQP心跳帧作为心跳,发送的时间间隔为心跳超时时间的一半。客户端在两次错过心跳后,会被认为是不可达的,TCP连接将被关闭。当客户端检测到服务端由于心跳而无法访问时,需要重新连接。
说明一些客户端(如C语言客户端)没有发送心跳的逻辑,即使配置了心跳超时时间,开启了心跳,仍然无法发送心跳。此时需要额外启动一个线程,编写发送心跳的逻辑。
LVS的心跳超时时间
RabbitMQ集群实例使用LVS进行负载均衡,如图1所示,单节点实例不涉及LVS。
图1 集群实例的负载均衡
LVS对客户端连接设置了心跳超时时间,默认为90秒。如果客户端在90秒内没有向LVS发送心跳(AMQP心跳帧或消息收发),LVS会主动断开与客户端的连接,此时客户端需要重新连接。
如果存在消息收发时间间隔大于90秒的场景,请在客户端开启心跳并设置小于90秒的心跳超时时间。
客户端如何配置心跳超时时间
- 在Java客户端配置心跳超时时间。
在创建连接前使用ConnectionFactory#setRequestedHeartbeat进行设置,示例如下:
ConnectionFactory cf = new ConnectionFactory();
// 将心跳超时时间设置为60秒
cf.setRequestedHeartbeat(60);
- 在.NET客户端配置心跳超时时间,示例如下。
var cf = new ConnectionFactory();
// 将心跳超时设置为60秒
cf.RequestedHeartbeat = TimeSpan.FromSeconds(60);
- 在Python pika客户端配置心跳超时时间,示例如下。
# 设置心跳时间为60秒
params = pika.ConnectionParameters(host='host', heartbeat=60, credentials=pika.PlainCredentials('username', 'passwd'))
connection = pika.BlockingConnection(params)
while True:
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
# 生产者需要使用connection.sleep()才能触发心跳,使用time.sleep()不会触发心跳
connection.sleep(200)