分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。
图1 事务消息交互流程
事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。
收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
-
执行以下命令,检查是否已安装Go。
go version
返回如下回显时,说明Go已经安装。
go version go1.21.5 linux/amd64
如果未安装Go,请官网下载并安装。
-
在“go.mod”中增加以下代码,添加依赖。
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 )
以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- GROUP:表示消费组名称。
- ENDPOINT:表示实例连接地址和端口。
- TOPIC:表示Topic名称。
发送事务消息
参考如下示例代码。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
)
// 这里模拟了一个内存状态的事务执行,实际需要更换成相应的数据库等事务操作
type DemoListener struct {
localTrans *sync.Map
transactionIndex int32
}
func NewDemoListener() *DemoListener {
return &DemoListener{
localTrans: new(sync.Map),
}
}
// 这里是执行本地事务逻辑的方法回调
func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
status := nextIndex % 3
dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))
return primitive.UnknowState
}
// 查询本地事务是否执行成功,实际情况需要查询数据库当中的信息是否真正执行成功了
func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
v, existed := dl.localTrans.Load(msg.TransactionId)
if !existed {
fmt.Printf("unknow msg: %v, return Commit", msg)
return primitive.CommitMessageState
}
// 这里实际对应业务从数据库中查询消息的事务状态
state := v.(primitive.LocalTransactionState)
switch state {
case 1:
fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
case 2:
fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
return primitive.RollbackMessageState
case 3:
fmt.Printf("checkLocalTransaction unknow: %v\n", msg)
return primitive.UnknowState
default:
fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
}
}
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台角色控制创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
p, _ := rocketmq.NewTransactionProducer(
NewDemoListener(),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s\n", err.Error())
os.Exit(1)
}
for i := 0; i < 4; i++ {
msg := primitive.NewMessage(topic, []byte("Hello RocketMQ "+strconv.Itoa(i)))
res, err := p.SendMessageInTransaction(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
// 防止客户端进程退出,业务自定义处理即可
time.Sleep(5 * time.Minute)
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
注意事务消息生产者需要实现两个回调函数,其中ExecuteLocalTransaction回调函数在发送完半事务消息后被调用,即上图中的第3阶段,CheckLocalTransaction回调函数在收到回查时调用,即上图中的第6阶段。两个回调函数均可返回3种事务状态:
primitive.CommitMessageState:提交事务,允许消费者消费该消息。
primitive.RollbackMessageState:回滚事务,消息将被丢弃不允许消费。
primitive.UnknowState:无法判断状态,期待服务端向生产者再次回查该消息的状态。
订阅事务消息
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
// 在控制台创建的订阅组(Group)
group := "${GROUP}"
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName(group),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range messages {
// 处理消息
fmt.Printf("receive msg: %v \n", messages[i])
}
// 如果消息处理成功则返回consumer.ConsumeSuccess
// 如果消息处理失败则返回consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
waitChan := make(chan interface{}, 0)
<-waitChan
}