分布式消息服务RocketMQ版支持任意时间的定时消息,最大推迟时间可达到40天。
定时消息即生产者生产消息到分布式消息服务RocketMQ版后,消息不会立即被消费,而是延迟到设定的时间点后才会发送给消费者进行消费。
发送定时消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
适用场景
定时消息适用于以下场景:
- 消息对应的业务逻辑有时间窗口要求,如电商交易中超时未支付关闭订单的场景。在订单创建时发送一条定时消息,5分钟以后投递给消费者,消费者收到此消息后需要判断对应订单是否完成支付,如果未完成支付,则关闭订单。如果已完成,则忽略。
- 通过消息触发定时任务的场景,如在某些固定时间点向用户发送提醒消息。
注意
定时消息的最大延迟时间为40天,延迟超过40天的消息将会发送失败。
定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
定时消息的精度有1s~2s的延迟误差
无法确保定时消息仅投递一次,定时消息可能会重复投递。
定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。
设置定时消息的投递时间后,依然受消息老化时间限制,默认消息过期时间为7天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第12天被删除。
定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
准备环境
-
执行以下命令,检查是否已安装Go。
go version
返回如下回显时,说明Go已经安装。
go version go1.21.5 linux/amd64
如果未安装Go,请官网下载并安装。
-
在“go.mod”中增加以下代码,添加依赖。
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 )
以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- GROUP:表示消费组名称。
- ENDPOINT:表示实例连接地址和端口。
- TOPIC:表示Topic名称。
发送定时/延时消息
发送定时/延时消息的示例代码如下。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
"time"
)
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台角色控制创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{endpoint}),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
if err := p.Start(); err != nil {
fmt.Println("start producer error:", err)
os.Exit(1)
}
// 创建消息
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ"),
}
// 设置延迟等级
// 等级与时间对应关系:
// 1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// 如果想用延迟级别,那么设置下面这个方法
msg.WithDelayTimeLevel(3)
//发送任意延迟消息,则使用下面的方法,WithDelayTimeLevel就不要设置了, 时间单位为毫秒,如下所示:消息将在10s后投递
delayMills := int64(10 * 1000)
msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().Unix()+delayMills, 10))
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Println("send message error:", err)
return
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
订阅定时/延时消息
订阅定时/延时消息的示例代码如下。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
// 在控制台创建的订阅组(Group)
group := "${GROUP}"
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName(group),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range messages {
// 处理消息
fmt.Printf("receive msg: %v \n", messages[i])
}
// 如果消息处理成功则返回consumer.ConsumeSuccess
// 如果消息处理失败则返回consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
waitChan := make(chan interface{}, 0)
<-waitChan
}