本章节介绍普通消息的收发方法和示例代码。其中,普通消息发送方式分为同步发送、异步发送、单向发送。
- 同步发送:同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
- 异步发送:异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
- 单向发送:发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发。
- 收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
-
在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。
PS C:\> python Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information.
如果未安装Python,请使用以下命令安装:
pip install rocketmq-client-python
-
安装librocketmq库和rocketmq-client-python。
说明建议下载rocketmq-client-cpp-2.2.0,获取librocketmq库。
-
将librocketmq.so添加到系统动态库搜索路径。
- 查找librocketmq.so的路径。
find / -name librocketmq.so
- 将librocketmq.so添加到系统动态库搜索路径。
ln -s /查找到的librocketmq.so路径/librocketmq.so /usr/lib sudo ldconfig
- 查找librocketmq.so的路径。
以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- GROUP:表示消费组名称。
- ENDPOINT:表示实例连接地址和端口。
- TOPIC:表示Topic名称。
发送消息
参考如下示例代码。
from rocketmq.client import Producer, Message
endpoint = "${ENDPOINT}" # 填写分布式消息服务RocketMQ控制台Namesrv接入点
access_key = "${ACCESS_KEY}" # 填写AccessKey,在管理控制台创建
access_secret = "${SECRET_KEY}" # 填写SecretKey 在管理控制台创建
topic = "${TOPIC}" # 填写Topic,在管理控制台创建
producer_group = "${GROUP}" # 生产者组group
# 创建并启动生产者实例
producer = Producer(producer_group)
producer.set_name_server_address(endpoint)
producer.set_session_credentials(access_key, access_secret, "")
producer.start()
msg = Message(topic)
msg.set_body("Hello RocketMQ")
msg.set_keys("") # 消息key
msg.set_tags("") # 消息tag
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 关闭生产者实例,释放资源
producer.shutdown()
订阅消息
参考如下示例代码。
import time
from rocketmq.client import PushConsumer, ConsumeStatus
endpoint = "${ENDPOINT}" # 填写分布式消息服务RocketMQ控制台Namesrv接入点
access_key = "${ACCESS_KEY}" # 填写AccessKey,在管理控制台创建
access_secret = "${SECRET_KEY}" # 填写SecretKey 在管理控制台创建
topic = "${TOPIC}" # 填写Topic,在管理控制台创建
group = "${GROUP}" # 填写订阅组group,在管理控制台创建
def callback(msg):
print(msg.id, msg.body)
return ConsumeStatus.CONSUME_SUCCESS
consumer = PushConsumer(group)
consumer.set_name_server_address(endpoint)
consumer.set_session_credentials(access_key, access_secret, "")
consumer.subscribe(topic, callback)
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()