searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

AMQP on Pulsar在天翼云的实践与优化

2022-11-28 03:11:49
143
0

一、  背景

随着公司快速发展,内部用到的RabbitMQ场景变得更丰富,集群压力也在不断增长,RabbitMQ性能、稳定性问题也暴露出来,包括以下问题:

  • 集群压力到达一定量时,单节点宕机后,客户端压力重连其他节点,其他节点也出现假死、响应极慢甚至服务停止情况的雪崩效应。
  • 在一些网络不稳定的环境,RabbitMQ时常出现脑裂现象,通过开源的补偿机制牺牲一定可用性和可靠性能够缓解脑裂现象,但是未能完全解决。
  • 公司内部有场景用到大量广播镜像队列,当一个消息经过广播交换器广播到N个3副本镜像队列,也就是一个消息存了3N份,占用大量磁盘、内存、带宽。

 

同时公司云产品分布式消息队列RabbitMQ需要提供高性能、高可靠、高稳定的消息服务,并且我们需要研发企业级增强能力来提高产品竞争力。基于原生RabbitMQ实现这些能力,二次开发难度相当较大。

基于以上两个主要原因,我们探索了实现AMQP消息队列产品的多个方案或路径,其中基于Apache Pulsar和AoP(AMQP on Pulsar)是其中一个路径。

Apache Pulsar

Apache Pulsar 作为 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、跨区域复制、具有强一致性、高吞吐、低延迟及高可扩展性等流数据存储特性

Apache Pulsar 采用计算存储分离的一个架构,Pulsar 的 Broker 层就是一层无状态的计算逻辑层,主要负责接收和分发消息,而存储层由 Bookie 节点组成,负责消息的存储和读取。

Pulsar 的这种计算存储分离式的架构,可以做到水平扩容不受限制,可以分别针对计算层和存储层扩容,如果系统的 Producer、Consumer 比较多,那么就可以直接扩容计算逻辑层 Broker,不受数据一致性的影响。

AoP(AMQP on Pulsar)

Pulsar 2.5 版本后,将 Protocol Handler 接口单独脱离了出来,利用这个框架就可以单独实现自定义协议的转换,比如 Kafka、AMQP、MQTT 等。实现了不同协议的 Protocol Handler,Pulsar broker 就具有读写/解析其他协议的能力。下图为 AMQP on Pulsar 采用的架构模型。

AoP种的模型转换实现如下图,AmqpExchange通过一个存储消息的Pulsar Topic和一个Replicator实现,AmqpQueue通过多个Router和一个存储消息索引的Pulsar Topic实现。完成的消息流转流程如下:

  1. 当AMQP生产者发送消息到 AmqpExchange,AmqpExchange 将消息持久化到 Pulsar Topic (我们称之为存储原始消息的 Topic)。
  2. AmqpExchange 的 Replicator 会将消息传递给 Router。
  3. Router 判断是否需要将消息路由给 AmqpQueue。如果是,会将原始消息的 ID 存入AmqpQueue 的 Topic 中 (我们称之为存储索引消息的 Topic)。
  4. AmqpQueue 将消息传递给 Consumer。

二、  AoP的实践与优化

实践:解决AoP单Bundle限制

AoP的单bundle限制现状

我们内部使用RabbitMQ的场景大部分都是只用一个虚拟主机(vhost)或者大部分压力都集中在某个vhost。 当我们试用Apache Pulsar和AoP后,发现的最大的问题就是AoP的单Bundle限制,目前AoP实现的vhost由一个单Bundle的命名空间实现,vhost下的所有交换器、队列资源都集中在vhost对应命名空间bundle所在的Broker。

如上图所示,大部分场景的应用都是使用集群的vhost1虚拟主机,AoP Proxy根据client所连接的vhost1确定服务资源存在Broker2后,所有请求压力都集中在Broker2,所有vhost1下的exchange、queue使用的资源都集中在Broker2。同时当集群扩容Broker节点时,新加的Broker并不能为原来的集群分担压力,提高集群吞吐量。总的来说,AoP的单bundle限制极大降低了Apache pulsar集群的性能和扩展性。

AoP的单bundle限制原因

在介绍我们的优化方法之前先简单介绍目前AoP只能使用单bundle的具体原因,以下是AoP实现原理对简单示意图,vhost里实现的Exchange、Queue都是依赖于Pulsar Topic,Exchange消息也是在当前Broker进程内转发到Queue。

