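Version: RocketMQ 4.9.4, rocketmq-dashboard (open-source console)
Feature overview
Message trace is the complete chain of information assembled from the time, status, and other data recorded at each node as a message travels from the producer to the RocketMQ broker and on to the consumer. This trace provides strong supporting data for troubleshooting in production.
Key attributes of message trace data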
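Producer side | Consumer side | Broker side |
Producer instance info | Consumer instance info | Message topic |
Send time | Delivery time, delivery round | Message storage location |
Whether the send succeeded | Whether consumption succeeded | Message key |
Send cost time | Consumption cost time | Message tag |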
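Cost
Higher storage cost: trace data is itself stored as messages on the broker. It is auxiliary data attached to business data and is used for locating and diagnosing problems.
The message trace feature is configurable, asynchronous, and pluggable, so it does not change existing behaviour.
Quick start
Broker configuration file
## Enable the message trace feature; the default is false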
traceTopicEnable=true
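PS: It is recommended to dedicate one broker node to storing trace messages, that is, enable message trace on only one broker in the RocketMQ cluster and keep regular business traffic off that node.
Enabling message trace when subscribing to messages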
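// The second constructor argument (enableMsgTrace = true) enables message trace for this push consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
consumer.setNamesrvAddr("XX.XX.XX.XX");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();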
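PS: Out of the box, only Push-mode consumers can enable message trace; Pull-mode consumers are not supported.
Enabling message trace when sending messages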
// The second constructor argument (enableMsgTrace = true) enables message trace for this producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
producer.setNamesrvAddr("XX.XX.XX.XX");
producer.start();
try {
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
} catch (Exception e) {
    e.printStackTrace();
}
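Storage of message trace data
By default, trace data is stored in the topic RMQ_SYS_TRACE_TOPIC, which the broker creates automatically. A different topic can also be specified when constructing the producer or consumer.
// Topic_test11111 must be created by the user in advance; it stores the trace data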
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
......
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
......
Implementation
Implementation sequence diagram
Source code walkthrough
/**
* Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
* name.
*
* @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
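Tracing the DefaultMQProducer source shows that when enableMsgTrace is true, creating the producer also creates three objects: AsyncTraceDispatcher, SendMessageTraceHookImpl, and EndTransactionTraceHookImpl. Together these three classes implement message trace on the producer side; EndTransactionTraceHookImpl is used when sending transactional messages.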
SendMessageTraceHookImpl
package org.apache.rocketmq.client.trace.hook;
import java.util.ArrayList;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
public class SendMessageTraceHookImpl implements SendMessageHook {
private TraceDispatcher localDispatcher;
public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}
@Override
public String hookName() {
return "SendMessageTraceHook";
}
/**
* No trace data is sent here; part of the data is prepared in advance and stored in the send context
*/
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
//build the context content of TuxeTraceContext
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
tuxeContext.getTraceBeans().add(traceBean);
}
@Override
public void sendMessageAfter(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) {
return;
}
if (context.getSendResult() == null) {
return;
}
if (context.getSendResult().getRegionId() == null
|| !context.getSendResult().isTraceOn()) {
// if switch is false,skip it
return;
}
TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
tuxeContext.setCostTime(costTime);
if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
tuxeContext.setSuccess(true);
} else {
tuxeContext.setSuccess(false);
}
tuxeContext.setRegionId(context.getSendResult().getRegionId());
traceBean.setMsgId(context.getSendResult().getMsgId());
traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
// The trace data is actually handed off for sending only at this step, and it may be lost here
localDispatcher.append(tuxeContext);
}
}
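As the method names suggest, sendMessageBefore runs before the message is sent and sendMessageAfter runs after the broker returns the send result. The code makes clear that the assembled trace data is handed off for delivery to the broker at localDispatcher.append(tuxeContext). Next, look at the source of the localDispatcher object.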
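To make the split between the two hook methods concrete, here is a minimal, self-contained sketch of the same before/after pattern. It is not RocketMQ code: SendHookSketch, TraceRecord, and the fixed timestamps are hypothetical stand-ins for TraceContext, TraceBean, and System.currentTimeMillis(). It mirrors only two things visible in the listing above: the cost time is computed in the "after" phase, and the store time is estimated as the start time plus half of the cost time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the RocketMQ trace classes; names are assumptions for illustration.
class SendHookSketch {

    /** Minimal trace record: start time, cost time, and estimated store time. */
    static final class TraceRecord {
        final long startMillis;
        long costMillis;
        long estimatedStoreMillis;
        boolean success;

        TraceRecord(long startMillis) {
            this.startMillis = startMillis;
        }
    }

    /** Collected records; stands in for the dispatcher that receives append(...). */
    static final List<TraceRecord> dispatched = new ArrayList<>();

    /** "Before" phase: only prepares the record, nothing is handed off yet. */
    static TraceRecord before(long nowMillis) {
        return new TraceRecord(nowMillis);
    }

    /** "After" phase: fills in the result and hands the record to the dispatcher. */
    static void after(TraceRecord record, long nowMillis, boolean sendOk) {
        record.costMillis = nowMillis - record.startMillis;
        record.success = sendOk;
        // Same estimate as in the hook: the message is assumed stored halfway through the round trip.
        record.estimatedStoreMillis = record.startMillis + record.costMillis / 2;
        dispatched.add(record);
    }

    public static void main(String[] args) {
        TraceRecord record = before(1_000L);
        after(record, 1_040L, true);
        System.out.println("cost=" + record.costMillis + " storeTime=" + record.estimatedStoreMillis);
    }
}
```

Note that the store time recorded this way is an estimate made on the client, not a timestamp reported by the broker.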
TraceDispatcher
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException;
/**
* Interface of asynchronous transfer data
*/
public interface TraceDispatcher {
enum Type {
PRODUCE,
CONSUME
}
/**
* Initialize asynchronous transfer data module
*/
void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
/**
* Append the transfering data
* @param ctx data information
* @return
*/
boolean append(Object ctx);
/**
* Write flush action
*
* @throws IOException
*/
void flush() throws IOException;
/**
* Close the trace Hook
*/
void shutdown();
}
AsyncTraceDispatcher (this class is long, so only the key methods and fields are listed; see the RocketMQ source for the complete code)
public class AsyncTraceDispatcher implements TraceDispatcher {
private final ArrayBlockingQueue<TraceContext> traceContextQueue;
public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
this.pollingTimeMil = 100;
this.waitTimeThresholdMil = 500;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
this.taskQueueByTopic = new HashMap();
this.group = group;
this.type = type;
this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
} else {
this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx);
if (!result) {
log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
}
return result;
}
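}
// AsyncRunnable is an inner class of AsyncTraceDispatcher (note AsyncTraceDispatcher.this.stopped below); it runs on the worker thread created in start()
class AsyncRunnable implements Runnable {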
private boolean stopped;
@Override
public void run() {
while (!stopped) {
synchronized (traceContextQueue) {
long endTime = System.currentTimeMillis() + pollingTimeMil;
while (System.currentTimeMillis() < endTime) {
try {
TraceContext traceContext = traceContextQueue.poll(
endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
);
if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {
// get the topic which the trace message will send to
String traceTopicName = this.getTraceTopicName(traceContext.getRegionId());
// get the traceDataSegment which will save this trace message, create if null
TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName);
if (traceDataSegment == null) {
traceDataSegment = new TraceDataSegment(traceTopicName, traceContext.getRegionId());
taskQueueByTopic.put(traceTopicName, traceDataSegment);
}
// encode traceContext and save it into traceDataSegment
// NOTE if data size in traceDataSegment more than maxMsgSize,
// a AsyncDataSendTask will be created and submitted
TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext);
traceDataSegment.addTraceTransferBean(traceTransferBean);
}
} catch (InterruptedException ignore) {
log.debug("traceContextQueue#poll exception");
}
}
// NOTE send the data in traceDataSegment which the first TraceTransferBean
// is longer than waitTimeThreshold
sendDataByTimeThreshold();
if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
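The code shows that trace data is delivered asynchronously: it is first submitted to the blocking queue traceContextQueue (capacity 1024 by default) and then processed by the worker thread.
It also shows that when traceContextQueue is full, the failed submission is only logged as a discard with no further handling, so the trace data is lost in that case.
Data format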
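The drop-on-full behaviour can be reproduced with the JDK alone. The sketch below is an illustration under assumptions, not the dispatcher itself: BoundedTraceQueueSketch and its String payload are hypothetical, and the capacity of 2 is chosen only to make the effect visible. What it shares with append(...) above is the use of the non-blocking ArrayBlockingQueue.offer and a discard counter.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a bounded trace buffer; not the RocketMQ class.
class BoundedTraceQueueSketch {
    private final ArrayBlockingQueue<String> queue;
    private final AtomicLong discardCount = new AtomicLong();

    BoundedTraceQueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking append: returns false and counts a discard when the queue is full. */
    boolean append(String traceData) {
        boolean accepted = queue.offer(traceData);
        if (!accepted) {
            discardCount.incrementAndGet();
        }
        return accepted;
    }

    long discarded() {
        return discardCount.get();
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        BoundedTraceQueueSketch sketch = new BoundedTraceQueueSketch(2);
        for (int i = 0; i < 5; i++) {
            sketch.append("trace-" + i);
        }
        // With no consumer draining the queue, only the first two entries fit.
        System.out.println("queued=" + sketch.size() + " discarded=" + sketch.discarded());
    }
}
```

The trade-off is deliberate: offer never blocks the business thread that is sending the message, at the price of silently losing trace records under sustained load.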
TraceContext >>> TraceTransferBean >>> TraceDataSegment
traceBeans
public class TraceContext implements Comparable<TraceContext> {
private TraceType traceType;
private long timeStamp = System.currentTimeMillis();
private String regionId = "";
private String regionName = "";
private String groupName = "";
private int costTime = 0;
private boolean isSuccess = true;
private String requestId = MessageClientIDSetter.createUniqID();
private int contextCode = 0;
private List<TraceBean> traceBeans;
}
public class TraceBean {
private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
private String topic = "";
private String msgId = "";
private String offsetMsgId = "";
private String tags = "";
private String keys = "";
private String storeHost = LOCAL_ADDRESS;
private String clientHost = LOCAL_ADDRESS;
private long storeTime;
private int retryTimes;
private int bodyLength;
private MessageType msgType;
private LocalTransactionState transactionState;
private String transactionId;
private boolean fromTransactionCheck;
}
public class TraceTransferBean {
private String transData;
private Set<String> transKey = new HashSet<String>();
}
class TraceDataSegment {
    private long firstBeanAddTime;
    private int currentMsgSize;
    private final String traceTopicName;
    private final String regionId;
    private final List<TraceTransferBean> traceTransferBeanList = new ArrayList();
    TraceDataSegment(String traceTopicName, String regionId) {
        this.traceTopicName = traceTopicName;
        this.regionId = regionId;
    }
    /**
     * Records the time of the first add (used for the 500 ms timeout send).
     * If the accumulated size reaches traceProducer.getMaxMessageSize()
     * (4 MB by default for a DefaultMQProducer; the value actually set on traceProducer is not shown in this excerpt),
     * the batch is wrapped in an AsyncDataSendTask and submitted to the thread pool.
     * firstBeanAddTime, currentMsgSize and traceTransferBeanList are then cleared so the segment can be reused.
     */
    public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
        initFirstBeanAddTime();
        this.traceTransferBeanList.add(traceTransferBean);
        this.currentMsgSize += traceTransferBean.getTransData().length();
        if (currentMsgSize >= traceProducer.getMaxMessageSize()) {
            List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
            AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
            traceExecutor.submit(asyncDataSendTask);
            this.clear();
        }
    }
}
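The batching rule in TraceDataSegment has two triggers: a size threshold checked on every add, and a wait-time threshold checked periodically by the worker. The following sketch shows both under stated assumptions. TraceBatcherSketch is hypothetical, time is passed in explicitly instead of being read from the clock, and flushIfExpired is an assumed shape for the time-based check, since sendDataByTimeThreshold is not part of the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of size- and time-triggered batching; not the RocketMQ class.
class TraceBatcherSketch {
    private final int maxBatchChars;
    private final long waitThresholdMillis;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private int currentChars;
    private long firstAddMillis;

    TraceBatcherSketch(int maxBatchChars, long waitThresholdMillis) {
        this.maxBatchChars = maxBatchChars;
        this.waitThresholdMillis = waitThresholdMillis;
    }

    /** Adds one encoded record and flushes when the accumulated size reaches the limit. */
    void add(String data, long nowMillis) {
        if (pending.isEmpty()) {
            firstAddMillis = nowMillis; // remember when this batch started
        }
        pending.add(data);
        currentChars += data.length();
        if (currentChars >= maxBatchChars) {
            flush();
        }
    }

    /** Flushes a non-empty batch whose first record has waited at least the threshold. */
    void flushIfExpired(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - firstAddMillis >= waitThresholdMillis) {
            flush();
        }
    }

    /** Copies the pending records into a finished batch and resets the state for reuse. */
    private void flush() {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        currentChars = 0;
        firstAddMillis = 0;
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        TraceBatcherSketch batcher = new TraceBatcherSketch(10, 500);
        batcher.add("aaaa", 0);
        batcher.add("bbbb", 10);
        batcher.add("cccc", 20);     // 12 chars >= 10: size-triggered flush
        batcher.add("dd", 30);
        batcher.flushIfExpired(100); // only 70 ms old: nothing happens
        batcher.flushIfExpired(530); // 500 ms old: time-triggered flush
        System.out.println("batches=" + batcher.flushedBatches().size());
    }
}
```

Copying the list before clearing it, as the original does with dataToSend, matters because the send task runs later on another thread while the segment keeps accepting new records.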
AsyncDataSendTask
The core logic is in sendTraceDataByMQ.
class AsyncDataSendTask implements Runnable {
private final String traceTopicName;
private final String regionId;
private final List<TraceTransferBean> traceTransferBeanList;
public AsyncDataSendTask(String traceTopicName, String regionId, List<TraceTransferBean> traceTransferBeanList) {
this.traceTopicName = traceTopicName;
this.regionId = regionId;
this.traceTransferBeanList = traceTransferBeanList;
}
@Override
public void run() {
StringBuilder buffer = new StringBuilder(1024);
Set<String> keySet = new HashSet<String>();
for (TraceTransferBean bean : traceTransferBeanList) {
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
}
sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName);
}
/**
* Send message trace data
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch
* @param traceTopic the topic which message trace data will send to
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data, String traceTopic) {
final Message message = new Message(traceTopic, data.getBytes());
// Keyset of message trace includes msgId of or original message
message.setKeys(keySet);
try {
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
log.error("send trace data failed, the traceData is {}", data, e);
}
};
if (traceBrokerSet.isEmpty()) {
// No cross set
traceProducer.send(message, callback, 5000);
} else {
traceProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Set<String> brokerSet = (Set<String>) arg;
List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
for (MessageQueue queue : mqs) {
if (brokerSet.contains(queue.getBrokerName())) {
filterMqs.add(queue);
}
}
int index = sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % filterMqs.size();
if (pos < 0) {
pos = 0;
}
return filterMqs.get(pos);
}
}, traceBrokerSet, callback);
}
} catch (Exception e) {
log.error("send trace data failed, the traceData is {}", data, e);
}
}
private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
Set<String> brokerSet = new HashSet<String>();
TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
producer.getMqClientFactory().updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
brokerSet.add(queue.getBrokerName());
}
}
return brokerSet;
}
}
PS: As shown, trace data is also wrapped in a Message and sent to the broker through a producer, so in essence it is no different from business data.
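The MessageQueueSelector inside sendTraceDataByMQ does two things: it keeps only the queues that live on brokers hosting the trace topic, then picks one round-robin. Its pos < 0 guard exists because Math.abs(Integer.MIN_VALUE) is still negative once the counter overflows. The sketch below illustrates the same idea with a swapped-in technique, Math.floorMod, which never yields a negative index for a positive divisor. TraceQueueSelectSketch and its Queue class are hypothetical simplifications of MessageQueue, and the AtomicInteger counter is an assumption about the type of sendWhichQueue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of broker-filtered round-robin queue selection.
class TraceQueueSelectSketch {

    /** Simplified stand-in for MessageQueue: a broker name plus a queue id. */
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + ":" + queueId;
        }
    }

    /** Keeps queues on the given brokers, then picks one in round-robin order. */
    static Queue select(List<Queue> all, Set<String> traceBrokers, AtomicInteger counter) {
        List<Queue> filtered = new ArrayList<>();
        for (Queue queue : all) {
            if (traceBrokers.contains(queue.brokerName)) {
                filtered.add(queue);
            }
        }
        if (filtered.isEmpty()) {
            throw new IllegalArgumentException("no queue on the given brokers");
        }
        int index = counter.incrementAndGet();
        // floorMod stays in [0, size) even after the counter wraps to a negative value.
        return filtered.get(Math.floorMod(index, filtered.size()));
    }

    public static void main(String[] args) {
        List<Queue> all = Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1), new Queue("broker-b", 0));
        Set<String> traceBrokers = new HashSet<>(Arrays.asList("broker-a"));
        AtomicInteger counter = new AtomicInteger(0);
        System.out.println(select(all, traceBrokers, counter));
        System.out.println(select(all, traceBrokers, counter));
    }
}
```

This filtering is what makes the dedicated trace broker recommended earlier work in practice: trace messages are routed only to brokers that actually host the trace topic.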
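Question: the code batches trace records, merging several into one message before sending. Does the broker treat such messages specially? Is the result still stored as a single message?
Answer: the broker applies no special handling; each batch is stored as a single message. The console parses and filters the records when it reads them.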
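A small sketch helps show why the broker needs no special handling: several encoded records are simply concatenated into one message body, and the reader splits the body again and keeps the records it is looking for. Everything below is an assumption made for illustration: TraceBodyCodecSketch, the three-field record layout, and the two separator characters are hypothetical and do not claim to reproduce the actual TraceDataEncoder format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of merging trace records into one body and filtering them on read.
class TraceBodyCodecSketch {
    static final char FIELD_SEP = '\u0001';  // assumed separator between fields
    static final char RECORD_END = '\u0002'; // assumed terminator of one record

    /** Encodes one record as type, msgId, and cost time, followed by the terminator. */
    static String encodeRecord(String type, String msgId, long costMillis) {
        return type + FIELD_SEP + msgId + FIELD_SEP + costMillis + RECORD_END;
    }

    /** Concatenates encoded records into a single message body. */
    static String merge(List<String> encodedRecords) {
        return String.join("", encodedRecords);
    }

    /** Splits a body back into records and keeps those that belong to the given msgId. */
    static List<String[]> decodeFor(String body, String msgId) {
        List<String[]> result = new ArrayList<>();
        for (String record : body.split(String.valueOf(RECORD_END))) {
            if (record.isEmpty()) {
                continue;
            }
            String[] fields = record.split(String.valueOf(FIELD_SEP));
            if (fields.length == 3 && fields[1].equals(msgId)) {
                result.add(fields);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String body = merge(Arrays.asList(
            encodeRecord("Pub", "ID-1", 12),
            encodeRecord("Pub", "ID-2", 7),
            encodeRecord("SubAfter", "ID-1", 30)));
        // One stored message holds three records; two of them belong to ID-1.
        System.out.println("records for ID-1: " + decodeFor(body, "ID-1").size());
    }
}
```

Because the message keys of the batch carry the ids of all original messages, as AsyncDataSendTask does with keySet, a lookup by key can find the stored message first and then apply this kind of filtering to its body.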
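Console operations
Query by message key
Query by message ID
Message trace details
Chart data
PS: The send cost time, the interval between production and consumption, and the consumption cost time are all clearly visible.
Detailed data for the sent message
Detailed data for the consumed message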