searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

消息选型-Rabbitmq篇

2023-09-21 10:46:56
14
0

 

1.   架构

我们先来看下整体架构。

 

Producer,消息生产者,即消息的发布方, 生产者生产的消息,首先发布到指定的交换机上,交换机通过路由键(RoutingKey)的匹配,选择对应的队列进行投递。消息者订阅队列,消费队列的消息。

Connectiton,客户端与Broker间的TCP连接。

Channel,信道,每个连接采用多路复用,包含多个信道。producer与Broker间采用信道传递数据。

Broker,Rabbit服务节点。

Vhost,虚机机,一个节点下包含多个vhost,vhost间的exchange,queue相互隔离。就好比一台物理机上(Broker)部署多台虚机(vhost),虚拟机采用不同的用户名密码登录,实现多租户。

Exchange,交换机,消息首先会传递到交换机,由交换机匹配路由键(RoutingKey)决定投递到哪个queue。类比邮政局。

Queue,队列,存储消息的数据结构。类比小区的快递柜。

Binding,绑定,交换机与队列间通过路由键(RoutingKey)进行绑定起来。RoutingKey类比目的地址。

Consumer,消费者,即消息的接受方。

2.   消息发布

生产者创建和启动信道连接,声明交换机机,队列,路由键等信息,消息封装成帧,通过信道发送到Broker。

 

2.1.  创建连接和信道

 一个应用程序(生产者)与RabbitMQ服务节点,维护一个TCP连接(Connect),在这个连接中可以创建多个信道用于信息的传递,各个信道间相互隔离。这样做的目的就是避免在多线程的情况下,频繁开启和关闭多个TCP连接,带来的性能消耗(每个TCP经历三次握手,四次挥手),实际就是TCP的多路复用,类似HTTP2.0的原理。

Connection connection = factory.newConnection() ; //创建连接
Channel channel = connection.createChannel() ; //创建信道

生产者与Broker之间完成了8次交互,分别为Connect的启动,调整,打开以及Channel的开启,最终打开信道。

 

后续所有与Broker通讯都是基于该信道,从代码层面看,都是基于channel对象操作。

2.2.  创建交换机,队列

 

生产者将消息发送到交换机,交换机根据路由键,投递到对应的目的队列保存。这个过程和邮件信息非常类似,寄件人将信件送到邮政局,邮政局根据信件的地址投递到目的地的邮箱中。

 RabbitMQ提供了通过管理平台或者命名行预先创建交换机和队列。采用预先定义还是客户端创建,就看业务的需要,比如我们对交互机以及队列预先做了充分的规划,那就采用预先创建;如果需要创建一些临时的交换机或者队列,客户端创建更适合。

要想创建一个可用的交互机和队列,需要完成以下步骤

(1)创建交换机

以Java客户端为例,调用exchangeDeclare方法。

exchangeDeclare(String exchange ,String type , boolean durable ,boolean autoDelete , boolean internal ,Map<String, Object> arguments)

该方法有很多重载的方法,我们不一一介绍,只介绍下几个重要的参数

exchange,定义该交换器的名称

type,定义该交换器的路由类型,分类fanout,direct,topic,headers等(后面我们重点介绍)

autoDelete,是否自动删除,当设置为true,所有与这个交换器绑定的队列或者交换器都与此解绑,就会自动删除。

(2)创建队列

以Java客户端为例,调用exchangeDeclare方法。

queueDeclare (String queue , boolean durable , boolean exclusive,boolean autoDelete, Map<String,Object> arguments)

其中重要的参数如下:

queue,队列名称

durable,队列是·否持久化(注意,与消息是否持久化不是一回事,后面再介绍)

exclusive,是否排他(后面介绍)

autoDelete,是否自动删除,当设置为true,所有与这个队列连接的消费者都断开时,才会自动删除。

(3)绑定

交换机和队列创建好了,接下来就需要将队列绑定到交换机上,告诉交换机,什么样的消息需要投递给该交换机。

queueBind(String queue , String exchange , String routingKey)

queue,待绑定的队列名。

exchange,需要绑定到的交换机。

routingKey,用来绑定队列和交换机的路由键(有些地方为了区别发送的时候的路由键,将绑定时候的键称之为绑定键Bindkey)

如果交换机和队列已经存在,再次声明,Broker不会重新创建,直接返回成功。

2.3.  创建消息

RabbitMQ的消息分为消息头与消息体,类似http协议的消息结构。

消息头定了消息的相关属性,主要的有以下:

content-type,传输消息体的MIMEl类型,比如:application/json

content-encoding,消息体的编码,如gzip

expiration,消息的过期时间

delivery-mode,消息是否持久化到磁盘,1表示非持久化,2表示持久化。

priority,消息的优先级,数值越大表示该消息的优先级越高,优先消费。

消息体主要是用户自定的消息,可以是json,也可以是xml,通过序列化成二进制数据。

以Java客户端为例,

//创建消息头

Map<String , Object> headers = new HashMap<String , Object>() ;

headers.put( " localtion" , "here " );

headers . put( " time " , " today " );

//创建消息

byte[] messageBodyBytes = "H ello , world! ". getBytes();

2.4.  发布消息

 消息头和消息体都创建完成,接下来就是向指定的交换机发布消息,声明routingKey,告知交换机投递到那个队列上。
以Java客户端为例,

channe1 .basi cPublish(exchangeName,routingKey,mandatory,immediate,

                new AMQP.BasicProperti es.Buil der ()

                  .headers(headers)

                 .build()) ,

                 messageBodyBytes) ;

exchangeName,交换机的名称

routingKey,路由键

mandatoryimmediate,这两个参数后面再介绍。

客户端会将消息封装方法帧,消息头帧,消息体帧。

 

每个消息体帧最大是131Kb,如果超过需要分割成多个。所以一个完整消息,至少需要三个帧(方法帧,消息头帧,消息体帧)。

 

2.5.  消息响应

