概述
本文介绍使用 RocketMQ C++(4.x) 客户端 SDK,访问分布式消息服务RocketMQ,帮助您更好地理解消息收发的完整过程。
前置条件
安装gcc-c++ 4.8.2 及以上版本,需支持C++11。
安装cmake 2.8.0及以上版本。
安装automake 1.11.1及以上版本。
安装autoconf 2.65及以上版本。
安装libtool 2.2.6 及以上版本。
环境准备
需要在客户端环境安装 RocketMQ-Client-CPP 库,根据官方文档进行安装即可: 安装 CPP 动态库,推荐使用 master 分支构建。
在项目中引入 RocketMQ-Client-CPP 相关头文件及动态库,详见实例代码头文件。
使用g++命令获得可执行文件,如:
g++ -o xxxProducer xxxProducer.cpp -lrocketmq -lpthread -lz -ldl -lrt
使用 C++ SDK 收发普通消息
发送普通消息
#include <iostream>
#include <chrono>
#include <thread>
#include <string>
#include "rocketmq/DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
int main(){
DefaultMQProducer producer("group_name");
//填写分布式消息服务RocketMQ版的接入点
producer.setNamesrvAddr("your access point");
//填写分布式消息服务RocketMQ版的ak、sk
producer.setSessionCredentials("ak", "sk", "channel");
producer.start();
int count = 64;
for (int i = 0; i < count; ++i)
{
//填入主题名、tag名、消息body
MQMessage msg("topic_name", "TAG", "msg content");
try
{
SendResult sendResult = producer.send(msg);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
this_thread::sleep_for(chrono::seconds(1));
}
catch (MQException e)
{
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
std::cout << "Send " << count << " messages OK, costs" << std::endl;
producer.shutdown();
return 0;
}
收取普通消息
#include <iostream>
#include <thread>
#include "rocketmq/DefaultMQPushConsumer.h"
using namespace rocketmq;
class ConcurrentMessageListener : public MessageListenerConcurrently
{
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs)
{
for (auto item = msgs.begin(); item != msgs.end(); item++)
{
std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]){
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("consumer_group");
consumer->setNamesrvAddr("your access point");
consumer->setSessionCredentials("ak", "sk");
ConcurrentMessageListener *messageListener = new ConcurrentMessageListener();
consumer->subscribe("topic_name", "tag");
consumer->registerMessageListener(messageListener);
consumer->start();
std::this_thread::sleep_for(std::chrono::seconds(60));
consumer->shutdown();
return 0;
}
使用C++客户端收发顺序消息
简介
顺序消息分为两类,全局顺序消息和分区顺序消息,通过队列数区分。
全局顺序:
对于指定的一个 Topic,所有消息的生产和消费需要遵循一定的顺序,消息的消费顺序必须和生产顺序一致,即需要严格的先入先出 FIFO的顺序进行发布和消费。
分区顺序:
对于指定的一个 Topic,其中每一个分区的消息生产与消费是有序的,同一个队列内的消息按照严格的 FIFO 顺序进行发布和订阅。消息投递到哪一个分区由消息的 Sharding Key 来进行区分。在 SDK 中可以通过指定 Sharding Key 和回调函数来控制消息投递到哪个分区。
发送顺序消息
#include <iostream>
#include <chrono>
#include <thread>
#include "rocketmq/DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
class DefineSelectMessageQueue : public MessageQueueSelector
{
public:
MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg)
{
//若希望全局有序,请修改对应index
int orderId = *static_cast<int *>(arg);
int index = orderId % mqs.size();
return mqs[index];
}
};
int main(){
DefaultMQProducer producer("group_name");
//填写分布式消息服务RocketMQ版的接入点
producer.setNamesrvAddr("your access point");
//填写分布式消息服务RocketMQ版的ak、sk
producer.setSessionCredentials("ak", "sk", "channel");
producer.start();
DefineSelectMessageQueue *queueSelector = new DefineSelectMessageQueue();
int count = 64;
for (int i = 0; i < count; ++i)
{
MQMessage msg("you_topic_name", "TAG", "msg content");
try
{
SendResult sendResult = producer.send(msg, queueSelector, &i, 3, false);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
this_thread::sleep_for(chrono::seconds(1));
}
catch (MQException e)
{
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
std::cout << "Send " << count << " messages OK, costs" << std::endl;
producer.shutdown();
return 0;
}
消费顺序消息
#include <iostream>
#include <thread>
#include "rocketmq/DefaultMQPushConsumer.h"
using namespace rocketmq;
class OrderlyMessageListener : public MessageListenerOrderly
{
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs)
{
for (auto item = msgs.begin(); item != msgs.end(); item++)
{
std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]){
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_group");
consumer->setNamesrvAddr("your access point");
consumer->setSessionCredentials("ak", "sk", "VOLC");
OrderlyMessageListener *messageListener = new OrderlyMessageListener();
consumer->subscribe("topic_name", "tag");
consumer->registerMessageListener(messageListener);
consumer->start();
std::this_thread::sleep_for(std::chrono::seconds(60));
consumer->shutdown(); return 0;
}
使用C++客户端收发事务消息
简介
业务侧通过 sendMessageInTransaction 发送消息到 RocketMQ 服务端。
业务侧通过 executeLocalTransaction 执行本地事务。
实现业务查询事务执行是否成功的接口 checkLocalTransaction。
使用C++客户端发送事务消息
#include <iostream>
#include <chrono>
#include <thread>
#include "rocketmq/TransactionMQProducer.h"
#include "rocketmq/MQClientException.h"
#include "rocketmq/TransactionListener.h"
using namespace std;
using namespace rocketmq;
class DefineTransactionListener : public TransactionListener
{
public:
LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg)
{
/*
执行本地事务
1. 成功返回COMMIT_MESSAGE
2. 失败返回ROLLBACK_MESSAGE
3. 不确定返回UNKNOWN。服务端会触发定时任务回查状态
*/
std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic()
<< ", transactionId:" << msg.getTransactionId() << std::endl;
return UNKNOWN;
}
LocalTransactionState checkLocalTransaction(const MQMessageExt &msg)
{
/*
回查本地事务执行情况
1. 成功返回COMMIT_MESSAGE
2. 失败返回ROLLBACK_MESSAGE
3. 不确定返回UNKNOWN。则等待下次定时任务回查。
*/
std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic()
<< ", MsgId:" << msg.getMsgId() << std::endl;
return COMMIT_MESSAGE;
}
};
int main(){
TransactionMQProducer producer("producer_group_name");
producer.setNamesrvAddr("accesspoint");
producer.setSessionCredentials("ak", "sk", "channel");
DefineTransactionListener *exampleTransactionListener = new DefineTransactionListener();
producer.setTransactionListener(exampleTransactionListener);
producer.start();
int count = 3;
for (int i = 0; i < count; ++i)
{
MQMessage msg("TRANSACTION TOPIC", "TAG", "Transaction content");
try
{
SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
std::cout << "SendResult:" << sendResult.getSendStatus()
<< ", Message ID: " << sendResult.getMsgId()
<< std::endl;
this_thread::sleep_for(chrono::seconds(1));
}
catch (MQException e)
{
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
std::cout << "Send " << count << " messages OK " << endl; std::cout << "Wait for local transaction check..... " << std::endl;
for (int i = 0; i < 6; ++i)
{
this_thread::sleep_for(chrono::seconds(10));
std::cout << "Running " << i * 10 + 10 << " Seconds......" << std::endl;
}
producer.shutdown();
return 0;
}
使用C++客户端消费事务消息
和消费普通消息一致,请参考对应部分。
使用C++客户端收发延时消息
使用C++客户端发送延时消息
#include <iostream>
#include <chrono>
#include <thread>
#include <string>
#include "rocketmq/DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
int main(){
DefaultMQProducer producer("producer_group_name");
producer.setNamesrvAddr("accesspoint");
producer.setSessionCredentials("ak", "sk", "volc");
producer.start();
int count = 64;
for (int i = 0; i < count; ++i)
{
MQMessage msg("you topic name", "TAG", "msg content");
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(5);
try
{
SendResult sendResult = producer.send(msg);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
this_thread::sleep_for(chrono::seconds(1));
}
catch (MQException e)
{
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
std::cout << "Send " << count << " messages OK, costs" << std::endl;
producer.shutdown();
return 0;
}
使用C++客户端消费延时消息
和消费普通消息一致,请参考对应部分。