searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

分布式消息如何保证可靠性传递

2023-02-08 01:17:00
12
0

一、如何保证可靠性传递

1.1、什么是生产端的可靠性投递

①、保障消息成功发送出去

②、保障mq节点成功接收消息

③、消息发送端需要收到mq服务的确认应答

④、完善的消息补偿机制(若要保证百分百成功 需要该步骤)

 

1.2、解决保障可靠性投递的方案一:消息落库打标方案

消息入库打标解决思路 (Order_Serve 调用物流服务举例子)

在消息生产者端(也就是订单服务)

正常链路流程

第一步(该环节调用了操作了二次数据库):在创建订单操作的时候,把数据插入到订单相关的表中,并且构造调用物流模块的数据消息,把消息插入到消息表中,初始状态为0

第二步:把物流消息投递到消息队列中,

第三步:消息队列访问一个确认消息,并且由订单服务来监控mq server的确认消息

第四步:根据收到的确认消息来更新数据库中的消息记录的状态

 

 

异常链路流程

第一步(该环节调用了操作了二次数据库):在创建订单的操作的时候,把数据插入到订单相关的表中,并且构造调用物流模块的数据消息,把消息插入到消息表中,初始状态为0

第二步:把物流消息投递到消息队列中,

第三步:由于网络闪断,导致消费端监控mq服务访问的确认消息 没有收到,那么在msg_db中的那条消息的状态永远就是0状态。这个时候,我们需要对这种情况下做出补偿

补偿机制:

启动一个分布式的定时任务,不定时的去扫描msg_db的这个表,状态为0的消息记录,在这里我们可以根据业务来设置扫描重发规则:

规则1:插入msg_db 表中5min后状态还是为0的记录,进行消息重试

规则2:若重试的次数超过五次状态还是为0的话,我们就把消息状态改为2,此时我们需要人工的去确认状态为2的消息是什么原因导致没有成功的

消息入库打标的缺点:

在第一步的过程中,既插入了业务数据表,也同时插入了消息记录表,进行了二次db操作,在高并发的环境下,这个环境就会造成性能瓶颈

1.3、延时投递,做二次确认检测,回调检测

public void senderMsg(MsgTxtBo msgTxtBo) {
   log.info("发送的消息ID:{}", msgTxtBo.getOrderNo());
   CorrelationData correlationData = new CorrelationData(msgTxtBo.getMsgId() + "_" + msgTxtBo.getOrderNo());
   // 2.1、设置exchange交换机和routing_key发送消息
   rabbitTemplate.convertAndSend(MqConst.ORDER_TO_PRODUCT_EXCHANGE_NAME, MqConst.ORDER_TO_PRODUCT_ROUTING_KEY, msgTxtBo, correlationData);
}
// 3.2、设置延时消费的队列,延时30s
rabbitTemplate.convertAndSend(MqConst.ORDER_TO_PRODUCT_DELAY_EXCHANGE_NAME, MqConst.ORDER_TO_PRODUCT_DELAY_ROUTING_KEY, msgTxtBo, new MessagePostProcessor() {
   @Override
   public Message postProcessMessage(Message message) throws AmqpException {
      message.getMessageProperties().setHeader("x-delay", MqConst.DELAY_TIME);//设置延迟时间
      return message;
   }
}, correlationData);

第8步是关键,监听延时检查消息。监听延时队列,然后到msg_db库中查询是否有该记录存在,如果存在,消息消费成功;不存在,重新投递。

还是由order服务(upstream service) 物流服务(downstream servcie)来举例子

 

 

Upstream Service上游服务也就是生产端,Downstream service下游服务也就是消费端,Callback service是回调服务

1、先将业务数据进行入库,然后生产端将消息发送出去,注意一定是等数据库操作完成之后再去发送消息

2、在发送消息之后,紧接着生产端再次发送一条消息(Second Send Delay Check),即延迟消息投递检查,这里需要设置一个延迟时间,比如5分钟之后进行投递

3、消费端去监听指定队列,将收到的消息进行处理

4、处理完成之后,发送一个confirm消息,也就是回送响应,但是这里响应不是正常的ACK,而是重新生成一条消息,投递到MQ中

5、上面的Callback service是一个单独的服务,其实它扮演了方案一的存储消息的DB角色,它通过MQ去监听下游服务发送的confirm消息,如果Callback service收到confirm消息,那么就对消息做持久化存储,即将消息持久化到DB中

6、5分钟之后延迟消息发送到MQ了,然后Callback service还是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息,如果存在,则不需要做任何处理,如果不存在或者消费失败了,那么Callback service就需要主动发起RPC通信给上游服务,告诉它延迟投递的这条消息没有找到,需要重新发送,生产端收到信息后就会重新查询业务消息然后将消息发送出去

方案二不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是人工去做补偿了,或者使用定时任务去做

方案二主要目的是为了减少数据库操作,提高并发量,在高并发场景下,最关心的不是消息100%投递成功,而是一定要保证性能,保证能抗得住这么大的并发量,所以能减少数据库的操作就尽量减少,可以异步的进行补偿

 

0条评论
0 / 1000
林宏杰
4文章数
1粉丝数
林宏杰
4 文章 | 1 粉丝
林宏杰
4文章数
1粉丝数
林宏杰
4 文章 | 1 粉丝
原创

