分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。
图1 事务消息交互流程
事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。
收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
-
在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。
PS C:\> python Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information.
如果未安装Python,请使用以下命令安装:
pip install rocketmq-client-python
-
安装librocketmq库和rocketmq-client-python。
说明建议下载rocketmq-client-cpp-2.2.0,获取librocketmq库。
-
将librocketmq.so添加到系统动态库搜索路径。
- 查找librocketmq.so的路径。
find / -name librocketmq.so
- 将librocketmq.so添加到系统动态库搜索路径。
ln -s /查找到的librocketmq.so路径/librocketmq.so /usr/lib sudo ldconfig
- 查找librocketmq.so的路径。
以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- GROUP:表示消费组名称。
- ENDPOINT:表示实例连接地址和端口。
- TOPIC:表示Topic名称。
发送消息
参考如下示例代码。
import time
from rocketmq.client import TransactionMQProducer, Message, TransactionStatus
endpoint = "${ENDPOINT}" # 填写分布式消息服务RocketMQ控制台Namesrv接入点
access_key = "${ACCESS_KEY}" # 填写AccessKey,在管理控制台创建
access_secret = "${SECRET_KEY}" # 填写SecretKey 在管理控制台创建
topic = "${TOPIC}" # 填写Topic,在管理控制台创建
producer_group = "${GROUP}" # 生产者组group
def transaction_checker_callback(msg, user_args):
return TransactionStatus.COMMIT
def transaction_local_execute(msg, user_args):
return TransactionStatus.UNKNOWN
producer = TransactionMQProducer(producer_group, transaction_checker_callback)
producer.set_name_server_address(endpoint)
producer.set_session_credentials(access_key, access_secret, "")
producer.start()
msg = Message(topic)
msg.set_body("Hello RocketMQ")
ret = producer.send_message_in_transaction(msg, transaction_local_execute, None)
print(ret.status, ret.msg_id, ret.offset)
while True:
time.sleep(3600)
订阅消息
参考如下示例代码。
import time
from rocketmq.client import PushConsumer, ConsumeStatus
endpoint = "${ENDPOINT}" # 填写分布式消息服务RocketMQ控制台Namesrv接入点
access_key = "${ACCESS_KEY}" # 填写AccessKey,在管理控制台创建
access_secret = "${SECRET_KEY}" # 填写SecretKey 在管理控制台创建
topic = "${TOPIC}" # 填写Topic,在管理控制台创建
group = "${GROUP}" # 填写订阅组group,在管理控制台创建
def callback(msg):
print(msg.id, msg.body)
return ConsumeStatus.CONSUME_SUCCESS
consumer = PushConsumer(group)
consumer.set_name_server_address(endpoint)
consumer.set_session_credentials(access_key, access_secret, "")
consumer.subscribe(topic, callback)
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()