0. Background
RocketMQ is mostly used through its Java and C++ clients, but it also has a Go client.
1. Prerequisites
Go: 1.13 or later
Add the client dependency to go.mod:
require (
	github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3
)
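2. Sending and receiving normal messages
2.1 Sending messages synchronously
Synchronous sending is the most common mode: the sender sends one message and waits for the broker's synchronous response before sending the next. Because it is reliable, it is widely used, for example for important notifications and SMS notifications.
Sample code for sending messages: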
// Package main implements a simple producer that sends messages synchronously.
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
		producer.WithGroupName("test_producer_group"),
		producer.WithRetry(2),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "test1234",
			SecretKey: "Sv12F21341@",
		}),
	)
	if err != nil {
		fmt.Printf("create producer error: %s\n", err)
		os.Exit(1)
	}
	if err = p.Start(); err != nil {
		fmt.Printf("start producer error: %s\n", err)
		os.Exit(1)
	}
	fmt.Println("start producer")

	topic := "test11"
	for i := 0; i < 10; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
		}
		// SendSync blocks until the broker responds or the send fails.
		res, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
	}

	if err = p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s\n", err)
	} else {
		fmt.Println("shutdown producer")
	}
}
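2.2 Sending messages asynchronously
After sending a message, the sender does not wait for the broker's response before sending the next one; it receives the response through a callback and handles the result there. Asynchronous sending is generally used when the call chain is long and the business is sensitive to response time, for example notifying the transcoding service to start after a video upload, or pushing the transcoding result once transcoding completes.
Sample code: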
// Package main implements a simple producer that sends messages asynchronously.
package main

import (
	"context"
	"fmt"
	"os"
	"sync"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
		producer.WithGroupName("test_producer_group"),
		producer.WithRetry(2),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "test1234",
			SecretKey: "Sv12F21341@",
		}),
	)
	if err != nil {
		fmt.Printf("create producer error: %s\n", err)
		os.Exit(1)
	}
	if err = p.Start(); err != nil {
		fmt.Printf("start producer error: %s\n", err)
		os.Exit(1)
	}
	fmt.Println("start producer")

	topic := "test11"
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		err := p.SendAsync(context.Background(),
			// The callback runs once the broker's response (or an error) arrives.
			func(ctx context.Context, result *primitive.SendResult, e error) {
				defer wg.Done()
				if e != nil {
					fmt.Printf("send message error: %s\n", e)
				} else {
					fmt.Printf("send message success: result=%s\n", result.String())
				}
			}, primitive.NewMessage(topic, []byte("Hello RocketMQ Go Client!")))
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
			// The request was not handed off, so the callback is not expected
			// to run; release the WaitGroup here to keep wg.Wait from hanging.
			wg.Done()
		}
	}
	// Wait for all callbacks before shutting the producer down.
	wg.Wait()

	if err = p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s\n", err)
	} else {
		fmt.Println("shutdown producer")
	}
}
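2.3 Sending one-way messages
The sender only sends the message: it does not wait for a response from the broker, and no callback is triggered. Sending this way is very fast, typically at the microsecond level. It suits scenarios that need very short send latency but do not require high reliability, such as log collection.
Sample code: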
// Package main implements a simple producer that sends one-way messages.
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
		producer.WithGroupName("test_producer_group"),
		producer.WithRetry(2),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "test1234",
			SecretKey: "Sv12F21341@",
		}),
	)
	if err != nil {
		fmt.Printf("create producer error: %s\n", err)
		os.Exit(1)
	}
	if err = p.Start(); err != nil {
		fmt.Printf("start producer error: %s\n", err)
		os.Exit(1)
	}
	fmt.Println("start producer")

	topic := "test11"
	for i := 0; i < 10; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
		}
		// A nil error only means the request was sent; there is no broker
		// acknowledgement in one-way mode.
		err := p.SendOneWay(context.Background(), msg)
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Println("send message success")
		}
	}

	if err = p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s\n", err)
	} else {
		fmt.Println("shutdown producer")
	}
}
2.4 Subscribing to messages
Only the push mode is recommended:
// Package main implements a simple push consumer.
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// Wait for Ctrl+C or SIGTERM so the consumer can shut down cleanly.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("sub1"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
		consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: "test1234",
			SecretKey: "Sv12F21341@",
		}),
	)
	if err != nil {
		fmt.Printf("create consumer error: %s\n", err)
		os.Exit(1)
	}

	err = c.Subscribe("test11", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v\n", msgs[i])
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Printf("subscribe error: %s\n", err)
		os.Exit(1)
	}

	// Note: Start must be called after Subscribe.
	if err = c.Start(); err != nil {
		fmt.Printf("start consumer error: %s\n", err)
		os.Exit(1)
	}
	fmt.Println("start consumer")

	<-sig
	if err = c.Shutdown(); err != nil {
		fmt.Printf("shutdown consumer error: %s\n", err)
	} else {
		fmt.Println("shutdown consumer")
	}
}
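3. Sending and receiving ordered messages
Ordered (FIFO) messages are a RocketMQ message type that is published and consumed in strict order.
There are two kinds of ordered messages:
- Global ordering: for a given topic, all messages are published and consumed in strict first-in, first-out (FIFO) order. To guarantee this, create the topic on a single broker with a single queue.
- Partitioned ordering: for a given topic, messages are partitioned by sharding key, and messages within the same partition are published and consumed in strict FIFO order. The sharding key is the field that tells partitions apart in ordered messages; it is a different concept from the key of a normal message.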
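To make partitioned ordering more concrete, here is a minimal, standard-library-only sketch of how a sharding key could be mapped to a queue so that messages with the same key always land in the same queue. The helper `queueIndex`, the order IDs, and the queue count are hypothetical and used only for illustration; FNV-1a is an arbitrary choice of hash and is not claimed to be what the RocketMQ client uses internally.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// queueIndex maps a sharding key to a queue index in [0, queueCount).
// Hypothetical helper for illustration; not part of the RocketMQ client API.
// queueCount must be greater than zero.
func queueIndex(shardingKey string, queueCount int) int {
	h := fnv.New32a()
	h.Write([]byte(shardingKey)) // Write on an FNV hash never returns an error
	return int(h.Sum32() % uint32(queueCount))
}

func main() {
	const queueCount = 4 // assumed number of queues in the topic
	// The same order ID always maps to the same queue, so its messages stay in order.
	for _, orderID := range []string{"order-1001", "order-1002", "order-1001"} {
		fmt.Printf("%s -> queue %d\n", orderID, queueIndex(orderID, queueCount))
	}
}
```

Because the mapping depends only on the key and the queue count, ordering within a key holds as long as the number of queues does not change.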
3.1 Sending ordered messages
Sample code:
// Package main implements a producer that sends ordered messages.
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

// manualQueueSelector always picks the first queue, so every message goes to
// the same queue and keeps its send order.
type manualQueueSelector struct{}

func NewManualQueueSelector() producer.QueueSelector {
	return new(manualQueueSelector)
}

func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
	return queues[0]
}

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
		producer.WithGroupName("test_producer_group"),
		producer.WithRetry(2),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "test1234",
			SecretKey: "Sv12F21341@",
		}),
		producer.WithQueueSelector(NewManualQueueSelector()),
	)
	if err != nil {
		fmt.Printf("create producer error: %s\n", err)
		os.Exit(1)
	}
	if err = p.Start(); err != nil {
		fmt.Printf("start producer error: %s\n", err)
		os.Exit(1)
	}
	fmt.Println("start producer")

	topic := "test11"
	for i := 0; i < 10; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
		}
		// Send synchronously so message i is stored before message i+1 is sent.
		res, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
	}

	if err = p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s\n", err)
	} else {
		fmt.Println("shutdown producer")
	}
}
3.2 Subscribing to messages
Sample code:
// Package main implements a push consumer that consumes messages in order.
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// Wait for Ctrl+C or SIGTERM so the consumer can shut down cleanly.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("sub1"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
		consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: "test1234",
			SecretKey: "Sv12F21341@",
		}),
		// Consume each queue's messages in order.
		consumer.WithConsumerOrder(true),
	)
	if err != nil {
		fmt.Printf("create consumer error: %s\n", err)
		os.Exit(1)
	}

	err = c.Subscribe("test11", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
		fmt.Printf("orderly context: %v\n", orderlyCtx)
		fmt.Printf("subscribe orderly callback: %v\n", msgs)
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Printf("subscribe error: %s\n", err)
		os.Exit(1)
	}

	// Note: Start must be called after Subscribe.
	if err = c.Start(); err != nil {
		fmt.Printf("start consumer error: %s\n", err)
		os.Exit(1)
	}
	fmt.Println("start consumer")

	<-sig
	if err = c.Shutdown(); err != nil {
		fmt.Printf("shutdown consumer error: %s\n", err)
	} else {
		fmt.Println("shutdown consumer")
	}
}
4. Sending and receiving transactional messages
Sample code:
// Package main implements a producer that sends transactional messages.
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

// DemoListener records a simulated local transaction state per transaction ID.
type DemoListener struct {
	localTrans       *sync.Map
	transactionIndex int32
}

func NewDemoListener() *DemoListener {
	return &DemoListener{
		localTrans: new(sync.Map),
	}
}

// ExecuteLocalTransaction runs when the half message has been sent. It stores
// a state that cycles through commit, rollback, and unknown, then returns
// UnknowState so that the broker checks back later.
func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
	nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
	fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
	status := nextIndex % 3
	dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))
	return primitive.UnknowState
}

// CheckLocalTransaction runs when the broker checks back on a pending
// transaction; it returns the state stored by ExecuteLocalTransaction.
func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
	fmt.Printf("%v msg transactionID: %v\n", time.Now(), msg.TransactionId)
	v, existed := dl.localTrans.Load(msg.TransactionId)
	if !existed {
		fmt.Printf("unknown msg: %v, return Commit\n", msg)
		return primitive.CommitMessageState
	}
	state := v.(primitive.LocalTransactionState)
	switch state {
	case primitive.CommitMessageState:
		fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
		return primitive.CommitMessageState
	case primitive.RollbackMessageState:
		fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
		return primitive.RollbackMessageState
	case primitive.UnknowState:
		fmt.Printf("checkLocalTransaction unknown: %v\n", msg)
		return primitive.UnknowState
	default:
		fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
		return primitive.CommitMessageState
	}
}

func main() {
	p, err := rocketmq.NewTransactionProducer(
		NewDemoListener(),
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
		producer.WithGroupName("test_producer_group"),
		producer.WithRetry(1),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "test1234",
			SecretKey: "Sv12F21341@",
		}),
	)
	if err != nil {
		fmt.Printf("create producer error: %s\n", err)
		os.Exit(1)
	}
	if err = p.Start(); err != nil {
		fmt.Printf("start producer error: %s\n", err)
		os.Exit(1)
	}
	fmt.Println("start producer")

	topic := "test11"
	for i := 0; i < 10; i++ {
		res, err := p.SendMessageInTransaction(context.Background(),
			primitive.NewMessage(topic, []byte("Hello RocketMQ again "+strconv.Itoa(i))))
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
	}

	// Keep the producer alive so the broker's check-back requests can be answered.
	time.Sleep(5 * time.Minute)

	if err = p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s\n", err)
	} else {
		fmt.Println("shutdown producer")
	}
}
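5. Delayed messages
A delayed message is one that, after being sent to Apache RocketMQ, is not delivered right away but is delivered to the consumer only after a set delay.
Scenarios such as distributed scheduled triggers and task-timeout handling need precise, reliable delayed event triggering. RocketMQ delayed messages simplify the development of scheduled tasks and provide high-performance, scalable, and reliable timed triggering.
Apache RocketMQ supports 18 delay levels in total: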
Delay level | Delay time
1 | 1s
2 | 5s
3 | 10s
4 | 30s
5 | 1min
6 | 2min
7 | 3min
8 | 4min
9 | 5min
10 | 6min
11 | 7min
12 | 8min
13 | 9min
14 | 10min
15 | 20min
16 | 30min
17 | 1h
18 | 2h
Sample code:
// Package main implements a simple producer that sends delayed messages.
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
		producer.WithGroupName("test_producer_group"),
		producer.WithRetry(2),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "test1234",
			SecretKey: "Sv12F21341@",
		}),
	)
	if err != nil {
		fmt.Printf("create producer error: %s\n", err)
		os.Exit(1)
	}
	if err = p.Start(); err != nil {
		fmt.Printf("start producer error: %s\n", err)
		os.Exit(1)
	}
	fmt.Println("start producer")

	topic := "test11"
	for i := 0; i < 10; i++ {
		msg := primitive.NewMessage(topic, []byte("Hello RocketMQ Go Client!"))
		// Delay level 3 corresponds to a 10s delay in the table above.
		msg.WithDelayTimeLevel(3)
		res, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
	}

	if err = p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s\n", err)
	} else {
		fmt.Println("shutdown producer")
	}
}
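6. Summary
The Go client covers the basic RocketMQ SDK client features. Its management (admin) APIs are incomplete, however, so in practice you may need to combine it with other management tools.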