分布式消息如何保证可靠性传递

2023-02-08 01:17:00
12
0

一、如何保证可靠性传递

1.1、什么是生产端的可靠性投递

①、保障消息成功发送出去

②、保障mq节点成功接收消息

③、消息发送端需要收到mq服务的确认应答

④、完善的消息补偿机制(若要保证百分百成功 需要该步骤)

 

1.2、解决保障可靠性投递的方案一:消息落库打标方案

消息入库打标解决思路 (Order_Serve 调用物流服务举例子)

在消息生产者端(也就是订单服务)

正常链路流程

第一步(该环节调用了操作了二次数据库):在创建订单操作的时候,把数据插入到订单相关的表中,并且构造调用物流模块的数据消息,把消息插入到消息表中,初始状态为0

第二步:把物流消息投递到消息队列中,

第三步:消息队列访问一个确认消息,并且由订单服务来监控mq server的确认消息

第四步:根据收到的确认消息来更新数据库中的消息记录的状态

 

 

异常链路流程

第一步(该环节调用了操作了二次数据库):在创建订单的操作的时候,把数据插入到订单相关的表中,并且构造调用物流模块的数据消息,把消息插入到消息表中,初始状态为0

第二步:把物流消息投递到消息队列中,

第三步:由于网络闪断,导致消费端监控mq服务访问的确认消息 没有收到,那么在msg_db中的那条消息的状态永远就是0状态。这个时候,我们需要对这种情况下做出补偿

补偿机制:

启动一个分布式的定时任务,不定时的去扫描msg_db的这个表,状态为0的消息记录,在这里我们可以根据业务来设置扫描重发规则:

规则1:插入msg_db 表中5min后状态还是为0的记录,进行消息重试

规则2:若重试的次数超过五次状态还是为0的话,我们就把消息状态改为2,此时我们需要人工的去确认状态为2的消息是什么原因导致没有成功的

消息入库打标的缺点:

在第一步的过程中,既插入了业务数据表,也同时插入了消息记录表,进行了二次db操作,在高并发的环境下,这个环境就会造成性能瓶颈

1.3、延时投递,做二次确认检测,回调检测

public void senderMsg(MsgTxtBo msgTxtBo) {
   log.info("发送的消息ID:{}", msgTxtBo.getOrderNo());
   CorrelationData correlationData = new CorrelationData(msgTxtBo.getMsgId() + "_" + msgTxtBo.getOrderNo());
   // 2.1、设置exchange交换机和routing_key发送消息
   rabbitTemplate.convertAndSend(MqConst.ORDER_TO_PRODUCT_EXCHANGE_NAME, MqConst.ORDER_TO_PRODUCT_ROUTING_KEY, msgTxtBo, correlationData);
}
// 3.2、设置延时消费的队列,延时30s
rabbitTemplate.convertAndSend(MqConst.ORDER_TO_PRODUCT_DELAY_EXCHANGE_NAME, MqConst.ORDER_TO_PRODUCT_DELAY_ROUTING_KEY, msgTxtBo, new MessagePostProcessor() {
   @Override
   public Message postProcessMessage(Message message) throws AmqpException {
      message.getMessageProperties().setHeader("x-delay", MqConst.DELAY_TIME);//设置延迟时间
      return message;
   }
}, correlationData);

第8步是关键,监听延时检查消息。监听延时队列,然后到msg_db库中查询是否有该记录存在,如果存在,消息消费成功;不存在,重新投递。

还是由order服务(upstream service) 物流服务(downstream servcie)来举例子

 

 

Upstream Service上游服务也就是生产端,Downstream service下游服务也就是消费端,Callback service是回调服务

1、先将业务数据进行入库,然后生产端将消息发送出去,注意一定是等数据库操作完成之后再去发送消息

2、在发送消息之后,紧接着生产端再次发送一条消息(Second Send Delay Check),即延迟消息投递检查,这里需要设置一个延迟时间,比如5分钟之后进行投递

3、消费端去监听指定队列,将收到的消息进行处理

4、处理完成之后,发送一个confirm消息,也就是回送响应,但是这里响应不是正常的ACK,而是重新生成一条消息,投递到MQ中

5、上面的Callback service是一个单独的服务,其实它扮演了方案一的存储消息的DB角色,它通过MQ去监听下游服务发送的confirm消息,如果Callback service收到confirm消息,那么就对消息做持久化存储,即将消息持久化到DB中

6、5分钟之后延迟消息发送到MQ了,然后Callback service还是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息,如果存在,则不需要做任何处理,如果不存在或者消费失败了,那么Callback service就需要主动发起RPC通信给上游服务,告诉它延迟投递的这条消息没有找到,需要重新发送,生产端收到信息后就会重新查询业务消息然后将消息发送出去

方案二不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是人工去做补偿了,或者使用定时任务去做

方案二主要目的是为了减少数据库操作,提高并发量,在高并发场景下,最关心的不是消息100%投递成功,而是一定要保证性能,保证能抗得住这么大的并发量,所以能减少数据库的操作就尽量减少,可以异步的进行补偿

 

文章来自个人专栏
分布式消息
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0