当生产者将消息发送到交换机后,交互机是否将消息正确路由,并存入队列呢?对于可靠性要求高的系统,这个信息对于生产者很重要。RabbitMQ提供了两种方式,事务和生产确认机制(pulisher comfirm)。这里我们介绍下生产者确认机制,事务放到后面介绍。

     生产者将信道设置成comfirm模式,所有通过该信道发送的消息都会被分配一个唯一的ID,一旦消息被投送到正确的队列后,将会返回确认信息(Basic.Ack),生产者接受信息后确认消息已发布成功;当然,如果投送失败,会返回Basic.Nack信息,生产者根据业务场景判断是否需要重发。

   

确认信息是通过异步回调返回,不会阻塞生产者发送其他消息,这种模式性能较高,但是在重试的情况下,无法确保消息的顺序写入。     

3.   消息路由与存储

3.1.  消息路由

消息发送到节点的交换机后,交换机需要通过路由键投递到对应的队列上,前面介绍过交换机有四种路由类型,分别为direct,fanout,topic,headers。

(1)direct

任何绑定在交换机的队列,只要它的路由键和发布消息的路邮建一致,就能收到消息。一般用于需要将一个消息投递到一个或者多个确定的目标队列。

 

(2)fanout

所有发往fanout交换机的消息被投递到所有绑定到该交换机的队列中,这个应用于"广播"模式,该模式下,路由键不起作用。

 

(3)topic

     采用句点分隔的形式,队列可以通过使用基于通配符(*和#)的模式匹配的方式来绑定到路由键 上,发送的消息携带的路由键匹配上就会投递到该队列。

 

 

 4headers

在队列绑定时,定义匹配headers的参数的值,并设置x-match参数。发布消息时,定义header的属性值,当x-match设置为all表示所有的消息的header属性值都匹配才能路由到队列,设置为any表示,任何一个header的属性值匹配就可以路由到队列。

headers类型的灵活性以及效率都要比其他的类型差,所以用的比较少。

3.2.  消息存储

队列其实还是个逻辑概念,它包含rabbit_amqqueue_process(队列进程)和backing_queue(维护5个状态栈)。通过rabbit_amqqueue_process负责接受生产者发布的消息,向消费者交付消息,处理消息的确认等;backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口。

delivery-mode参数,1表示非持久化,2表示持久化。非持久化的消息置于内存,当内存达到设定阀值(vm memory high watermark paging ratio ),逐步写入磁盘;持久化消息则直接写入磁盘。磁盘的写入速度要远小于内存(都是顺序写入的前提下),并且需要等待落盘后才返回生产者确认信息,所以持久化模式的吞吐量要低于非持久化模式,但是可靠性强。

对于非持久化需要注意一点,如果大量的消息堆积内存,来不及进行落盘释放内存,一旦达到内存的告警阀值(vffi memory high watermark),就会产生告警,为确保节点的可用性,会阻塞所有生产者的连接,直到内存恢复到正常状态,这种情况下,会严重影响吞吐的。

持久化层分为两部分,队列索引(rabbit_queue_index)和消息存储(rabbit_msg_store)。队列索引负责维护队列中落盘消息的信息,包 括消息的存储地点、是否己被交付给消费者、是否己被消费者 ack 等,每个队列都有一个对应的队列索引,以".idx"为文件后缀。消息存储以键值队的形式存储消息,被所有的队列共享,每个节点有且只有一个,以".rdq"为文件后缀,顺序写入。

除了消息可以设置持久化,交换机和队列也可以设置持久化,三者需要同时设置,才能确保真正的持久化。如果队列没有设置持久化,消息是持久化的,那么就会导致队列删除了,但是消息还在,无法消费。

     创建消息的时候,可以通过消息头的expiration设置消息的过期时间,如果消息到达了过期时间,没有被消息,将会被投递到死信队列(见后面章节)

3.3.  消息删除

消息存储时,会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射和文件的相关信息。消息被正确的消费后,即会被删除,首先删除该表的记录,但是不会立即删除文件,而是仅标记待删除数据,待一个文件都是垃圾数据时可以将这个文件删除。

4.   消息消费

4.1.  消费模式

      与生产者一样,消费者需要与节点建立连接,打开信道,实现通讯。消费者需要订阅队列进行消费,消费的模式分为推(push)和拉(pull)两种.

   (1)推模式

  这个模式RabbitMQ推荐的,队列会将消息推送给消费者,流程如下:

 

为了提高消费速度,消费者可以评估自身的消费能力,预设Qos,也就是设置一次可以获取消息的个数(类似TCP的滑块窗口),队列批量推送消息到消费端,消费者获取消息后恢复确认信息。

(2)拉模式

 

消费者每次去队列拉消息,每次只能拉一条。

 

如果只是想获取单条信息,而不是持续订阅,可以采用拉模式。但是不建议使用循环来替代推模式,这样会严重影响RabbitMQ的性能。

4.2.  消息确认和拒绝

消费者获取消息后,可以设置不回复确认,自动回复确认,以及手动回复确认三种模式。

1、不回复确认,队列会默认发出去的消息都会被正确的处理,无需等待确认,这个方式吞吐最高,但是可靠性性最差。

2、自动回复确认,有可能消息没有正确处理,也被回复,队列无法识别,导致消息丢失。

3、手动回复确认,可以根据业务处理的逻辑,把握回复的时机,会带来一定的业务复杂度,但是可靠性最好。

对于设置Qos,可以进行批量回复,无需单次回复,这个也是提高吞吐的重要手段。

    消息者也可以拒绝消息,比如消息解析错误等情况,通过设置requeue,来告诉队列是否需要重新投递还是丢弃该消息(如果配置了死信队列,那么丢失的消息会进入死信队列,等待处理)。

  如果选择了重新投递,那么消费的顺序是无法得到保证的。

4.3.  队列处理

 

为了提升消费的速度,生产发布的消息到达队列后,如果队列为空,且有消息者等待消息,则直接发送给消费者,异步放入内存或者磁盘,提升消费速度。这种标记已投递的方式与Kafka的标记offset比较,逻辑上处理简单了,确保了消息不会被重复消费,但是也导致消息无法回溯。队列创建时,可以设置exclusive属性,如果这是为true,表示排他队列,也就是当时创建队列连接(包括所有的信道)的客户端可以消费,其他的无法消息。通俗些,只有你当前这个程序(或进程)进行消费处理,不希望别的客户端读取到这个队列,一般用在RPC模式。一旦连接中断,排他队列也将删除,无论是否设置为持久化队列。

5.   集群

单点无法确保高可用,同时一台I/O能力有限,无法满足高吞吐,在企业级应用中,一般都会部署集群,提供服务。

5.1.  集群拓扑

 

集群节点间呈网状连接,节点间相互通讯,每个节点上保留所有的元数据,包括:

1、交换机,交换机的名称和属性。

2、队列元数据,队列的的名称和属性。

3、绑定关系,交换机与交换机以及交换机与队列的绑定关系元数据。

4、vhost,vhost内的队列,交换机和绑定的命名空间以及安全属性。

所有节点全量保留元数据,会有以下的影响:

1、当客户端连接某个节点创建这些元数据的时候(比如队列,交换机),需要同步到所有节点上,并等待所有的节点的完成后,才答复成功,所以会有一定的延迟。

2、客户端连接其中某个节点,都能获取到所有的元数据(这点和Kafka类似)。

消息内容会不会所有节点都备份呢?答案是否。因为这会造成大量的空间浪费(镜像队列除外,后面介绍)。如果一个生产者(消费者)连接到某个节点发布(消费)队列消息,但该队列不在该节点上,那需要通过该节点路由。如下:

 

     消息的发布和消费链路加长,会导致延迟加长,吞吐量降低,为了减少影响,集群的节点建议部署到在一个汇聚下,不要跨可用区部署(后面会介绍跨可用区的方法)。我们希望客户端对接的节点上部署所需要的队列,这个需要规划得当。

 

5.2.  负载均衡

在多节点情况下,客户端的请求通过负载均衡,将流量均匀分摊到各节点,RabbitMQ集群可以通过HAProxy,LVS+keepalived等LB实现,如下图所示。

5.3.  镜像队列

集群多节点能确保整体服务的可用性,但是对于单个队列来说,如果做不了多节点部署,还是有单节点故障的可能,ActiveMQ中采用主从模式保证了高可用,在RabbitMQ中,也有类似机制,称之为镜像队列(或者HA队列)。

(1)创建镜像队列

在创建队列时,通过ha-mode,ha-params,ha-sync-mode来定义镜像队列个数,分布,以及同步模式。

ha-mode,有效值为,all,exactly,nodes。alls表示在所有的节点上创建镜像队列,exactly表示指定个数的节点上创建,ha-params设置个数;nodes指定在哪些节点上创建,ha-params指定节点名。

ha-sync-mode,有效值为automatic,manual。automatic表示新节点加入时,默认自动同步镜像队列消息;manual表示新节点加入时,不会自动同步镜像队列消息。因为同步操作会导致队列的阻塞,建议使用manual模式。

镜像队列有一个master和多个slave组成,创建完成后,直接连接的节点上的队列为master,其他的为slave。

(2)发布消息

 

可以认为有个隐藏的fanout交换机,向所有的镜像队列进行广播(这里与Kafka不大一样,它是通过follower向master请求同步内容)。当所有的镜像队列确认完成后,才向发布者回复publish-comfirm(所以镜像个数不能太多,否则影响发布吞吐量,一般2-3个为宜),这样能确保所有的镜像队列的消息都是同步的。

(3)消费

除了发布需要向master和slave同时投递消息,其他的都是由master负责向slave同步,包括消费,ack等,如果消息者连接master队列所在的节点(如消费者2),则消息队列信息即可;如果消费者连接的是slave队列所在的节点(如消费者1),slave节点需要将消费指令发送给master节点,master节点将数据准备好,发送给到slave节点,再投递给消费者。slave队列为何不类似于mysql提供读服务呢,这个和Kafka的设计类似,RabbitMQ的负载粒度在队列上,而不是整个节点,只需要将master队列均衡分布,就是平衡整个节点的压力。

 

如图,节点1,节点2,节点3分别分担队列1,队列2,队列3的压力。

(4)失效转移

当slave所在节点挂掉后,除了与slave相连的客户端全部断开连接,其他的没有影响。但是当master所在的节点宕机后,就会产生连锁反应。

1)与master节点所有的客户端连接断开。

