RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移构建在开放电信平台框架上。所有主要的编程语言均有与代理接口通讯的客户端库。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
RabbitMQ对于高可用是基于主从的方式进行实现的,有三种工作模式: 单机模式、普通集群模式、镜像集群模式。
1 RabbitMQ工作模式
1.1 单机模式
单机模式即只有一个节点,如果这个节点出现故障,将导致对应服务不可用,因此正常线上服务不使用该模式。
1.2 普通集群模式
普通集群模式即在多个服务器上部署多个MQ实例, 每台机器一个实例。创建的每一个queue,只会存在一个MQ实例上,但是每一个实例都会同步queue的元数据(即queue的标识信息)。进行消费时, 即使连接到了其他的MQ实例上, 其也会根据内部的queue的元数据,从该queue所在实例上拉取数据过来.
普通集群方式只是通过集群部署的方式提高了消息的吞吐量,但是并没有考虑到高可用。由于需要不断去其他实例拉取数据,因此性能开销巨大,容易造成单实例的性能瓶颈。同时,如果真正有数据的那个queue的实例宕机了,则其他的实例就无法进行数据的拉取。
1.3 镜像集群模式
镜像集群模式与普通集群模式的主要区别在于,无论queue的元数据还是queue中的消息都会同时存在于多个实例上。
要开启镜像集群模式,需要在后台新增镜像集群模式策略,即要求数据同步到所有的节点。也可以指定同步到指定数量的节点。
这种方式的好处在于, 任何一个服务宕机都不会影响整个集群数据的完整性,因为其他服务中都有queue的完整数据,当进行消息消费时,连接其他的服务器节点同样也可以获取到数据。
镜像集群模式的缺点:
- 性能开销大: 因为需要进行整个集群内部所有实例的数据同步
- 无法线性扩容: 因为每一个服务器中都包含整个集群服务节点中的所有数据, 这样存储瓶颈会出现在每个节点存储上。
2.JAVA client实现
2.1 CachingConnectionFactory
spring中的 CachingConnectionFactory为我们提供了两种缓存的模式:
- CHANNEL模式:也是CachingConnectionFactory的默认模式,在这种模式下,所有的createConnection()方法实际上返回的都是同一个Connection,同样的Connection.close()方法是没用的。默认情况下,Connection中只缓存了一个Channel,这种模式在并发量不大时是完全够用的。当并发量较高时,我们可以setChannelCacheSize()来增加Connection中缓存的Channel的数量。
- CONNECTION模式:在CONNECTION模式下,每一次调用createConnection()方法都会新建一个或者从缓存中获取,根据设置的ConnectionCacheSize的大小,小于该值时会采用新建的策略,而大于时采用从缓存中获取的策略。与CHANNEL模式不同的是,CONNECTION模式对Connection和Channel都进行了缓存,最新版本的client中已经将Channel的缓存数量从1增加到了25,但是在并发量不是特别大的情况下,作用并不是特别明显。
使用spring amqp需在pom中添加相应配置:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency>
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.2.RELEASE</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> |
创建连接池实现
- 单个节点
private static CachingConnectionFactory createCachingConnectionFactory( List<String> addresses, String username, String password, String vhost) { CachingConnectionFactory connection_factory = new CachingConnectionFactory( new ConnectionFactory()); connection_factory.setHost("127.0.0.1"); connection_factory.setPort(5672); connection_factory.setUsername(username); connection_factory.setPassword(password); connection_factory.setVirtualHost(vhost); return connection_factory; } |
- 多个节点
private static CachingConnectionFactory createCachingConnectionFactory( List<String> addresses, String username, String password, String vhost) { CachingConnectionFactory connection_factory = new CachingConnectionFactory( new ConnectionFactory()); connection_factory.setAddresses("192.168.0.1:5672,192.168.0.1:5672"); connection_factory.setUsername(username); connection_factory.setPassword(password); connection_factory.setVirtualHost(vhost); return connection_factory; } |
2.2 RabbitTemplate
Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作。
2.2.1 发送消息
- send (自定义消息 Message)
Message message = new Message("hello".getBytes(),new MessageProperties()); // 发送消息到默认的交换器,默认的路由键 rabbitTemplate.send(message); // 发送消息到指定的交换器,指定的路由键 rabbitTemplate.send("direct.exchange","key.1",message); // 发送消息到指定的交换器,指定的路由键 rabbitTemplate.send("direct.exchange","key.1",message,new CorrelationData(UUID.randomUUID().toString())); |
- convertAndSend(自动 Java 对象包装成 Message 对象,Java 对象需要实现 Serializable 序列化接口)
User user = new User("test"); // 发送消息到默认的交换器,默认的路由键 rabbitTemplate.convertAndSend(user); // 发送消息到指定的交换器,指定的路由键,设置消息 ID rabbitTemplate.convertAndSend("direct.exchange","key.1",user,new CorrelationData(UUID.randomUUID().toString())); // 发送消息到指定的交换器,指定的路由键,在消息转换完成后,通过 MessagePostProcessor 来添加属性 rabbitTemplate.convertAndSend("direct.exchange","key.1",user,mes -> { mes.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return mes; }); |
2.2.2 接收消息
- receive(返回 Message 对象)
// 接收来自指定队列的消息,并设置超时时间 Message msg = rabbitTemplate.receive("debug",2000l); |
- receiveAndConvert(将返回 Message 转换成 Java 对象)
User user = (User) rabbitTemplate.receiveAndConvert(); |
以上内容为RabbitMQ高性能方案的相关介绍。学习了解RabbitMQ及部署方案,有利于对集群部署操作和高性能方案等内容有更好的理解。