从以上实现来看,开源AoP限制了只能使用单bundle的namespace去实现vhost主要有以下两个原因:

  1. AMQP和Pulsar协议区别:AMQP协议中client生产消费使用的Queue或Exchange建立连接时候并不是确定,当发出具体生产、消费请求时才知道client使用哪个Queue或Exchange,协议中默认连接任意Broker都能对所有Queue和Exchange请求。而Pulsar建立连接的时候就需要知道生产、消费所使用Topic,从而知道client需要连接哪个Broker,每个Topic只有其中一个Broker能处理请求。

上面协议差距导致AoP实现时就需要在Proxy确定client请求重定向到哪个Broker,而只能根据连接信息的vhost信息来重定向,只有所有vhost资源在同一个broker才能保证重定向到client所需Queue或Exchange对应Topic的Broker。

  1. Replicator的实现:目前Replicator是从Exchange对应的Topic消费消息,当符合Queue的路由规则时,将消息的索引生产到Queue对应的Topic,这就限制了Queue和Exchange对应的Topic必须在同个Broker中。

优化:基于Pulsar client重新实现AmqpExchange和AmqpQueue

为解决单Bundle的限制,我们参考了Pulsar Summit Asia 2021中林宇强大佬演讲《RabbitMQ on Pulsar 的实践和架构演进之路》中的做法,类似Gateway模式去实现AoP中的AmqpExchange和AmqpQueue。在这种模式中有几个关键点:

  1. AmqpExchange和AmqpQueue不再依赖Pulsar Topic去实现,直接把元数据存储在zk、etcd等服务中。每个节点都能处理每个Exchange或Queue的请求。
  2. 涉及数据存取操作直接通过pulsar client去实现。即使所需Topic不在当前节点也能正常生产消费。

通过上面改造我们基本就能实现AoP的多bundle效果。AMQP生产者可以不再需要通过Proxy来生产消费,直接连接Broker的AoP服务。由于AoP对主题的操作都是通过pulsar client的RPC操作去实现,不再限制在同一个broker,所以vhost对应的namespace可以设置多个bundle。

虽然可以减少Proxy层,但是可以看到一条消息的生命周期在AoP中涉及多次pulsar client的RPC调用,如下:

  1. 消息生产到 Exchange Topic。
  2. Replicator将消息从Exchange Topic消费。
  3. Replicator将消息的索引发送到Queue Exchange。
  4. Queue从QueueExchange消费消息索引。
  5. Queue通过消息索引从ExchangeTopic获取消息。

这么多次RPC调用这样会降低集群的性能,请求延迟会非常大。针对这个问题,我们将消息流转流程做了一下简化,如下图所示:

AMQP生产者把消息发送到Exchange之后,马上通过路由匹配,直接把符合匹配规则的消息发送到Queue Topic。Queue的消费者直接从Queue Topic消费消息发送到AMQP消费者。这样做将原来5次Pulsar client操作降低为2次。

性能测试

在以下环境对优化后的AoP进行性能测试

AoP服务端

服务器数量

cpu

内存

磁盘

副本

3台虚拟机

4核

16GB

200G SAS

2

压测Client端

服务器数量

cpu

内存

磁盘

消息体

持久化

2台虚拟机

4核

16GB

200G SAS

1k

优化前压测结果:

由于社区版AoP不能复用Pulsar的metrics,只能根据压测客户端日志给出大概情况,根据我们测试,社区版AoP消费tps跟不上生产tps,刚开始压测时已经是性能峰值,后面追赶读性能更差。维持不追赶读,性能最高大概只有1.8w tps左右。只生产不消费时,刚开始性能在5w tps左右,后面稳定在2w tps左右。

优化后压测结果

由于基于Pulsar clinet实现了Exchange和Queue,可以监控pulsar metrics指标。刚开始峰值到10wtps,后续稳定在7w tps左右

三、  总结

经过多bundle改造,AoP继承了Apache Pulsar的扩展性和负载均衡能力,同时继承了Apache Pulsar丰富的metrics指标。消费性能比社区版本的AoP提升很大,测试中社区版的消费tps远低于生产tps。

后续我们还会不断打造产品企业级功能,确保产品的稳定性。在天翼云4.0中退出分布式消息队列RabbitMQ的云原生引擎版本。

参考资料

1、Apache Pulsar 官网:https://pulsar.apache.org/

2、AMQP on Pulsar:https://github.com/streamnative/aop

3、RabbitMQ on Pulsar 的实践和架构演进之路【Pulsar Summit Asia 2021】

https://www.bilibili.com/video/BV1Ka411y7me/

4、AMQP on Pulsar 的设计与应用一览https://posts.careerengine.us/p/616d0a3a3900050b5588bd57

0条评论
0 / 1000
汤****健
2文章数
2粉丝数
汤****健
2 文章 | 2 粉丝
汤****健
2文章数
2粉丝数
汤****健
2 文章 | 2 粉丝
原创

