searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

RabbitMQ高性能方案

2023-01-03 10:09:44
19
0

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移构建在开放电信平台框架上。所有主要的编程语言均有与代理接口通讯的客户端库。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

RabbitMQ对于高可用是基于主从的方式进行实现的,有三种工作模式: 单机模式、普通集群模式、镜像集群模式。

1 RabbitMQ工作模式

1.1 单机模式

单机模式即只有一个节点,如果这个节点出现故障,将导致对应服务不可用,因此正常线上服务不使用该模式。

1.2 普通集群模式

普通集群模式即在多个服务器上部署多个MQ实例, 每台机器一个实例。创建的每一个queue,只会存在一个MQ实例上,但是每一个实例都会同步queue的元数据(即queue的标识信息)。进行消费时, 即使连接到了其他的MQ实例上, 其也会根据内部的queue的元数据,从该queue所在实例上拉取数据过来.

普通集群方式只是通过集群部署的方式提高了消息的吞吐量,但是并没有考虑到高可用。由于需要不断去其他实例拉取数据,因此性能开销巨大,容易造成单实例的性能瓶颈。同时,如果真正有数据的那个queue的实例宕机了,则其他的实例就无法进行数据的拉取。

 

1.3 镜像集群模式

镜像集群模式与普通集群模式的主要区别在于,无论queue的元数据还是queue中的消息都会同时存在于多个实例上。

 

要开启镜像集群模式,需要在后台新增镜像集群模式策略,即要求数据同步到所有的节点。也可以指定同步到指定数量的节点。

这种方式的好处在于, 任何一个服务宕机都不会影响整个集群数据的完整性,因为其他服务中都有queue的完整数据,当进行消息消费时,连接其他的服务器节点同样也可以获取到数据。

镜像集群模式的缺点:

  1. 性能开销大: 因为需要进行整个集群内部所有实例的数据同步
  2. 无法线性扩容: 因为每一个服务器中都包含整个集群服务节点中的所有数据, 这样存储瓶颈会出现在每个节点存储上。

2.JAVA client实现

2.1 CachingConnectionFactory

spring中的 CachingConnectionFactory为我们提供了两种缓存的模式:

  • CHANNEL模式:也是CachingConnectionFactory的默认模式,在这种模式下,所有的createConnection()方法实际上返回的都是同一个Connection,同样的Connection.close()方法是没用的。默认情况下,Connection中只缓存了一个Channel,这种模式在并发量不大时是完全够用的。当并发量较高时,我们可以setChannelCacheSize()来增加Connection中缓存的Channel的数量。
  • CONNECTION模式:在CONNECTION模式下,每一次调用createConnection()方法都会新建一个或者从缓存中获取,根据设置的ConnectionCacheSize的大小,小于该值时会采用新建的策略,而大于时采用从缓存中获取的策略。与CHANNEL模式不同的是,CONNECTION模式对Connection和Channel都进行了缓存,最新版本的client中已经将Channel的缓存数量从1增加到了25,但是在并发量不是特别大的情况下,作用并不是特别明显。

 

使用spring amqp需在pom中添加相应配置:

<dependencies>

    <dependency>

        <groupId>com.rabbitmq</groupId>

        <artifactId>amqp-client</artifactId>

        <version>5.0.0</version>

    </dependency>

 

    <dependency>

        <groupId>org.springframework.amqp</groupId>

        <artifactId>spring-rabbit</artifactId>

        <version>2.0.2.RELEASE</version>

    </dependency>

 

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-web</artifactId>

    </dependency>

 

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-amqp</artifactId>

    </dependency>

</dependencies>

创建连接池实现

  • 单个节点

private static CachingConnectionFactory createCachingConnectionFactory(

        List<String> addresses, String username,

        String password, String vhost) {

    CachingConnectionFactory connection_factory = new CachingConnectionFactory(

            new ConnectionFactory());

    connection_factory.setHost("127.0.0.1");

    connection_factory.setPort(5672);

    connection_factory.setUsername(username);

    connection_factory.setPassword(password);

    connection_factory.setVirtualHost(vhost);

    return connection_factory;

}

 

  • 多个节点

