分布式消息服务RocketMQ版支持任意时间的定时消息,最大推迟时间可达到40天。
定时消息即生产者生产消息到分布式消息服务RocketMQ版后,消息不会立即被消费,而是延迟到设定的时间点后才会发送给消费者进行消费。
收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
-
在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。
PS C:\> python Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information.
如果未安装Python,请使用以下命令安装:
pip install rocketmq-client-python
-
安装librocketmq库和rocketmq-client-python。
说明建议下载rocketmq-client-cpp-2.2.0,获取librocketmq库。
-
将librocketmq.so添加到系统动态库搜索路径。
- 查找librocketmq.so的路径。
find / -name librocketmq.so
- 将librocketmq.so添加到系统动态库搜索路径。
ln -s /查找到的librocketmq.so路径/librocketmq.so /usr/lib sudo ldconfig
- 查找librocketmq.so的路径。
以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- GROUP:表示消费组名称。
- ENDPOINT:表示实例连接地址和端口。
- TOPIC:表示Topic名称。
发送消息
参考如下示例代码。
import datetime
from rocketmq.client import Producer, Message
from rocketmq.exceptions import RocketMQException
endpoint = "${ENDPOINT}" # 填写分布式消息服务RocketMQ控制台Namesrv接入点
access_key = "${ACCESS_KEY}" # 填写AccessKey,在管理控制台创建
access_secret = "${SECRET_KEY}" # 填写SecretKey 在管理控制台创建
topic = "${TOPIC}" # 填写Topic,在管理控制台创建
producer_group = "${GROUP}" # 生产者组group
# 初始化生产者
producer = Producer(producer_group)
producer.set_namesrv_addr(endpoint)
producer.set_session_credentials(access_key, access_secret, "")
# 启动生产者
try:
producer.start()
except RocketMQException as e:
print('start producer error:', e)
exit(1)
msg = Message(topic)
msg.set_body("Hello RocketMQ")
delay_time = 10
# 发送任意延迟消息,时间单位为毫秒,如下所示:消息将在10s后投递
delay_timestamp = int((datetime.datetime.now() + datetime.timedelta(seconds=delay_time)).timestamp() * 1000)
msg.set_property('__STARTDELIVERTIME', str(delay_timestamp))
# 发送消息
try:
result = producer.send_sync(msg)
print('send result:', result)
except RocketMQException as e:
print('send message error:', e)
producer.shutdown()
exit(1)
# 关闭生产者实例,释放资源
producer.shutdown()
订阅消息
参考如下示例代码。
import time
from rocketmq.client import PushConsumer, ConsumeStatus
endpoint = "${ENDPOINT}" # 填写分布式消息服务RocketMQ控制台Namesrv接入点
access_key = "${ACCESS_KEY}" # 填写AccessKey,在管理控制台创建
access_secret = "${SECRET_KEY}" # 填写SecretKey 在管理控制台创建
topic = "${TOPIC}" # 填写Topic,在管理控制台创建
group = "${GROUP}" # 填写订阅组group,在管理控制台创建
def callback(msg):
print(msg.id, msg.body)
return ConsumeStatus.CONSUME_SUCCESS
consumer = PushConsumer(group)
consumer.set_name_server_address(endpoint)
consumer.set_session_credentials(access_key, access_secret, "")
consumer.subscribe(topic, callback)
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()