本章节介绍普通消息的收发方法和示例代码。其中,普通消息发送方式分为同步发送、异步发送、单向发送。
- 同步发送:同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
- 异步发送:异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
- 单向发送:发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发。
- 收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
-
执行以下命令,检查是否已安装Go。
go version
返回如下回显时,说明Go已经安装。
go version go1.21.5 linux/amd64
如果未安装Go,请官网下载并安装。
-
在“go.mod”中增加以下代码,添加依赖。
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 )
以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- GROUP:表示消费组名称。
- ENDPOINT:表示实例连接地址和端口。
- TOPIC:表示Topic名称。
同步发送
同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。
参考如下示例代码。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
)
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台角色控制创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 4; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ! " + strconv.Itoa(i)),
}
// 为消息添加Tag
// msg.WithTag("TagA")
// 为消息添加Key
// msg.WithKeys([]string{"KeyA"})
// 使用同步方式发送消息
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
异步发送
异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
使用异步发送需要客户端实现异步发送回调接口(SendCallback)。即消息发送方在发送了一条消息后,不需要等待服务端响应接着发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
参考如下示例代码。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"sync"
)
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台角色控制创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
// 使用异步方式发送消息
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
wg.Add(1)
err := p.SendAsync(context.TODO(), func(ctx context.Context, result *primitive.SendResult, e error) {
if e != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", result.String())
}
wg.Done()
}, primitive.NewMessage(topic, []byte("Hello RocketMQ")))
if err != nil {
fmt.Printf("send message error: %s\n", err)
}
}
wg.Wait()
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
单向发送
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
参考如下示例代码。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
)
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台角色控制创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 4; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ! " + strconv.Itoa(i)),
}
// 使用单向方式发送消息
err := p.SendOneWay(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success")
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
订阅普通消息
参考如下示例代码。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func main() {
// 填写分布式消息服务RocketMQ控制台Namesrv接入点
endpoint := "${ENDPOINT}"
// 填写AccessKey,在管理控制台创建
accessKey := "${ACCESS_KEY}"
// 填写SecretKey 在管理控制台创建
secretKey := "${SECRET_KEY}"
// 填写Topic,在管理控制台创建
topic := "${TOPIC}"
// 在控制台创建的订阅组(Group)
group := "${GROUP}"
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName(group),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range messages {
// 处理消息
fmt.Printf("receive msg: %v \n", messages[i])
}
// 如果消息处理成功则返回consumer.ConsumeSuccess
// 如果消息处理失败则返回consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
waitChan := make(chan interface{}, 0)
<-waitChan
}