2)选举最老的slave节点为master节点,因为最老的slave与master之间的同步状态是最好的,但是也存在未同步的信息丢失。

3)新的master节点重续入队所有unack信息。消费者获取信息进行消费,还没来得及ack,或者ack信息没有同步到新的master上,新master无法确认这部分消息是否被正确消费,为了安全起见,所有的unack都重新入队,此时客户端会有重复消费的可能。

4)如果消费端直连master所在节点(如上图中的消费者2),master节点宕机后,TCP连接断开,重入附加并监听新的master节点;如果是连接slave所在节点(如上图中的消费者1),就无法感知master节点宕机了,只认为队列没有消息。此时,在basicComsume消费时需要指定x-cancel-on-ha-failover参数,监听master节点断开的通知,接受到通知后重入附加并监听新的master。这点非常重要,否则会导致消息大量积压,但是消息端又无消息消费的问题。这个问题在Kafka中是不存在的,分区选举的结果保存在所有的节点上,客户端通过元数据更新,获取最新分区leader信息。

6.   跨集群

现在大型的网站系统,为了实现异地容灾,一般采用多机房或者混合云部署,以下分析单个集群在这种场景下的集中方案

1仅部署一个机房

 

单个集群仅部署在北京机房,南京机房的发布者和消费者跨机房访问。这个方案有以下几个问题。

(1)无法做到容灾,一旦北京机房挂了,整个集群不可用。

(2)降低吞吐量,南京机房的客户端发送请求后,会阻塞住,直到节点回复确认,由于两个机房的存在延迟(假设北京到南京机房的RTT在30ms),导致每次请求时间增加,降低了吞吐量。

2、延展机房部署

 

单个集群跨机房部署,节点1,节点2部署到北京机房,节点3部署到南京机房。这个方案通过镜像队列在节点1,节点2与节点3间互备,能做到部分的容灾。但是也有如下问题:

(1)跨机房请求,部分消费者和发布者还是需要跨机房请求,与方案1类似,导致请求时间增加,降低了吞吐量。

