立即前往

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
查看全部活动
热门活动
  • 智算采购季 热销S6云服务器2核4G限时88元/年起,部分主机可加赠对象存储组合包!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 一键部署Llama3大模型学习机 0代码一键部署,预装最新主流大模型Llama3与StableDiffusion
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 产品能力
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      【RocketMQ入门到精通】— RocketMQ中级特性能力 | 解释一下顺序消息原理

      首页 知识中心 其他 文章详情页

      【RocketMQ入门到精通】— RocketMQ中级特性能力 | 解释一下顺序消息原理

      2023-07-07 07:51:05 阅读次数:404

      apache,java

      名言警句


      任何先进的技术均与魔法无异


      追本溯源

      【​​经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】​​】


      什么是消息的顺序性

      消息的顺序性指的是在消息消费时,能按照发送的顺序来消费。例如:针对于商城服务的的下单到付款的流程中,会产生三条业务消息,它们分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。这时候就必须要考虑保证消息有序,如下图所示。

      【RocketMQ入门到精通】— RocketMQ中级特性能力 | 解释一下顺序消息原理


      顺序消息的分类

      顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。


      全局顺序

      对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。

      • 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。

      分区顺序

      对于指定的一个 Topic,所有消息根据sharding key进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

      • 适用场景:性能要求高,以sharding key作为分区字段,同一个区块严格的按照FIFO原则进行消息发布和消费的场景。

      普通顺序消息(Normal Ordered Message)

      普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。


      严格顺序消息(Strictly Ordered Message)

      严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

      顺序消息缺陷

      • ​发送顺序消息无法利用集群 FailOver 特性
      • 消费顺序消息的并行度依赖于队列数量
      • 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题
      • 遇到消息失败的消息,无法跳过,当前队列消费暂停​

      顺序消息的实现

      对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如根据ShardingKey进行分区),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。

      需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。

      RocketMQ的生产顺序性

      RocketMQ通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

      • 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
      • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

      满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

      【RocketMQ入门到精通】— RocketMQ中级特性能力 | 解释一下顺序消息原理

      顺序消息的应用场景也非常广泛,在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。

      例如,创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。

      【RocketMQ入门到精通】— RocketMQ中级特性能力 | 解释一下顺序消息原理

      生产者顺序消息

      顺序消息的代码手动实现如下所示:

      public class Producer {
          public static void main(String[] args) throws UnsupportedEncodingException {
              try {
                  DefaultMQProducer producer = new DefaultMQProducer("test_group");
                  producer.start();
                  String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
                  for (int i = 0; i < 100; i++) {
                      int orderId = i % 10;
                      Message msg =
                          new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                              ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                      SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                          @Override
                          public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                              Integer id = (Integer) arg;
                              int index = id % mqs.size();
                              return mqs.get(index);
                          }
                      }, orderId);
      
                      System.out.printf("%s%n", sendResult);
                  }
      
                  producer.shutdown();
              } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }

      这里的区别主要是调用了​​SendResult send(Message msg, MessageQueueSelector selector, Object arg)​​方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。

      MessageQueueSelector的接口如下:

      public interface MessageQueueSelector {
          MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
      }

      其中mqs是可以发送的队列,msg是消息,arg是上述send接口中传入的Object对象,返回的是该消息需要发送到的队列。上述例子里,是以orderId作为分区分类标准,对所有队列个数取余,来对将相同orderId的消息发送到同一个队列中。

      生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。

      顺序消息的一致性(系统支持)

      如果一个Broker掉线,那么此时队列总数是否会发化?如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 ​​-o​​ 参数(--order)为true,表示顺序消息:

      $ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
      create topic to 127.0.0.1:10911 success.
      TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]

      其次要保证NameServer中的配置 ​​orderMessageEnable​​ 和 ​​returnOrderTopicConfigToBroker​​ 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。

      消费组顺序消息

      顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

      import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
      import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
      import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
      import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
      import org.apache.rocketmq.common.message.MessageExt;
      import java.util.List;
      package org.apache.rocketmq.example.order2;
      import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
      import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
      import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
      import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
      import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
      import org.apache.rocketmq.common.message.MessageExt;
      import java.util.List;
      import java.util.Random;
      import java.util.concurrent.TimeUnit;
      /**
      * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
      */
      public class ConsumerInOrder {
         public static void main(String[] args) throws Exception {
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
             consumer.setNamesrvAddr("127.0.0.1:9876");
             /**
              * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
              * 如果非第一次启动,那么按照上次消费的位置继续消费
              */
             consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
             consumer.subscribe("TopicTest", "TagA || TagC || TagD");
             consumer.registerMessageListener(new MessageListenerOrderly() {
                 Random random = new Random();
                 @Override
                 public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                     context.setAutoCommit(true);
                     for (MessageExt msg : msgs) {
                         // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                         System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                     }
                     try {
                         //模拟业务逻辑处理中...
                         TimeUnit.SECONDS.sleep(random.nextInt(10));
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                     return ConsumeOrderlyStatus.SUCCESS;
                 }
             });
             consumer.start();
             System.out.println("Consumer Started.");
         }
      }
      版权声明:本文内容来自第三方投稿或授权转载,原文地址:https://blog.51cto.com/alex4dream/5849908,作者:洛神灬殇,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇:精华推荐 | 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的Broker服务端自动创建topic的原理分析和问题要点指南

      下一篇:Gradle 中的文件操作

      相关文章

      2025-04-22 09:40:08

      【ETL工具】kettle 程序报错 Javascript error: TypeError: Cannot call method “trim“ of null

      【ETL工具】kettle 程序报错 Javascript error: TypeError: Cannot call method “trim“ of null

      2025-04-22 09:40:08
      java , javascript , org
      2025-04-15 09:20:33

      初学Java,IO之文件过滤器(四十)

      初学Java,IO之文件过滤器(四十)

      2025-04-15 09:20:33
      File , java , 文件夹 , 方法
      2025-04-15 09:20:33

      初学Java,List和ListIterator(二十八)

      List代表的是一个有序的集合,每一个元素都有一个对应索引。List是Collection的子接口,所以Collection的所有方法都可以调用,另外也有它自身的方法。

      2025-04-15 09:20:33
      Arrays , java , List , Struts2
      2025-04-15 09:20:22

      初学Java,IO之使用转换流,读取键盘输入(四十三)

      初学Java,IO之使用转换流,读取键盘输入(四十三)

      2025-04-15 09:20:22
      buffer , import , java , string
      2025-04-15 09:20:07

      初学Java,IO之File用法(三十九)

      初学Java,IO之File用法(三十九)

      2025-04-15 09:20:07
      class , java
      2025-04-15 09:19:55

      初学Java,集合类的排序,查找,替换操作(三十五)

      初学Java,集合类的排序,查找,替换操作(三十五)

      2025-04-15 09:19:55
      java , list , 排序 , 替换 , 查找
      2025-04-14 09:26:51

      线性表练习之Example038-编写一个函数将链表 h2 链接到链表 h1 之后,要求链接后的链表仍然保持循环链表形式

      线性表练习之Example038-编写一个函数将链表 h2 链接到链表 h1 之后,要求链接后的链表仍然保持循环链表形式

      2025-04-14 09:26:51
      java , 数据结构
      2025-04-14 08:48:01

      使用Java消费API的一个错误消息PKIX path building failed以及解决方案

      使用Java消费API的一个错误消息PKIX path building failed以及解决方案

      2025-04-14 08:48:01
      java , Java , ssl
      2025-04-14 08:48:01

      如何处理Maven build时的error message Unable to locate the Javac Compiler in tools.jar

      如何处理Maven build时的error message Unable to locate the Javac Compiler in tools.jar

      2025-04-14 08:48:01
      jar , jar包 , java , Java
      2025-04-14 08:45:56

      Java实现一个坦克大战的小游戏【附源码】

      Java实现一个坦克大战的小游戏【附源码】

      2025-04-14 08:45:56
      html , i++ , java
      查看更多
      推荐标签

      作者介绍

      天翼云小翼
      天翼云用户

      文章

      32777

      阅读量

      4834591

      查看更多

      最新文章

      java中final的用法

      2025-04-11 07:15:54

      java使用poi解密excel文件

      2025-04-11 07:11:40

      nested exception is org.apache.ibatis.exceptions.TooManyResultsException: Expected one result

      2025-04-09 09:17:07

      使用ZipEntry解压zip文件报错: java.lang.IllegalArgumentException: MALFORMED

      2025-04-09 09:17:07

      总结java中文件拷贝剪切的5种方式-JAVA IO基础总结第五篇

      2025-04-09 09:14:24

      JAVA本地编译运行出现的找不到类名问题

      2025-03-14 09:05:42

      查看更多

      热门文章

      GC是什么? 为什么要有GC?

      2023-05-10 06:02:16

      在java正则表达式中为什么要对 . 进行两次的转义操作 (\\.)才表示真正的 .

      2023-05-15 10:01:56

      axios&spring前后端分离传参规范总结

      2023-05-22 08:09:06

      Jsp Ajax之模拟用户注册

      2022-11-17 12:37:24

      定义一个函数,接收三个参数返回一元二次方程

      2023-02-13 07:59:59

      JAVA基础加强笔记

      2022-11-14 02:56:39

      查看更多

      热门标签

      linux java python javascript 数组 前端 docker Linux vue 函数 shell git 容器 spring 节点
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      leetcode-单链表-23

      前端工作小结33-确定需求报告

      java api操作mongodb

      java.lang.NoClassDefFoundError: org/jaxen/JaxenException

      Map.Entry 类使用简介

      MDC学习笔记

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 权益商城
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 权益商城
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号