顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。
顺序消息分为全局顺序消息和分区顺序消息:
- 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
- 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者指定分区选择算法,保证需要按顺序消费的消息被分配到同一个队列。
全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。
收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
-
在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。
PS C:\> python Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information.
如果未安装Python,请使用以下命令安装:
pip install rocketmq-client-python
-
安装librocketmq库和rocketmq-client-python。
说明建议下载rocketmq-client-cpp-2.2.0,获取librocketmq库。
-
将librocketmq.so添加到系统动态库搜索路径。
- 查找librocketmq.so的路径。
find / -name librocketmq.so
- 将librocketmq.so添加到系统动态库搜索路径。
ln -s /查找到的librocketmq.so路径/librocketmq.so /usr/lib sudo ldconfig
- 查找librocketmq.so的路径。
以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- GROUP:表示消费组名称。
- ENDPOINT:表示实例连接地址和端口。
- TOPIC:表示Topic名称。
发送消息
参考如下示例代码。
from rocketmq.client import Producer, Message
endpoint = "${ENDPOINT}" # 填写分布式消息服务RocketMQ控制台Namesrv接入点
access_key = "${ACCESS_KEY}" # 填写AccessKey,在管理控制台创建
access_secret = "${SECRET_KEY}" # 填写SecretKey 在管理控制台创建
topic = "${TOPIC}" # 填写Topic,在管理控制台创建
producer_group = "${GROUP}" # 生产者组group
# 创建并启动生产者实例
producer = Producer(producer_group)
producer.set_name_server_address(endpoint)
producer.set_session_credentials(access_key, access_secret, "")
producer.start()
msg = Message(topic)
msg.set_body("Hello RocketMQ")
msg.set_keys("") # 消息key
msg.set_tags("") # 消息tag
sharding_key = "key" # 指定消息投递的sharding key
# 根据Sharding Key,发送顺序消息,
ret = producer.send_orderly_with_sharding_key(msg, sharding_key)
print(ret.status, ret.msg_id, ret.offset)
# 关闭生产者实例,释放资源
producer.shutdown()
订阅消息
参考如下示例代码。
import time
from rocketmq.client import PushConsumer, ConsumeStatus
endpoint = "${ENDPOINT}" # 填写分布式消息服务RocketMQ控制台Namesrv接入点
access_key = "${ACCESS_KEY}" # 填写AccessKey,在管理控制台创建
access_secret = "${SECRET_KEY}" # 填写SecretKey 在管理控制台创建
topic = "${TOPIC}" # 填写Topic,在管理控制台创建
group = "${GROUP}" # 填写订阅组group,在管理控制台创建
def callback(msg):
print(msg.id, msg.body)
return ConsumeStatus.CONSUME_SUCCESS
consumer = PushConsumer(group, orderly=True) # 指定消费者为顺序消费类型
consumer.set_name_server_address(endpoint)
consumer.set_session_credentials(access_key, access_secret, "")
consumer.subscribe(topic, callback)
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()