顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。
顺序消息分为全局顺序消息和分区顺序消息:
- 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
- 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者指定分区选择算法,保证需要按顺序消费的消息被分配到同一个队列。
全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。
收发顺序消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
-
执行以下命令,检查是否已安装Go。
go version
返回如下回显时,说明Go已经安装。
go version go1.21.5 linux/amd64
如果未安装Go,请官网下载并安装。
-
在“go.mod”中增加以下代码,添加依赖。
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 )
以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- GROUP:表示消费组名称。
- ENDPOINT:表示实例连接地址和端口。
- TOPIC:表示Topic名称。
发送顺序消息
参考如下示例代码。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
)
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台角色控制创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
// 配置使用HASH队列选择器.
producer.WithQueueSelector(producer.NewHashQueueSelector()),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 4; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ " + strconv.Itoa(i)),
}
// msg 指定分区key, 自定义即可
msg.WithShardingKey("key" + strconv.Itoa(i))
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
订阅顺序消息
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
// 在控制台创建的订阅组(Group)
group := "${GROUP}"
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName(group),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
// 使用顺序消费的模式
consumer.WithConsumerOrder(true),
)
err := c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
fmt.Printf("orderlyCtx: %#v\n", orderlyCtx)
for i := range messages {
// 处理消息
fmt.Printf("receive msg: %v \n", messages[i])
}
// 如果消息处理成功则返回consumer.ConsumeSuccess
// 如果消息处理失败则返回consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
waitChan := make(chan interface{}, 0)
<-waitChan
}