分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。
图1 事务消息交互流程
事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。
收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为4.9.7。
通过以下任意一种方式引入依赖:
-
使用Maven方式引入依赖
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.7</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.9.7</version> </dependency> </dependencies>
-
点击下载依赖JAR包:rocketmq-all-4.9.7-bin-release.zip
发送事务消息
参考如下示例代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerTransactionExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
// 执行本地事务和事务回查的接口
TransactionListener transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("开始执行本地事务: " + message);
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("收到事务消息的回查请求, MsgId: " + messageExt.getMsgId());
return LocalTransactionState.COMMIT_MESSAGE;
}
};
/*
* 创建Producer,如果想开启消息轨迹,可以按照如下方式创建:
* TransactionMQProducer producer = new TransactionMQProducer(null, "YOUR GROUP ID", getAclRPCHook(), true, null);
*/
TransactionMQProducer producer = new TransactionMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制台NAMESRV接入点地址
producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码
producer.setTransactionListener(transactionListener);
producer.start();
Message msg = new Message("TopicTest",
"TagA",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
}
}
事务消息生产者需要实现执行本地事务和事务回查的接口,其中executeLocalTransaction方法在发送完半事务消息后被调用,checkLocalTransaction方法在收到事务回查时调用,两者被调用时可返回如下三个事务状态。
- LocalTransactionState.COMMIT_MESSAGE:提交事务,允许事务消息投递到消费者。
- LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
- LocalTransactionState.UNKNOW:无法判断状态,服务端会向生产者再次回查该消息的状态。
订阅事务消息
参考如下示例代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class ConsumerTransactionExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
/*
* 创建Consumer,如果想开启消息轨迹,可以按照如下方式创建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely(), true, null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely());
// 填入控制台NAMESRV接入点地址
consumer.setNamesrvAddr("192.168.0.1:9876");
// consumer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码
/*
* 如果想要消费指定TAG的消息,可以按照如下方式订阅:* 为订阅所有的TAG
* pushConsumer.subscribe(TOPIC_NAME, "Tag1");
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer Started.");
}
}