private static CachingConnectionFactory createCachingConnectionFactory(

        List<String> addresses, String username,

        String password, String vhost) {

    CachingConnectionFactory connection_factory = new CachingConnectionFactory(

            new ConnectionFactory());

    connection_factory.setAddresses("192.168.0.1:5672,192.168.0.1:5672");

    connection_factory.setUsername(username);

    connection_factory.setPassword(password);

    connection_factory.setVirtualHost(vhost);

    return connection_factory;

}

 

2.2 RabbitTemplate

Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作

2.2.1 发送消息

  • send (自定义消息 Message)

Message message = new Message("hello".getBytes(),new MessageProperties());

// 发送消息到默认的交换器,默认的路由键

rabbitTemplate.send(message);

// 发送消息到指定的交换器,指定的路由键

rabbitTemplate.send("direct.exchange","key.1",message);

// 发送消息到指定的交换器,指定的路由键

rabbitTemplate.send("direct.exchange","key.1",message,new CorrelationData(UUID.randomUUID().toString()));

  • convertAndSend(自动 Java 对象包装成 Message 对象,Java 对象需要实现 Serializable 序列化接口)

User user = new User("test");

// 发送消息到默认的交换器,默认的路由键

rabbitTemplate.convertAndSend(user);

// 发送消息到指定的交换器,指定的路由键,设置消息 ID

rabbitTemplate.convertAndSend("direct.exchange","key.1",user,new CorrelationData(UUID.randomUUID().toString()));

// 发送消息到指定的交换器,指定的路由键,在消息转换完成后,通过 MessagePostProcessor 来添加属性

rabbitTemplate.convertAndSend("direct.exchange","key.1",user,mes -> {

    mes.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

        return mes;

});

 

2.2.2 接收消息

  • receive(返回 Message 对象)

// 接收来自指定队列的消息,并设置超时时间

Message msg = rabbitTemplate.receive("debug",2000l);

 

  • receiveAndConvert(将返回 Message 转换成 Java 对象)

User user = (User) rabbitTemplate.receiveAndConvert();

以上内容为RabbitMQ高性能方案的相关介绍。学习了解RabbitMQ及部署方案,有利于对集群部署操作和高性能方案等内容有更好的理解。

0条评论
0 / 1000
芋泥麻薯
9文章数
1粉丝数
芋泥麻薯
9 文章 | 1 粉丝
原创

RabbitMQ高性能方案

2023-01-03 10:09:44
19
0

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移构建在开放电信平台框架上。所有主要的编程语言均有与代理接口通讯的客户端库。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

RabbitMQ对于高可用是基于主从的方式进行实现的,有三种工作模式: 单机模式、普通集群模式、镜像集群模式。

1 RabbitMQ工作模式

1.1 单机模式

单机模式即只有一个节点,如果这个节点出现故障,将导致对应服务不可用,因此正常线上服务不使用该模式。

1.2 普通集群模式

普通集群模式即在多个服务器上部署多个MQ实例, 每台机器一个实例。创建的每一个queue,只会存在一个MQ实例上,但是每一个实例都会同步queue的元数据(即queue的标识信息)。进行消费时, 即使连接到了其他的MQ实例上, 其也会根据内部的queue的元数据,从该queue所在实例上拉取数据过来.

普通集群方式只是通过集群部署的方式提高了消息的吞吐量,但是并没有考虑到高可用。由于需要不断去其他实例拉取数据,因此性能开销巨大,容易造成单实例的性能瓶颈。同时,如果真正有数据的那个queue的实例宕机了,则其他的实例就无法进行数据的拉取。

 

1.3 镜像集群模式

镜像集群模式与普通集群模式的主要区别在于,无论queue的元数据还是queue中的消息都会同时存在于多个实例上。

 

要开启镜像集群模式,需要在后台新增镜像集群模式策略,即要求数据同步到所有的节点。也可以指定同步到指定数量的节点。

这种方式的好处在于, 任何一个服务宕机都不会影响整个集群数据的完整性,因为其他服务中都有queue的完整数据,当进行消息消费时,连接其他的服务器节点同样也可以获取到数据。

镜像集群模式的缺点:

  1. 性能开销大: 因为需要进行整个集群内部所有实例的数据同步
  2. 无法线性扩容: 因为每一个服务器中都包含整个集群服务节点中的所有数据, 这样存储瓶颈会出现在每个节点存储上。

2.JAVA client实现

2.1 CachingConnectionFactory

