名言警句
任何先进的技术均与魔法无异
追本溯源
经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】
回溯基础知识
服务注册与发现设计模式-定时拉取模式
RocketMQ Topic路由的注册、发现采用的就是拉取模式,如下图所示。
- BrokerA和BrokerB全都想所有的NameServer进行注册相关的服务配置信息以及相关的Topic基本信息、Topic的读写队列数、操作权限等。
- Broker默认每30s向 NameServer 发送心跳包,心跳包中包含主题的路由信息(Topic的读写队列数、操作权限等)
- NameServer会通过HashMap更新 Topic 的路由信息,并记录最后一次收到 Broker的时间戳。
- NameServer 以每10s的频率清除已宕机的 Broker,NameServr 认为 Broker 宕机的依据是如果当前系统时间戳减去最后一次收到 Broker 心跳包的时间戳大于120s。
- 消息生产者以每30s的频率去拉取Topic的路由信息,即消息生产者并不会立即感知Broker服务器的新增与删除。
定时拉取模式
RocketMQ使用拉模式来实现 Topic 路由有什么缺点呢?
- Topic路由中心(Nameserver) topic 是基于最终一致性,极端情况下会出现数据不一致。
- 客户端无法实时感知路由信息的变化,例如某台Broker 自身进程未关闭,但停止向 Nameserver 发送心跳包,但生产者无法立即感知该 Broker 服务器的异常,会对消息发送造成一定的可用性?
- 因为基于上述的设计,RocketMQ NameServer 的实现非常简单、高效,至于消息发送的高可用性,则由消息发送客户端自己来保证。
RocketMQ 的设计者遵循的一个设计理念:崇尚“缺陷美”,简单、高性能。
相对的Dubbo的服务注册与发现就是典型的实时推送模式
- 优点:针对于服务注册、服务订阅、服务剔除具有较强的实时性
- 缺点:服务注册中心实现较复杂,至少需要具备发布-订阅模式。
Publish消息发布体系
发布订阅是消息中间件的最基本功能,也是相对于传统 RPC 通信而言。
RocketMQ消息发送(Publish)
生产者(Producer)就是消息的发送者,Apache RocketMQ 拥有丰富的消息类型,可以支持不同的应用场景,在不同的场景中,需要使用不同的消息进行发送。比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略,此时就需要用到延迟消息;电商场景中,业务上要求同一订单的消息保持严格顺序,此时就要用到顺序消息。在日志处理场景中,可以接受的比较大的发送延迟,但对吞吐量的要求很高,希望每秒能处理百万条日志,此时可以使用批量消息。在银行扣款的场景中,要保持上游的扣款操作和下游的短信通知保持一致,此时就要使用事务消息,下一节将会介绍各种类型消息的发送。
注意:生产环境中不同消息类型需要使用不同的主题,不要在同一个主题内使用多种消息类型,这样可以避免运维过程中的风险和错误。
消息发送流程主要包括如下三个方面:
- Topic路由寻址,根据topic,进行选择对应的broker,然后再进行选择对应的broker下映射的选择消息队列(MessageQueue)。
- 当选择完Broker以及对应的MessageQueue之后就要进行消息发送。
- 发送成功直接退出即可,当发生失败后,判断是否达到重试次数,如果是则直接退出,否则再次选择该Broker的其他队列进行重试重试。
重试发送可能存在的问题分析
上一次选择的队列为Broker A服务器的q1队列, 如果消息发送失败是由于Broker A宕机引起的, 如果本次继续轮询, 则选择的队列为Broker A服务器的q 2队列,岂不是还会发送失败,这重试的意义又何在呢?
RocketMQ给出的解决办法是-Broker规避
在发送失败时, 如果向某一个Broker发送失败后, 会设置一个规避策略,例如在接下来的5分钟内选择队列 时,发生错误的Broker中的队列将不参加队列负载, 即会跳过Broker A服务器上调度队列, 只选择Broker B服务器上的队列。
RocketMQ发送的内容
发送消息的的基本概念包括消息,Tag,Keys,队列和生产者的介绍。RocketMQ 消息构成非常简单,主要由以下几部分组成。
- topic,表示要发送的消息的主题。
- body表示消息的存储内容
- properties表示消息属性
- transactionId会在事务消息中使用。
- Tag: 不管是 RocketMQ 的 Tag 过滤还是延迟消息等都会利用 Properties 消息属性机制,这些特殊信息使用了系统保留的属性Key,设置自定义属性时需要避免和系统属性Key冲突。
- Keys: 服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等。
Keys
Apache RocketMQ 每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题。 Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
// 订单Id
String orderId = "20034568923546";
message.setKeys(orderId);
注意
RocketMQ系统保留的属性Key集合有如下,需要在使用过程中避免:
TRACE_ON、MSG_REGION、KEYS、TAGS、DELAY、RETRY_TOPIC、REAL_TOPIC、REAL_QID、TRAN_MSG、PGROUP、MIN_OFFSET、MAX_OFFSET、BUYER_ID、ORIGIN_MESSAGE_ID、TRANSFER_FLAG、CORRECTION_FLAG、MQ2_FLAG、RECONSUME_TIME、UNIQ_KEY、MAX_RECONSUME_TIMES、CONSUME_START_TIME、POP_CK、POP_CK_OFFSET、1ST_POP_TIME、TRAN_PREPARED_QUEUE_OFFSET、DUP_INFO、EXTEND_UNIQ_INFO、INSTANCE_ID、CORRELATION_ID、REPLY_TO_CLIENT、TTL、ARRIVE_TIME、PUSH_REPLY_TIME、CLUSTER、MSG_TYPE、INNER_MULTI_QUEUE_OFFSET、_BORNHOST
最后补充
一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在在 Topic 的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,这个称为最大位点 MaxOffset;队列的起始位置对应的位置叫做起始位点 MinOffset。队列可以提升消息发送和消费的并发度。