(2)脑裂,异地间的网络环境复杂,网络波动会导致分区,进而"脑裂",单个集群是无法做到跨可用区的。

单个集群是无法满足跨机房场景,需要采用多集群部署解决这个问题,跨集群间的"桥接"可以通过Federation,Shovel两种方式。

6.1.  Federation(联邦)

Federation,可以翻译为"联邦"。Federation 可以通过AMQP 协议让原本发送到某个Broker(或集群)中的交换器(或队列)上的消息能够转发到另一个Broker(或集群)中的交换器(或队列)上,两方的交换器(或队列〉看起来是以一种"联邦"的形式在运作。又分为联邦交换机和联邦队列两种模式。

1、联邦交换机

 

假设发布者1位于北京机房,需要发布消息到南京机房的Broker2节点上,首先在Broker2节点的ExchangeA上建立到Broker1节点的federation link,Broker1上会建立一个同名的交互机ExchangeA,同时建立一个内部的交换机ExchangeA->Broker2,   并通过路由键"rkA"将这两个交换器绑定,同时,还会创建一个Exchange->Broker2队列,并与Exchange->Broker2交换机绑定,Federation插件会在Broker1的ExchangeA->Broker2队列与Broker2的ExchangeA建立AMQP连接,实时消费Exchange->Broker2队列的消息。发布者1仅需要把消息发送了ExchangeA上,保存到队列ExchangeA->Broker2上即可,剩下的时就交由Federation插件搞定。

2、联邦队列

联邦交换机是在两个节点的交换机间建立连接,联邦队列就是在两个队列间建立连接

 

联邦队列可以看做互为扩展队列,如图中的Queue1与Queue3建立联邦队列,当Queue3有消息堆积,消费者1优先将Queue3的消费完,此时则会从Queue1拉取消息;反之亦然,如果Queue1的队列消费完成,将会从Queue3中拉取消息,消费者1继续消息,所以Queue1与Queue3间的消息是可以"漂移"的。联邦队列会让消费能力强的一侧多消费些,确保队列间的平衡。联邦交换机的消息流向是单向的,联邦队列消息流向是双向。

6.2.  Shovel

 

以Broker1的Queue为源,Broker2的Exchange为目标,建立shovel link,发布者发送的Msg1消息,存入queue,Shovel消费后存入Broker2的queue。当Broker1的queue产生消息堆积时,通过shovel link转移消息到其他集群进行消息,减少broke1的压力。

7.   可靠性

可靠性是衡量消息组件的重要因素,但可靠性都是相对的,没有任何组件确保百分百可靠。我们看下RabbitMQ有哪些措施保证高可靠,将整个消息的生命周期分为四个阶段来逐一分析

 

7.1.  第一阶段:消息投递

前面介绍了生产者确认的机制,通过对信道设置comfirm,对于每条投递的消息,都会返回确认信息。除此之外,生产者还可以通过事务机制确保消息投递的可靠性。

  • 事务机制

生产者通过二阶段提交方式,确报批量消息的事务一致性。

 

1、客户端发送Tx.select,将信道置为事务模式。

2、Broker回复Tx.selectOk,确认已将信道置为事务模式。

3、客户端发送批量消息。

4、根据结果(捕获异常),客户端确定是否提交还是回滚。

5、Broker确认提交或者回滚指令。

事务机制能确保批量消息投递的一致性,由于其同步操作,导致性能大大下降。事务与生产者确认机制是互斥的,一般情况下我们采用轻量级的生产者确认机制

7.2.  第二阶段:消息路由

1、mandatory

当消息发送到交换机,有可能由于路由配置错误,导致消息无法正确投递。此时,有两种处理方式,一种是直接丢弃,一种是反馈给生产者。第一章节在介绍调用publish的时候,说到mandatory参数,该参数如果设置为true,会回调生产端的监听接口,返回 消息;如果设置为false,则直接丢弃。显然设置为true,将提升可靠性。

 

2、备份交换机

保留路由错误的消息,可以设置备份交换机,路由失败的消息会丢到备份交换机,保存到特定的队列。通过该队列监控路由错误的消息。

 

7.3.  第三阶段:消息存储

消息存储阶段,介绍了以下几种可靠性机制

1、持久化,队列和消息设置为持久化,写入到磁盘,确保Broker宕机重启后,消息不丢失。但是消息先写入页面缓存,再批量写入磁盘,如果在这期间宕机,还是会存在丢失风险。

2、镜像队列,采用msater-slave多副本机制,最大程度确保可用性。

3、死信队列

死信队列是一种特殊队列,存储死信消息,有以下几种情况下的消息会变成死信消息。任何队列都可以设置一个死信交换机,将符合条件的消息路由到死信队列。死信队列也是一个普通队列,里面的消息也可被订阅。监控死信队列,及时处理死信消息,确保消息的可靠性。

1)消息被拒绝(Basic.reject,并设置request为false)

2)消息过期(超过expiration时间)

3)队列达到最大长度。

7.4.  第四阶段:消息消费

消费者获取消息后,通过手动确认,能最大程度消息消费的可靠性,也可以拒绝消息,并设置request为true,该消息会重新入队。

8.   高吞吐

高可用,高吞吐在有些场景中是互斥的,所以需要根据自身的业务进行抉择,以下分析提高吞吐率的一些措施。

1、生产者不设置信道确认,能大幅提高吞吐量,但是可靠性较差,可以用在允许部分消息丢失的场景,比如统计用户点击量。

2、按照业务规则将队列拆分多个,分布到不同的节点上。

3、一个节点上设置合理的队列个数,一个队列对应一个线程,在一个多核的节点,使用多个队列与消费者可以获得更好的吞吐量,将队列数量设置为等于服务器cpu核数将获得最佳吞吐量。

4、限制队列的大小,设置TTL或者max-length限制队列大小。

5、自动删除不需要的队列,设置队列的TTL,配置auto-delete以及排它队列,删除无用的队列。

6、设置尽可能大的预取个数,不过这个是需要综合网络带宽,客户端缓存,消息处理速度等各方面的因素评估。

 

0条评论
0 / 1000
莫****过
6文章数
0粉丝数
莫****过
6 文章 | 0 粉丝
莫****过
6文章数
0粉丝数
莫****过
6 文章 | 0 粉丝
原创