AMQP on Pulsar在天翼云的实践与优化

2022-11-28 03:11:49
143
0

一、  背景

随着公司快速发展,内部用到的RabbitMQ场景变得更丰富,集群压力也在不断增长,RabbitMQ性能、稳定性问题也暴露出来,包括以下问题:

  • 集群压力到达一定量时,单节点宕机后,客户端压力重连其他节点,其他节点也出现假死、响应极慢甚至服务停止情况的雪崩效应。
  • 在一些网络不稳定的环境,RabbitMQ时常出现脑裂现象,通过开源的补偿机制牺牲一定可用性和可靠性能够缓解脑裂现象,但是未能完全解决。
  • 公司内部有场景用到大量广播镜像队列,当一个消息经过广播交换器广播到N个3副本镜像队列,也就是一个消息存了3N份,占用大量磁盘、内存、带宽。

 

同时公司云产品分布式消息队列RabbitMQ需要提供高性能、高可靠、高稳定的消息服务,并且我们需要研发企业级增强能力来提高产品竞争力。基于原生RabbitMQ实现这些能力,二次开发难度相当较大。

基于以上两个主要原因,我们探索了实现AMQP消息队列产品的多个方案或路径,其中基于Apache Pulsar和AoP(AMQP on Pulsar)是其中一个路径。

Apache Pulsar

Apache Pulsar 作为 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、跨区域复制、具有强一致性、高吞吐、低延迟及高可扩展性等流数据存储特性

Apache Pulsar 采用计算存储分离的一个架构,Pulsar 的 Broker 层就是一层无状态的计算逻辑层,主要负责接收和分发消息,而存储层由 Bookie 节点组成,负责消息的存储和读取。

Pulsar 的这种计算存储分离式的架构,可以做到水平扩容不受限制,可以分别针对计算层和存储层扩容,如果系统的 Producer、Consumer 比较多,那么就可以直接扩容计算逻辑层 Broker,不受数据一致性的影响。

AoP(AMQP on Pulsar)

Pulsar 2.5 版本后,将 Protocol Handler 接口单独脱离了出来,利用这个框架就可以单独实现自定义协议的转换,比如 Kafka、AMQP、MQTT 等。实现了不同协议的 Protocol Handler,Pulsar broker 就具有读写/解析其他协议的能力。下图为 AMQP on Pulsar 采用的架构模型。

AoP种的模型转换实现如下图,AmqpExchange通过一个存储消息的Pulsar Topic和一个Replicator实现,AmqpQueue通过多个Router和一个存储消息索引的Pulsar Topic实现。完成的消息流转流程如下:

  1. 当AMQP生产者发送消息到 AmqpExchange,AmqpExchange 将消息持久化到 Pulsar Topic (我们称之为存储原始消息的 Topic)。
  2. AmqpExchange 的 Replicator 会将消息传递给 Router。
  3. Router 判断是否需要将消息路由给 AmqpQueue。如果是,会将原始消息的 ID 存入AmqpQueue 的 Topic 中 (我们称之为存储索引消息的 Topic)。
  4. AmqpQueue 将消息传递给 Consumer。

二、  AoP的实践与优化

实践:解决AoP单Bundle限制

AoP的单bundle限制现状

我们内部使用RabbitMQ的场景大部分都是只用一个虚拟主机(vhost)或者大部分压力都集中在某个vhost。 当我们试用Apache Pulsar和AoP后,发现的最大的问题就是AoP的单Bundle限制,目前AoP实现的vhost由一个单Bundle的命名空间实现,vhost下的所有交换器、队列资源都集中在vhost对应命名空间bundle所在的Broker。

如上图所示,大部分场景的应用都是使用集群的vhost1虚拟主机,AoP Proxy根据client所连接的vhost1确定服务资源存在Broker2后,所有请求压力都集中在Broker2,所有vhost1下的exchange、queue使用的资源都集中在Broker2。同时当集群扩容Broker节点时,新加的Broker并不能为原来的集群分担压力,提高集群吞吐量。总的来说,AoP的单bundle限制极大降低了Apache pulsar集群的性能和扩展性。

AoP的单bundle限制原因

在介绍我们的优化方法之前先简单介绍目前AoP只能使用单bundle的具体原因,以下是AoP实现原理对简单示意图,vhost里实现的Exchange、Queue都是依赖于Pulsar Topic,Exchange消息也是在当前Broker进程内转发到Queue。