spring中的 CachingConnectionFactory为我们提供了两种缓存的模式:

  • CHANNEL模式:也是CachingConnectionFactory的默认模式,在这种模式下,所有的createConnection()方法实际上返回的都是同一个Connection,同样的Connection.close()方法是没用的。默认情况下,Connection中只缓存了一个Channel,这种模式在并发量不大时是完全够用的。当并发量较高时,我们可以setChannelCacheSize()来增加Connection中缓存的Channel的数量。
  • CONNECTION模式:在CONNECTION模式下,每一次调用createConnection()方法都会新建一个或者从缓存中获取,根据设置的ConnectionCacheSize的大小,小于该值时会采用新建的策略,而大于时采用从缓存中获取的策略。与CHANNEL模式不同的是,CONNECTION模式对Connection和Channel都进行了缓存,最新版本的client中已经将Channel的缓存数量从1增加到了25,但是在并发量不是特别大的情况下,作用并不是特别明显。

 

使用spring amqp需在pom中添加相应配置:

<dependencies>

    <dependency>

        <groupId>com.rabbitmq</groupId>

        <artifactId>amqp-client</artifactId>

        <version>5.0.0</version>

    </dependency>

 

    <dependency>

        <groupId>org.springframework.amqp</groupId>

        <artifactId>spring-rabbit</artifactId>

        <version>2.0.2.RELEASE</version>

    </dependency>

 

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-web</artifactId>

    </dependency>

 

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-amqp</artifactId>

    </dependency>

</dependencies>

创建连接池实现

  • 单个节点

private static CachingConnectionFactory createCachingConnectionFactory(

        List<String> addresses, String username,

        String password, String vhost) {

    CachingConnectionFactory connection_factory = new CachingConnectionFactory(

            new ConnectionFactory());

    connection_factory.setHost("127.0.0.1");

    connection_factory.setPort(5672);

    connection_factory.setUsername(username);

    connection_factory.setPassword(password);

    connection_factory.setVirtualHost(vhost);

    return connection_factory;

}

 

  • 多个节点

private static CachingConnectionFactory createCachingConnectionFactory(

        List<String> addresses, String username,

        String password, String vhost) {

    CachingConnectionFactory connection_factory = new CachingConnectionFactory(

            new ConnectionFactory());

    connection_factory.setAddresses("192.168.0.1:5672,192.168.0.1:5672");

    connection_factory.setUsername(username);

    connection_factory.setPassword(password);

    connection_factory.setVirtualHost(vhost);

    return connection_factory;

}

 

2.2 RabbitTemplate

Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作

2.2.1 发送消息

  • send (自定义消息 Message)

Message message = new Message("hello".getBytes(),new MessageProperties());

// 发送消息到默认的交换器,默认的路由键

rabbitTemplate.send(message);

// 发送消息到指定的交换器,指定的路由键

rabbitTemplate.send("direct.exchange","key.1",message);

// 发送消息到指定的交换器,指定的路由键

rabbitTemplate.send("direct.exchange","key.1",message,new CorrelationData(UUID.randomUUID().toString()));

  • convertAndSend(自动 Java 对象包装成 Message 对象,Java 对象需要实现 Serializable 序列化接口)

User user = new User("test");

// 发送消息到默认的交换器,默认的路由键

rabbitTemplate.convertAndSend(user);

// 发送消息到指定的交换器,指定的路由键,设置消息 ID

rabbitTemplate.convertAndSend("direct.exchange","key.1",user,new CorrelationData(UUID.randomUUID().toString()));

// 发送消息到指定的交换器,指定的路由键,在消息转换完成后,通过 MessagePostProcessor 来添加属性

rabbitTemplate.convertAndSend("direct.exchange","key.1",user,mes -> {

    mes.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

        return mes;

});

 

2.2.2 接收消息

  • receive(返回 Message 对象)

// 接收来自指定队列的消息,并设置超时时间

Message msg = rabbitTemplate.receive("debug",2000l);

 

  • receiveAndConvert(将返回 Message 转换成 Java 对象)

User user = (User) rabbitTemplate.receiveAndConvert();

以上内容为RabbitMQ高性能方案的相关介绍。学习了解RabbitMQ及部署方案,有利于对集群部署操作和高性能方案等内容有更好的理解。

文章来自个人专栏
RabbitMQ
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0