消息选型-Rabbitmq篇

2023-09-21 10:46:56
14
0

 

1.   架构

我们先来看下整体架构。

 

Producer,消息生产者,即消息的发布方, 生产者生产的消息,首先发布到指定的交换机上,交换机通过路由键(RoutingKey)的匹配,选择对应的队列进行投递。消息者订阅队列,消费队列的消息。

Connectiton,客户端与Broker间的TCP连接。

Channel,信道,每个连接采用多路复用,包含多个信道。producer与Broker间采用信道传递数据。

Broker,Rabbit服务节点。

Vhost,虚机机,一个节点下包含多个vhost,vhost间的exchange,queue相互隔离。就好比一台物理机上(Broker)部署多台虚机(vhost),虚拟机采用不同的用户名密码登录,实现多租户。

Exchange,交换机,消息首先会传递到交换机,由交换机匹配路由键(RoutingKey)决定投递到哪个queue。类比邮政局。

Queue,队列,存储消息的数据结构。类比小区的快递柜。

Binding,绑定,交换机与队列间通过路由键(RoutingKey)进行绑定起来。RoutingKey类比目的地址。

Consumer,消费者,即消息的接受方。

2.   消息发布

生产者创建和启动信道连接,声明交换机机,队列,路由键等信息,消息封装成帧,通过信道发送到Broker。

 

2.1.  创建连接和信道

 一个应用程序(生产者)与RabbitMQ服务节点,维护一个TCP连接(Connect),在这个连接中可以创建多个信道用于信息的传递,各个信道间相互隔离。这样做的目的就是避免在多线程的情况下,频繁开启和关闭多个TCP连接,带来的性能消耗(每个TCP经历三次握手,四次挥手),实际就是TCP的多路复用,类似HTTP2.0的原理。

Connection connection = factory.newConnection() ; //创建连接
Channel channel = connection.createChannel() ; //创建信道

生产者与Broker之间完成了8次交互,分别为Connect的启动,调整,打开以及Channel的开启,最终打开信道。

 

后续所有与Broker通讯都是基于该信道,从代码层面看,都是基于channel对象操作。

2.2.  创建交换机,队列

 

生产者将消息发送到交换机,交换机根据路由键,投递到对应的目的队列保存。这个过程和邮件信息非常类似,寄件人将信件送到邮政局,邮政局根据信件的地址投递到目的地的邮箱中。

 RabbitMQ提供了通过管理平台或者命名行预先创建交换机和队列。采用预先定义还是客户端创建,就看业务的需要,比如我们对交互机以及队列预先做了充分的规划,那就采用预先创建;如果需要创建一些临时的交换机或者队列,客户端创建更适合。

要想创建一个可用的交互机和队列,需要完成以下步骤

(1)创建交换机

以Java客户端为例,调用exchangeDeclare方法。

exchangeDeclare(String exchange ,String type , boolean durable ,boolean autoDelete , boolean internal ,Map<String, Object> arguments)

该方法有很多重载的方法,我们不一一介绍,只介绍下几个重要的参数

exchange,定义该交换器的名称

type,定义该交换器的路由类型,分类fanout,direct,topic,headers等(后面我们重点介绍)

autoDelete,是否自动删除,当设置为true,所有与这个交换器绑定的队列或者交换器都与此解绑,就会自动删除。

(2)创建队列

以Java客户端为例,调用exchangeDeclare方法。

queueDeclare (String queue , boolean durable , boolean exclusive,boolean autoDelete, Map<String,Object> arguments)

其中重要的参数如下:

queue,队列名称

durable,队列是·否持久化(注意,与消息是否持久化不是一回事,后面再介绍)

exclusive,是否排他(后面介绍)

autoDelete,是否自动删除,当设置为true,所有与这个队列连接的消费者都断开时,才会自动删除。

(3)绑定

交换机和队列创建好了,接下来就需要将队列绑定到交换机上,告诉交换机,什么样的消息需要投递给该交换机。

queueBind(String queue , String exchange , String routingKey)

queue,待绑定的队列名。

exchange,需要绑定到的交换机。

routingKey,用来绑定队列和交换机的路由键(有些地方为了区别发送的时候的路由键,将绑定时候的键称之为绑定键Bindkey)

如果交换机和队列已经存在,再次声明,Broker不会重新创建,直接返回成功。

2.3.  创建消息

RabbitMQ的消息分为消息头与消息体,类似http协议的消息结构。

消息头定了消息的相关属性,主要的有以下:

content-type,传输消息体的MIMEl类型,比如:application/json

content-encoding,消息体的编码,如gzip

expiration,消息的过期时间

delivery-mode,消息是否持久化到磁盘,1表示非持久化,2表示持久化。

priority,消息的优先级,数值越大表示该消息的优先级越高,优先消费。

消息体主要是用户自定的消息,可以是json,也可以是xml,通过序列化成二进制数据。

以Java客户端为例,

//创建消息头

Map<String , Object> headers = new HashMap<String , Object>() ;

headers.put( " localtion" , "here " );

headers . put( " time " , " today " );

//创建消息

byte[] messageBodyBytes = "H ello , world! ". getBytes();

2.4.  发布消息

 消息头和消息体都创建完成,接下来就是向指定的交换机发布消息,声明routingKey,告知交换机投递到那个队列上。
以Java客户端为例,

channe1 .basi cPublish(exchangeName,routingKey,mandatory,immediate,

                new AMQP.BasicProperti es.Buil der ()

                  .headers(headers)

                 .build()) ,

                 messageBodyBytes) ;

exchangeName,交换机的名称

routingKey,路由键

mandatoryimmediate,这两个参数后面再介绍。

客户端会将消息封装方法帧,消息头帧,消息体帧。

 

每个消息体帧最大是131Kb,如果超过需要分割成多个。所以一个完整消息,至少需要三个帧(方法帧,消息头帧,消息体帧)。

 

2.5.  消息响应