从以上实现来看,开源AoP限制了只能使用单bundle的namespace去实现vhost主要有以下两个原因:

  1. AMQP和Pulsar协议区别:AMQP协议中client生产消费使用的Queue或Exchange建立连接时候并不是确定,当发出具体生产、消费请求时才知道client使用哪个Queue或Exchange,协议中默认连接任意Broker都能对所有Queue和Exchange请求。而Pulsar建立连接的时候就需要知道生产、消费所使用Topic,从而知道client需要连接哪个Broker,每个Topic只有其中一个Broker能处理请求。

上面协议差距导致AoP实现时就需要在Proxy确定client请求重定向到哪个Broker,而只能根据连接信息的vhost信息来重定向,只有所有vhost资源在同一个broker才能保证重定向到client所需Queue或Exchange对应Topic的Broker。

  1. Replicator的实现:目前Replicator是从Exchange对应的Topic消费消息,当符合Queue的路由规则时,将消息的索引生产到Queue对应的Topic,这就限制了Queue和Exchange对应的Topic必须在同个Broker中。

优化:基于Pulsar client重新实现AmqpExchange和AmqpQueue

为解决单Bundle的限制,我们参考了Pulsar Summit Asia 2021中林宇强大佬演讲《RabbitMQ on Pulsar 的实践和架构演进之路》中的做法,类似Gateway模式去实现AoP中的AmqpExchange和AmqpQueue。在这种模式中有几个关键点:

  1. AmqpExchange和AmqpQueue不再依赖Pulsar Topic去实现,直接把元数据存储在zk、etcd等服务中。每个节点都能处理每个Exchange或Queue的请求。
  2. 涉及数据存取操作直接通过pulsar client去实现。即使所需Topic不在当前节点也能正常生产消费。

通过上面改造我们基本就能实现AoP的多bundle效果。AMQP生产者可以不再需要通过Proxy来生产消费,直接连接Broker的AoP服务。由于AoP对主题的操作都是通过pulsar client的RPC操作去实现,不再限制在同一个broker,所以vhost对应的namespace可以设置多个bundle。

虽然可以减少Proxy层,但是可以看到一条消息的生命周期在AoP中涉及多次pulsar client的RPC调用,如下:

  1. 消息生产到 Exchange Topic。
  2. Replicator将消息从Exchange Topic消费。
  3. Replicator将消息的索引发送到Queue Exchange。
  4. Queue从QueueExchange消费消息索引。
  5. Queue通过消息索引从ExchangeTopic获取消息。

这么多次RPC调用这样会降低集群的性能,请求延迟会非常大。针对这个问题,我们将消息流转流程做了一下简化,如下图所示:

AMQP生产者把消息发送到Exchange之后,马上通过路由匹配,直接把符合匹配规则的消息发送到Queue Topic。Queue的消费者直接从Queue Topic消费消息发送到AMQP消费者。这样做将原来5次Pulsar client操作降低为2次。

性能测试

在以下环境对优化后的AoP进行性能测试

AoP服务端

服务器数量

cpu

内存

磁盘

副本

3台虚拟机

4核

16GB

200G SAS

2

压测Client端

服务器数量

cpu

内存

磁盘

消息体

持久化

2台虚拟机

4核

16GB

200G SAS

1k

优化前压测结果:

由于社区版AoP不能复用Pulsar的metrics,只能根据压测客户端日志给出大概情况,根据我们测试,社区版AoP消费tps跟不上生产tps,刚开始压测时已经是性能峰值,后面追赶读性能更差。维持不追赶读,性能最高大概只有1.8w tps左右。只生产不消费时,刚开始性能在5w tps左右,后面稳定在2w tps左右。

优化后压测结果

由于基于Pulsar clinet实现了Exchange和Queue,可以监控pulsar metrics指标。刚开始峰值到10wtps,后续稳定在7w tps左右

三、  总结

经过多bundle改造,AoP继承了Apache Pulsar的扩展性和负载均衡能力,同时继承了Apache Pulsar丰富的metrics指标。消费性能比社区版本的AoP提升很大,测试中社区版的消费tps远低于生产tps。

后续我们还会不断打造产品企业级功能,确保产品的稳定性。在天翼云4.0中退出分布式消息队列RabbitMQ的云原生引擎版本。

参考资料

1、Apache Pulsar 官网:https://pulsar.apache.org/

2、AMQP on Pulsar:https://github.com/streamnative/aop

3、RabbitMQ on Pulsar 的实践和架构演进之路【Pulsar Summit Asia 2021】

https://www.bilibili.com/video/BV1Ka411y7me/

4、AMQP on Pulsar 的设计与应用一览https://posts.careerengine.us/p/616d0a3a3900050b5588bd57

文章来自个人专栏
消息中间件
2 文章 | 2 订阅
0条评论
0 / 1000
请输入你的评论
0
0