本章节基于吞吐量和可靠性两个指标,指导您通过设置队列长度、集群负载均衡、优先队列数量等参数,实现RabbitMQ的高性能。
使用较小的队列长度
队列中存在大量消息时,会给内存使用带来沉重的负担,为了释放内存,RabbitMQ会将消息刷新到磁盘。这个过程通常需要时间,由于需要重建索引,重启包含大量消息的集群非常耗时。当刷盘的消息过多时,会阻塞队列处理消息,从而降低队列速度,对RabbitMQ节点的性能产生负面影响。
要获得最佳性能,应尽可能缩短队列。建议始终保持队列消息堆积的数量在0左右。
对于经常受到消息峰值影响的应用程序,和对吞吐量要求较高的应用程序,建议在队列上设置 最大长度 。这样可以通过丢弃队列头部的消息来保持队列长度,队列长度永远不会大于最大长度设置。
最大长度可以通过Policy设置,也可以通过在队列声明时使用对应参数设置。
- 在Policy中设置
- 在队列声明时使用对应参数设置
//创建队列 HashMap<String, Object> map = new HashMap<>(); //设置队列最大长度 map.put("x-max-length",10 ); //设置队列溢出方式保留前10 map.put("x-overflow","reject-publish" ); channel.queueDeclare(queueName,false,false,false,map);
当队列长度超过设置的最大长度时,RabbitMQ的默认做法是将队列头部的信息(队列中最老的消息)丢弃或变成死信。可以通过设置不同的overflow值来改变这种方式,如果overflow值设置为 drop-head ,表示从队列前面丢弃或dead-letter消息,保存后n条消息。如果overflow值设置为 reject-publish ,表示最近发布的消息将被丢弃,即保存前n条消息。
说明如果同时使用以上两种方式设置队列的最大长度,两者中较小的值将被使用。
超过队列最大长度的消息会被丢弃,请谨慎使用。
使用集群的负载均衡
队列的性能受单个CPU内核控制,当一个RabbitMQ节点处理消息的能力达到瓶颈时,可以通过集群进行扩展,从而达到提升吞吐量的目的。
使用多个节点,集群会自动将队列均衡的创建在各个节点上。除了使用集群模式,您还可以使用以下两个插件优化负载均衡:
Consistent hash exchange
该插件使用交换器来平衡队列之间的消息。根据消息的路由键,发送到交换器的消息一致且均匀地分布在多个队列中。该插件创建路由键的散列,并将消息传播到与该交换器具有绑定关系的队列中。使用此插件时,需要确保消费者从所有队列中消费。
使用示例如下:
- 使用不同的路由键来路由消息。
public class ConsistentHashExchangeExample1 {
private static String CONSISTENT_HASH_EXCHANGE_TYPE = "x-consistent-hash";
public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.newConnection();
Channel ch = conn.createChannel();
for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
ch.queueDeclare(q, true, false, false, null);
ch.queuePurge(q);
}
ch.exchangeDeclare("e1", CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);
for (String q : Arrays.asList("q1", "q2")) {
ch.queueBind(q, "e1", "1");
}
for (String q : Arrays.asList("q3", "q4")) {
ch.queueBind(q, "e1", "2");
}
ch.confirmSelect();
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
for (int i = 0; i < 100000; i++) {
ch.basicPublish("e1", String.valueOf(i), bldr.build(), "".getBytes("UTF-8"));
}
ch.waitForConfirmsOrDie(10000);
System.out.println("Done publishing!");
System.out.println("Evaluating results...");
// wait for one stats emission interval so that queue counters
// are up-to-date in the management UI
Thread.sleep(5);
System.out.println("Done.");
conn.close();
}
}
- 通过不同的header来路由消息,该方式需要为交换器提供“hash-header”参数设置,且消息必须带有header,否则会被路由到相同的队列。
public class ConsistentHashExchangeExample2 {
public static final String EXCHANGE = "e2";
private static String EXCHANGE_TYPE = "x-consistent-hash";
public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.newConnection();
Channel ch = conn.createChannel();
for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
ch.queueDeclare(q, true, false, false, null);
ch.queuePurge(q);
}
Map<String, Object> args = new HashMap<>();
args.put("hash-header", "hash-on");
ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
for (String q : Arrays.asList("q1", "q2")) {
ch.queueBind(q, EXCHANGE, "1");
}
for (String q : Arrays.asList("q3", "q4")) {
ch.queueBind(q, EXCHANGE, "2");
}
ch.confirmSelect();
for (int i = 0; i < 100000; i++) {
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
Map<String, Object> hdrs = new HashMap<>();
hdrs.put("hash-on", String.valueOf(i));
ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8"));
}
ch.waitForConfirmsOrDie(10000);
System.out.println("Done publishing!");
System.out.println("Evaluating results...");
// wait for one stats emission interval so that queue counters
// are up-to-date in the management UI
Thread.sleep(5);
System.out.println("Done.");
conn.close();
}
}
- 使用消息属性来路由消息,例如message_id、correlation_id或timestamp属性。该方式需要使用“hash-property”参数来声明交换器,且消息必须带有所选择的消息属性,否则会被路由到相同的队列。
public class ConsistentHashExchangeExample2 {
public static final String EXCHANGE = "e2";
private static String EXCHANGE_TYPE = "x-consistent-hash";
public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.newConnection();
Channel ch = conn.createChannel();
for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
ch.queueDeclare(q, true, false, false, null);
ch.queuePurge(q);
}
Map<String, Object> args = new HashMap<>();
args.put("hash-header", "hash-on");
ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
for (String q : Arrays.asList("q1", "q2")) {
ch.queueBind(q, EXCHANGE, "1");
}
for (String q : Arrays.asList("q3", "q4")) {
ch.queueBind(q, EXCHANGE, "2");
}
ch.confirmSelect();
for (int i = 0; i < 100000; i++) {
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
Map<String, Object> hdrs = new HashMap<>();
hdrs.put("hash-on", String.valueOf(i));
ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8"));
}
ch.waitForConfirmsOrDie(10000);
System.out.println("Done publishing!");
System.out.println("Evaluating results...");
// wait for one stats emission interval so that queue counters
// are up-to-date in the management UI
Thread.sleep(5);
System.out.println("Done.");
conn.close();
}
}
RabbitMQ sharding
该插件自动对队列进行分区,也就是说,一旦您将一个交换器定义为sharded,那么在每个集群节点上自动创建支持队列,并在它们之间共享消息。该插件提供了一个集中发送消息的位置,并通过向集群中的其他节点添加队列,实现负载均衡。使用此插件时,需要确保消费者从所有队列中消费。
配置RabbitMQ sharding插件的步骤如下:
- 创建x-modulus-hash属性交换器。
- 为该交换器添加策略。
- 单击该交换器详情,查看是否配置成功。
自动删除不再使用的队列
客户端可能连接失败导致队列被残留,大量的残留队列会影响实例的性能。RabbitMQ提供三种自动删除队列的方法:
- 在队列中设置TTL策略:例如TTL策略设置为28天,当持续28天队列未被使用时,此队列将被删除。
- 使用auto-delete队列:当最后一个消费者退出或通道/连接关闭(或与服务器的TCP连接丢失)时,auto-delete队列会被删除。
- 使用exclusive queue:exclusive queue只能在创建它的连接中使用,当此连接关闭或消失时,exclusive queue会被删除。
设置方法如下:
boolean exclusive = true;
boolean autoDelete = true;
channel.queueDeclare(QUEUENAME, durable, exclusive, autoDelete, arguments);
限制使用优先队列的数量
每个优先队列会启动一个Erlang进程,过多的优先队列会影响性能。在大多数情况下,建议使用不超过5个优先队列。
连接和通道
每个连接使用大约100 KB的内存(如果使用 TLS会更多),成千上万的连接会导致RabbitMQ负载很高,极端情况下,会导致内存溢出。AMQP协议引入了通道的概念,一个连接中可以有多个通道。连接是长期存在的,AMQP连接的握手过程比较复杂,至少需要7个TCP数据包(如果使用TLS会更多)。相对连接来说,打开和关闭通道会更简单,但是建议通道也设置为长期存在的。例如,应该为每个生产者线程重用相同的通道,不要在每次生产时都打开通道。最佳实践是重用连接并将线程之间的连接与通道多路复用。
推荐使用Spring AMQP线程池:ConnectionFactory是Spring AMQP定义的连接工厂,负责创建连接。
不要在线程之间共享通道
大多数客户端并未实现通道的线程安全,所以不要在线程之间共享通道。
不要频繁打开和关闭连接或通道
频繁打开和关闭连接或通道会发送和接收大量的TCP包,从而导致更高的延迟,确保不要频繁打开和关闭连接或通道。
生产者和消费者使用不同的通道
生产者和消费者使用不同的连接以实现高吞吐量。当生产者发送太多消息给服务端处理时,RabbitMQ会将压力传递到TCP连接上。如果在同一个TCP连接上消费,服务端可能不会收到来自客户端的消息确认,从而影响消费性能。若消费速度过低,服务端将不堪重负。
大量的连接和通道可能会影响RabbitMQ管理接口的性能
RabbitMQ会收集每个连接和通道的数据进行分析和显示,大量连接和通道会影响RabbitMQ管理接口的性能。
禁用未使用的插件
插件可能会消耗大量CPU或占用大量内存,建议禁用未使用的插件。