当生产者将消息发送到交换机后,交互机是否将消息正确路由,并存入队列呢?对于可靠性要求高的系统,这个信息对于生产者很重要。RabbitMQ提供了两种方式,事务和生产确认机制(pulisher comfirm)。这里我们介绍下生产者确认机制,事务放到后面介绍。

     生产者将信道设置成comfirm模式,所有通过该信道发送的消息都会被分配一个唯一的ID,一旦消息被投送到正确的队列后,将会返回确认信息(Basic.Ack),生产者接受信息后确认消息已发布成功;当然,如果投送失败,会返回Basic.Nack信息,生产者根据业务场景判断是否需要重发。

   

确认信息是通过异步回调返回,不会阻塞生产者发送其他消息,这种模式性能较高,但是在重试的情况下,无法确保消息的顺序写入。     

3.   消息路由与存储

3.1.  消息路由

消息发送到节点的交换机后,交换机需要通过路由键投递到对应的队列上,前面介绍过交换机有四种路由类型,分别为direct,fanout,topic,headers。

(1)direct

任何绑定在交换机的队列,只要它的路由键和发布消息的路邮建一致,就能收到消息。一般用于需要将一个消息投递到一个或者多个确定的目标队列。

 

(2)fanout

所有发往fanout交换机的消息被投递到所有绑定到该交换机的队列中,这个应用于"广播"模式,该模式下,路由键不起作用。

 

(3)topic

     采用句点分隔的形式,队列可以通过使用基于通配符(*和#)的模式匹配的方式来绑定到路由键 上,发送的消息携带的路由键匹配上就会投递到该队列。

 

 

 4headers

在队列绑定时,定义匹配headers的参数的值,并设置x-match参数。发布消息时,定义header的属性值,当x-match设置为all表示所有的消息的header属性值都匹配才能路由到队列,设置为any表示,任何一个header的属性值匹配就可以路由到队列。

headers类型的灵活性以及效率都要比其他的类型差,所以用的比较少。

3.2.  消息存储

队列其实还是个逻辑概念,它包含rabbit_amqqueue_process(队列进程)和backing_queue(维护5个状态栈)。通过rabbit_amqqueue_process负责接受生产者发布的消息,向消费者交付消息,处理消息的确认等;backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口。

delivery-mode参数,1表示非持久化,2表示持久化。非持久化的消息置于内存,当内存达到设定阀值(vm memory high watermark paging ratio ),逐步写入磁盘;持久化消息则直接写入磁盘。磁盘的写入速度要远小于内存(都是顺序写入的前提下),并且需要等待落盘后才返回生产者确认信息,所以持久化模式的吞吐量要低于非持久化模式,但是可靠性强。

对于非持久化需要注意一点,如果大量的消息堆积内存,来不及进行落盘释放内存,一旦达到内存的告警阀值(vffi memory high watermark),就会产生告警,为确保节点的可用性,会阻塞所有生产者的连接,直到内存恢复到正常状态,这种情况下,会严重影响吞吐的。

持久化层分为两部分,队列索引(rabbit_queue_index)和消息存储(rabbit_msg_store)。队列索引负责维护队列中落盘消息的信息,包 括消息的存储地点、是否己被交付给消费者、是否己被消费者 ack 等,每个队列都有一个对应的队列索引,以".idx"为文件后缀。消息存储以键值队的形式存储消息,被所有的队列共享,每个节点有且只有一个,以".rdq"为文件后缀,顺序写入。

除了消息可以设置持久化,交换机和队列也可以设置持久化,三者需要同时设置,才能确保真正的持久化。如果队列没有设置持久化,消息是持久化的,那么就会导致队列删除了,但是消息还在,无法消费。

     创建消息的时候,可以通过消息头的expiration设置消息的过期时间,如果消息到达了过期时间,没有被消息,将会被投递到死信队列(见后面章节)

3.3.  消息删除

消息存储时,会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射和文件的相关信息。消息被正确的消费后,即会被删除,首先删除该表的记录,但是不会立即删除文件,而是仅标记待删除数据,待一个文件都是垃圾数据时可以将这个文件删除。

4.   消息消费

4.1.  消费模式

      与生产者一样,消费者需要与节点建立连接,打开信道,实现通讯。消费者需要订阅队列进行消费,消费的模式分为推(push)和拉(pull)两种.

   (1)推模式

  这个模式RabbitMQ推荐的,队列会将消息推送给消费者,流程如下:

 

为了提高消费速度,消费者可以评估自身的消费能力,预设Qos,也就是设置一次可以获取消息的个数(类似TCP的滑块窗口),队列批量推送消息到消费端,消费者获取消息后恢复确认信息。

(2)拉模式

 

消费者每次去队列拉消息,每次只能拉一条。

 

如果只是想获取单条信息,而不是持续订阅,可以采用拉模式。但是不建议使用循环来替代推模式,这样会严重影响RabbitMQ的性能。

4.2.  消息确认和拒绝

消费者获取消息后,可以设置不回复确认,自动回复确认,以及手动回复确认三种模式。

1、不回复确认,队列会默认发出去的消息都会被正确的处理,无需等待确认,这个方式吞吐最高,但是可靠性性最差。

2、自动回复确认,有可能消息没有正确处理,也被回复,队列无法识别,导致消息丢失。

3、手动回复确认,可以根据业务处理的逻辑,把握回复的时机,会带来一定的业务复杂度,但是可靠性最好。

对于设置Qos,可以进行批量回复,无需单次回复,这个也是提高吞吐的重要手段。

    消息者也可以拒绝消息,比如消息解析错误等情况,通过设置requeue,来告诉队列是否需要重新投递还是丢弃该消息(如果配置了死信队列,那么丢失的消息会进入死信队列,等待处理)。

  如果选择了重新投递,那么消费的顺序是无法得到保证的。

4.3.  队列处理

 

为了提升消费的速度,生产发布的消息到达队列后,如果队列为空,且有消息者等待消息,则直接发送给消费者,异步放入内存或者磁盘,提升消费速度。这种标记已投递的方式与Kafka的标记offset比较,逻辑上处理简单了,确保了消息不会被重复消费,但是也导致消息无法回溯。队列创建时,可以设置exclusive属性,如果这是为true,表示排他队列,也就是当时创建队列连接(包括所有的信道)的客户端可以消费,其他的无法消息。通俗些,只有你当前这个程序(或进程)进行消费处理,不希望别的客户端读取到这个队列,一般用在RPC模式。一旦连接中断,排他队列也将删除,无论是否设置为持久化队列。

5.   集群

单点无法确保高可用,同时一台I/O能力有限,无法满足高吞吐,在企业级应用中,一般都会部署集群,提供服务。

5.1.  集群拓扑

 

集群节点间呈网状连接,节点间相互通讯,每个节点上保留所有的元数据,包括:

1、交换机,交换机的名称和属性。

2、队列元数据,队列的的名称和属性。

3、绑定关系,交换机与交换机以及交换机与队列的绑定关系元数据。

4、vhost,vhost内的队列,交换机和绑定的命名空间以及安全属性。

所有节点全量保留元数据,会有以下的影响:

1、当客户端连接某个节点创建这些元数据的时候(比如队列,交换机),需要同步到所有节点上,并等待所有的节点的完成后,才答复成功,所以会有一定的延迟。

2、客户端连接其中某个节点,都能获取到所有的元数据(这点和Kafka类似)。

消息内容会不会所有节点都备份呢?答案是否。因为这会造成大量的空间浪费(镜像队列除外,后面介绍)。如果一个生产者(消费者)连接到某个节点发布(消费)队列消息,但该队列不在该节点上,那需要通过该节点路由。如下:

 

     消息的发布和消费链路加长,会导致延迟加长,吞吐量降低,为了减少影响,集群的节点建议部署到在一个汇聚下,不要跨可用区部署(后面会介绍跨可用区的方法)。我们希望客户端对接的节点上部署所需要的队列,这个需要规划得当。

 

5.2.  负载均衡

在多节点情况下,客户端的请求通过负载均衡,将流量均匀分摊到各节点,RabbitMQ集群可以通过HAProxy,LVS+keepalived等LB实现,如下图所示。

5.3.  镜像队列

集群多节点能确保整体服务的可用性,但是对于单个队列来说,如果做不了多节点部署,还是有单节点故障的可能,ActiveMQ中采用主从模式保证了高可用,在RabbitMQ中,也有类似机制,称之为镜像队列(或者HA队列)。

(1)创建镜像队列

在创建队列时,通过ha-mode,ha-params,ha-sync-mode来定义镜像队列个数,分布,以及同步模式。

ha-mode,有效值为,all,exactly,nodes。alls表示在所有的节点上创建镜像队列,exactly表示指定个数的节点上创建,ha-params设置个数;nodes指定在哪些节点上创建,ha-params指定节点名。

ha-sync-mode,有效值为automatic,manual。automatic表示新节点加入时,默认自动同步镜像队列消息;manual表示新节点加入时,不会自动同步镜像队列消息。因为同步操作会导致队列的阻塞,建议使用manual模式。

镜像队列有一个master和多个slave组成,创建完成后,直接连接的节点上的队列为master,其他的为slave。

(2)发布消息

 

可以认为有个隐藏的fanout交换机,向所有的镜像队列进行广播(这里与Kafka不大一样,它是通过follower向master请求同步内容)。当所有的镜像队列确认完成后,才向发布者回复publish-comfirm(所以镜像个数不能太多,否则影响发布吞吐量,一般2-3个为宜),这样能确保所有的镜像队列的消息都是同步的。

(3)消费

除了发布需要向master和slave同时投递消息,其他的都是由master负责向slave同步,包括消费,ack等,如果消息者连接master队列所在的节点(如消费者2),则消息队列信息即可;如果消费者连接的是slave队列所在的节点(如消费者1),slave节点需要将消费指令发送给master节点,master节点将数据准备好,发送给到slave节点,再投递给消费者。slave队列为何不类似于mysql提供读服务呢,这个和Kafka的设计类似,RabbitMQ的负载粒度在队列上,而不是整个节点,只需要将master队列均衡分布,就是平衡整个节点的压力。

 

如图,节点1,节点2,节点3分别分担队列1,队列2,队列3的压力。

(4)失效转移

当slave所在节点挂掉后,除了与slave相连的客户端全部断开连接,其他的没有影响。但是当master所在的节点宕机后,就会产生连锁反应。

1)与master节点所有的客户端连接断开。

