背景信息
RocketMQ的生产消费验证是指在使用RocketMQ进行消息生产和消费时的验证过程。具体而言,验证包括以下几个方面:
- 生产者验证:RocketMQ提供了丰富的生产者API,开发人员可以使用这些API将消息发送到RocketMQ的消息队列中。在验证阶段,可以通过发送消息并检查返回结果来确保消息成功发送到Broker节点。此外,生产者还应该验证消息的顺序性、事务性以及可靠性等方面。
- 消费者验证:RocketMQ的消费者可以订阅特定的消息主题,从而消费这些主题下的消息。在验证阶段,消费者应该能够正确地从Broker节点拉取消息并进行消费处理。消费者还可以验证消息的顺序性、重试机制以及消息过滤等功能。
操作步骤
1、 天翼云官网点击控制中心,选择产品分布式消息服务RocketMQ。
2、 登录分布式消息服务RocketMQ控制台,点击右上角地域选择对应资源池。
进入实例列表,点击【管理】按钮进入管理菜单。
3、 进入实例列表,点击【管理】按钮进入管理菜单。
4、 进入主题管理菜单,点击【拨测】按钮,进行生产消费的拨测验证,验证开通的消息实例和主题。
1)生产测试拨测:
- 选择消息类型,默认普通消息。
- 填写需要产生的测试消息数量,以及每条消息的大小,默认每条消息1KB,建议不超过4MB(4096KB)。
- 选择已建的消息主题,若无选项,请新增主题,详见上文创建主题和订阅组。
- 点击【测试】按钮,按照已填写规格及数量产生测试消息数据,展示消息数据的信息,包括消息ID(messageID)、发送状态、主题名(topic名)、Broker名、队列ID。
拨测功能涉及消息发送状态码,以下是RocketMQ消息发送状态码及其说明:
✧ SEND_OK(发送成功):表示消息成功发送到了消息服务器。
✧ FLUSH_DISK_TIMEOUT(刷新磁盘超时):表示消息已经成功发送到消息服务器,但是刷新到磁盘上超时。这可能会导致消息服务器在宕机后,尚未持久化到磁盘上的数据丢失。
✧ FLUSH_SLAVE_TIMEOUT(刷新从服务器超时):表示消息已经成功发送到消息服务器,但是刷新到从服务器上超时。这可能会导致主从同步不一致。
✧ SLAVE_NOT_AVAILABLE(从服务器不可用):表示消息已经成功发送到消息服务器,但是从服务器不可用。这可能是由于网络故障或从服务器宕机引起的。
✧ UNKNOWN_ERROR(未知错误):表示发送消息时遇到了未知的错误。一般情况下建议重试发送消息。
✧ MESSAGE_SIZE_EXCEEDED(消息大小超过限制):表示消息的大小超过了消息服务器的限制。需要检查消息的大小是否合适。
✧ PRODUCE_THROTTLE(消息生产被限流):表示消息生产者的频率超出了消息服务器的限制。这可能是由于消息发送频率过高引起的。
✧ SERVICE_NOT_AVAILABLE(服务不可用):表示消息服务器不可用。这可能是由于网络故障或者消息服务器宕机引起的。
请注意,以上状态码仅适用于RocketMQ消息发送阶段,并且并不代表消息是否成功被消费者接收。同时,这些状态码也可能因版本变化而有所不同,建议查阅官方文档获取最新信息。
2)消费测试拨测:
- 选择消息顺序,下拉选择无序/有序,默认选项为无序。
RocketMQ是一种开源的分布式消息中间件,它支持有序消息和无序消息。
✧ 有序消息是指消息的消费顺序与发送顺序完全一致。在某些业务场景下,消息的处理需要保证顺序性,例如订单的处理或者任务的执行。RocketMQ提供了有序消息的支持,通过指定消息的顺序属性或使用消息队列的分区机制,可以确保消息按照指定的顺序进行消费。
✧ 无序消息则是指消息的消费顺序与发送顺序无关。无序消息的特点是高吞吐量和低延迟,适用于一些不要求严格顺序的业务场景,如日志收集等。
在RocketMQ中,有序消息和无序消息的实现方式略有不同。有序消息需要借助MessageQueue的分区机制和消费者端的顺序消息消费来实现。而无序消息则是通过消息的发送和接收的并发处理来实现的。
总的来说,RocketMQ既支持有序消息也支持无序消息,根据业务需求选择合适的消息类型来满足业务的要求。
- 选择消费方式,目前仅提供pull方式。值得注意的是,RocketMQ还提供了推送(push)方式的消费模式,其中消息队列服务器会主动将消息推送给消费者。但在当前仅限于pull方式的消费模式。
- 填写消费数量。
- 下拉选择选择已建的消息主题和订阅组,若无选项,请新增主题和订阅组,详见上文创建主题和订阅组。
- 点击【测试】按钮,按照已填写规格及数量产生消费数据,展示消息数据的信息,包括消息ID(messageID)、主题名称(topicName)、生成时间、存储时间、队列ID、消费状态。
拨测功能涉及消息消费状态码,RocketMQ消费状态码是指在消息消费过程中,对消费结果进行标识的状态码。以下是常见的RocketMQ消费状态码:
✧ CONSUME_SUCCESS(消费成功):表示消息成功被消费。
✧ RECONSUME_LATER(稍后重试):表示消费失败,需要稍后再次进行消费。
✧ CONSUME_FAILURE(消费失败):表示消息消费出现异常或失败。
✧ SLAVE_NOT_AVAILABLE(从节点不可用):表示消费者无法访问从节点来消费消息。
✧ NO_MATCHED_MESSAGE(无匹配的消息):表示当前没有匹配的消息需要消费。
✧ OFFSET_ILLEGAL(偏移量非法):表示消费的偏移量参数不合法。
✧ BROKER_TIMEOUT(Broker超时):表示由于Broker超时导致消费失败。
5、 用户应用按照规范接入RocketMQ,发送、消费消息。
1)生产者示例API
以下适用于南京3、上海7、重庆2、乌鲁木齐27、保定、石家庄20、内蒙6、晋中、北京5节点。
--ctgmq引擎版本,SDK下载方式详见环境准备-其他工具章节。
package com.ctg.guide;
import com.ctg.mq.api.CTGMQFactory;
import com.ctg.mq.api.IMQProducer;
import com.ctg.mq.api.PropertyKeyConst;
import com.ctg.mq.api.bean.MQMessage;
import com.ctg.mq.api.bean.MQSendResult;
import com.ctg.mq.api.exception.MQException;
import com.ctg.mq.api.exception.MQProducerException;
import java.util.Properties;
/**
* Producer,发送消息
*/
public class Producer {
public static void main(String[] args) throws InterruptedException, MQException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ProducerGroupName, "producer_group");
properties.setProperty(PropertyKeyConst.NamesrvAddr, "10.50.208.1:9876;10.50.208.2:9876;10.50.208.3:9876");
properties.setProperty(PropertyKeyConst.NamesrvAuthID, "app4test");
properties.setProperty(PropertyKeyConst.NamesrvAuthPwd, "******"); properties.setProperty(PropertyKeyConst.ClusterName, "defaultMQBrokerCluster");
properties.setProperty(PropertyKeyConst.TenantID, "defaultMQTenantID");
IMQProducer producer = CTGMQFactory.createProducer(properties);//建议应用启动时创建
int connectResult = producer.connect();
if(connectResult != 0){
return;
}
for (int i = 0; i < 10; i++) {
try {
MQMessage message = new MQMessage(
"test_topic_1",// topic
"ORDER_KEY_"+i,// key
"ORDER_TAG",//tag
("HELLO ORDER BODY" + i).getBytes()// body
);
MQSendResult sendResult = producer.send(message);
//System.out.println(sendResult);
//TODO
} catch (MQProducerException e) {
//TODO
}
}
producer.close();//建议应用关闭时关闭
}
以下适用于华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2-贵州、太原4、郑州5、西安7、呼和浩特3节点。
--rocketmq引擎版本,SDK下载方式详见环境准备-其他工具章节。
importorg.apache.rocketmq.client.exception.MQClientException;
importorg.apache.rocketmq.client.producer.DefaultMQProducer;
importorg.apache.rocketmq.client.producer.SendResult;
importorg.apache.rocketmq.common.message.Message;
importorg.apache.rocketmq.remoting.common.RemotingHelper;
importorg.apache.rocketmq.acl.common.AclClientRPCHook;
publicclassProducer{
publicstaticvoidmain(String[] args)throwsMQClientException,InterruptedException{
AclClientRPCHook rpcHook =newAclClientRPCHook(
newSessionCredentials(ACCESS_KEY, SECRET_KEY));
DefaultMQProducer producer =newDefaultMQProducer("ProducerGroupName", rpcHook);
// 填入元数据地址
producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true); //如果需要开启SSL,请增加此行代码
producer.start();
for(int i =0; i <128; i++)
try{
{
Message msg =newMessage("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
}catch(Exception e){
e.printStackTrace();
}
producer.shutdown();
}
}
示例参数说明:
Namesrv地址
Namesrv地址可从控制台查看,多个地址按分号分隔:
应用用户和密码
这个应用用户和密码就是控制台创建的应用用户和密码。
租户id和集群名
集群名和租户id可以从应用用户管理查询:
生产组
生产组名不需要提前创建,只需创建生产者时候配置,服务端会自动创建。建议按业务规划好生产组名,严禁按随机方式生成生产组名。
6.消费者示例API
以下适用于南京3、上海7、重庆2、乌鲁木齐27、保定、石家庄20、内蒙6、晋中、北京5节点。
--ctgmq引擎版本,SDK下载方式详见环境准备-其他工具章节。
package com.ctg.guide;
import com.ctg.mq.api.enums.MQConsumeFromWhere;
import com.ctg.mq.api.CTGMQFactory;
import com.ctg.mq.api.IMQPushConsumer;
import com.ctg.mq.api.PropertyKeyConst;
import com.ctg.mq.api.bean.MQResult;
import com.ctg.mq.api.listener.ConsumerTopicListener;
import com.ctg.mq.api.listener.ConsumerTopicStatus;
import java.util.List;
import java.util.Properties;
public class PushConsumer {
public static void main(String[] args) throws Exception {
final Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ConsumerGroupName, "test_consumer_1");
properties.setProperty(PropertyKeyConst.NamesrvAddr, "10.50.208.1:9876;10.50.208.2:9876;10.50.208.3:9876");
properties.setProperty(PropertyKeyConst.NamesrvAuthID, "app4test");
properties.setProperty(PropertyKeyConst.NamesrvAuthPwd, "******"); properties.setProperty(PropertyKeyConst.ClusterName, "defaultMQBrokerCluster");
properties.setProperty(PropertyKeyConst.TenantID, "defaultMQTenantID");
IMQPushConsumer consumer = CTGMQFactory.createPushConsumer(properties);
int connectResult = consumer.connect();
if (connectResult != 0) {
return;
}
consumer.listenTopic("test_topic_1", null, new ConsumerTopicListener() {
@Override
public ConsumerTopicStatus onMessage(List<MQResult> mqResultList) {
//mqResultList 默认为1,可通过批量消费数量设置
for(MQResult result : mqResultList) {
//TODO
System.out.println(result);
}
return ConsumerTopicStatus.CONSUME_SUCCESS;//对消息批量确认(成功)
//return ConsumerTopicStatus.RECONSUME_LATER;//对消息批量确认(失败)
}
});
}
}
以下适用于华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2-贵州、太原4、郑州5、西安7、呼和浩特3节点。
--rocketmq引擎版本,SDK下载方式详见环境准备-其他工具章节。
importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
importorg.apache.rocketmq.client.exception.MQClientException;
publicclassPushConsumer{
publicstaticvoidmain(String[] args)throwsException{
AclClientRPCHook rpcHook =newAclClientRPCHook(
newSessionCredentials(ACCESS_KEY, SECRET_KEY));
DefaultMQPushConsumer consumer =newDefaultMQPushConsumer(rpcHook);
consumer.setConsumerGroup("ConsumerGroupName");
// 填入元数据地址
consumer.setNamesrvAddr("192.168.0.1:9876");
//consumer.setUseTLS(true); //如果需要开启SSL,请增加此行代码
consumer.subscribe("TopicTest","*");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(), msgs);
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
示例参数说明:
Namesrv地址
Namesrv地址可从控制台查看,多个地址按分号分隔。
应用用户和密码
这个应用用户和密码就是控制台创建的应用用户和密码。
租户id和集群名
集群名和租户id可以从应用用户管理查询。
订阅组
订阅组名需要在控制台提前创建好。