2)选举最老的slave节点为master节点,因为最老的slave与master之间的同步状态是最好的,但是也存在未同步的信息丢失。

3)新的master节点重续入队所有unack信息。消费者获取信息进行消费,还没来得及ack,或者ack信息没有同步到新的master上,新master无法确认这部分消息是否被正确消费,为了安全起见,所有的unack都重新入队,此时客户端会有重复消费的可能。

4)如果消费端直连master所在节点(如上图中的消费者2),master节点宕机后,TCP连接断开,重入附加并监听新的master节点;如果是连接slave所在节点(如上图中的消费者1),就无法感知master节点宕机了,只认为队列没有消息。此时,在basicComsume消费时需要指定x-cancel-on-ha-failover参数,监听master节点断开的通知,接受到通知后重入附加并监听新的master。这点非常重要,否则会导致消息大量积压,但是消息端又无消息消费的问题。这个问题在Kafka中是不存在的,分区选举的结果保存在所有的节点上,客户端通过元数据更新,获取最新分区leader信息。

6.   跨集群

现在大型的网站系统,为了实现异地容灾,一般采用多机房或者混合云部署,以下分析单个集群在这种场景下的集中方案

1仅部署一个机房

 

单个集群仅部署在北京机房,南京机房的发布者和消费者跨机房访问。这个方案有以下几个问题。

(1)无法做到容灾,一旦北京机房挂了,整个集群不可用。

(2)降低吞吐量,南京机房的客户端发送请求后,会阻塞住,直到节点回复确认,由于两个机房的存在延迟(假设北京到南京机房的RTT在30ms),导致每次请求时间增加,降低了吞吐量。

2、延展机房部署

 

单个集群跨机房部署,节点1,节点2部署到北京机房,节点3部署到南京机房。这个方案通过镜像队列在节点1,节点2与节点3间互备,能做到部分的容灾。但是也有如下问题:

(1)跨机房请求,部分消费者和发布者还是需要跨机房请求,与方案1类似,导致请求时间增加,降低了吞吐量。

(2)脑裂,异地间的网络环境复杂,网络波动会导致分区,进而"脑裂",单个集群是无法做到跨可用区的。

单个集群是无法满足跨机房场景,需要采用多集群部署解决这个问题,跨集群间的"桥接"可以通过Federation,Shovel两种方式。

6.1.  Federation(联邦)

Federation,可以翻译为"联邦"。Federation 可以通过AMQP 协议让原本发送到某个Broker(或集群)中的交换器(或队列)上的消息能够转发到另一个Broker(或集群)中的交换器(或队列)上,两方的交换器(或队列〉看起来是以一种"联邦"的形式在运作。又分为联邦交换机和联邦队列两种模式。

1、联邦交换机

 

假设发布者1位于北京机房,需要发布消息到南京机房的Broker2节点上,首先在Broker2节点的ExchangeA上建立到Broker1节点的federation link,Broker1上会建立一个同名的交互机ExchangeA,同时建立一个内部的交换机ExchangeA->Broker2,   并通过路由键"rkA"将这两个交换器绑定,同时,还会创建一个Exchange->Broker2队列,并与Exchange->Broker2交换机绑定,Federation插件会在Broker1的ExchangeA->Broker2队列与Broker2的ExchangeA建立AMQP连接,实时消费Exchange->Broker2队列的消息。发布者1仅需要把消息发送了ExchangeA上,保存到队列ExchangeA->Broker2上即可,剩下的时就交由Federation插件搞定。

2、联邦队列

联邦交换机是在两个节点的交换机间建立连接,联邦队列就是在两个队列间建立连接

 

联邦队列可以看做互为扩展队列,如图中的Queue1与Queue3建立联邦队列,当Queue3有消息堆积,消费者1优先将Queue3的消费完,此时则会从Queue1拉取消息;反之亦然,如果Queue1的队列消费完成,将会从Queue3中拉取消息,消费者1继续消息,所以Queue1与Queue3间的消息是可以"漂移"的。联邦队列会让消费能力强的一侧多消费些,确保队列间的平衡。联邦交换机的消息流向是单向的,联邦队列消息流向是双向。

6.2.  Shovel

 

以Broker1的Queue为源,Broker2的Exchange为目标,建立shovel link,发布者发送的Msg1消息,存入queue,Shovel消费后存入Broker2的queue。当Broker1的queue产生消息堆积时,通过shovel link转移消息到其他集群进行消息,减少broke1的压力。

7.   可靠性

可靠性是衡量消息组件的重要因素,但可靠性都是相对的,没有任何组件确保百分百可靠。我们看下RabbitMQ有哪些措施保证高可靠,将整个消息的生命周期分为四个阶段来逐一分析

 

7.1.  第一阶段:消息投递

前面介绍了生产者确认的机制,通过对信道设置comfirm,对于每条投递的消息,都会返回确认信息。除此之外,生产者还可以通过事务机制确保消息投递的可靠性。

  • 事务机制

生产者通过二阶段提交方式,确报批量消息的事务一致性。

 

1、客户端发送Tx.select,将信道置为事务模式。

2、Broker回复Tx.selectOk,确认已将信道置为事务模式。

3、客户端发送批量消息。

4、根据结果(捕获异常),客户端确定是否提交还是回滚。

5、Broker确认提交或者回滚指令。

事务机制能确保批量消息投递的一致性,由于其同步操作,导致性能大大下降。事务与生产者确认机制是互斥的,一般情况下我们采用轻量级的生产者确认机制

7.2.  第二阶段:消息路由

1、mandatory

当消息发送到交换机,有可能由于路由配置错误,导致消息无法正确投递。此时,有两种处理方式,一种是直接丢弃,一种是反馈给生产者。第一章节在介绍调用publish的时候,说到mandatory参数,该参数如果设置为true,会回调生产端的监听接口,返回 消息;如果设置为false,则直接丢弃。显然设置为true,将提升可靠性。

 

2、备份交换机

保留路由错误的消息,可以设置备份交换机,路由失败的消息会丢到备份交换机,保存到特定的队列。通过该队列监控路由错误的消息。

 

7.3.  第三阶段:消息存储

消息存储阶段,介绍了以下几种可靠性机制

1、持久化,队列和消息设置为持久化,写入到磁盘,确保Broker宕机重启后,消息不丢失。但是消息先写入页面缓存,再批量写入磁盘,如果在这期间宕机,还是会存在丢失风险。

2、镜像队列,采用msater-slave多副本机制,最大程度确保可用性。

3、死信队列

死信队列是一种特殊队列,存储死信消息,有以下几种情况下的消息会变成死信消息。任何队列都可以设置一个死信交换机,将符合条件的消息路由到死信队列。死信队列也是一个普通队列,里面的消息也可被订阅。监控死信队列,及时处理死信消息,确保消息的可靠性。

1)消息被拒绝(Basic.reject,并设置request为false)

2)消息过期(超过expiration时间)

3)队列达到最大长度。

7.4.  第四阶段:消息消费

消费者获取消息后,通过手动确认,能最大程度消息消费的可靠性,也可以拒绝消息,并设置request为true,该消息会重新入队。

8.   高吞吐

高可用,高吞吐在有些场景中是互斥的,所以需要根据自身的业务进行抉择,以下分析提高吞吐率的一些措施。

1、生产者不设置信道确认,能大幅提高吞吐量,但是可靠性较差,可以用在允许部分消息丢失的场景,比如统计用户点击量。

2、按照业务规则将队列拆分多个,分布到不同的节点上。

3、一个节点上设置合理的队列个数,一个队列对应一个线程,在一个多核的节点,使用多个队列与消费者可以获得更好的吞吐量,将队列数量设置为等于服务器cpu核数将获得最佳吞吐量。

4、限制队列的大小,设置TTL或者max-length限制队列大小。

5、自动删除不需要的队列,设置队列的TTL,配置auto-delete以及排它队列,删除无用的队列。

6、设置尽可能大的预取个数,不过这个是需要综合网络带宽,客户端缓存,消息处理速度等各方面的因素评估。

 

文章来自个人专栏
中间件
6 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0