
How RocketMQ Message Trace Is Implemented

2022-12-22 02:36:11
Version: RocketMQ 4.9.4, rocketmq-dashboard (the open-source console)
 

Overview

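A message trace is the complete path of a single message, assembled from the time and status data recorded at each node it passes through: sent by the producer, stored on the RocketMQ broker, and consumed by the consumer. In production, this trace is strong supporting evidence when troubleshooting.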

Key attributes of message trace data

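Producer side: producer instance information; send time; whether the send succeeded; send latency
Consumer side: consumer instance information; delivery time and delivery round (attempt number); whether consumption succeeded; consumption latency
Broker side: message Topic; message storage location; message Key; message Tag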

Cost

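Message storage costs go up. Trace data is itself stored as messages on the broker. It supplements the business data and exists only for troubleshooting.
The feature is configurable, asynchronous and pluggable, so it has no impact on existing functionality.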
 

Quick start

Broker configuration file

## Enable the message trace feature (default: false)
traceTopicEnable=true

 

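PS: It is recommended to dedicate one broker node to storing trace data. In other words, only one broker in the RocketMQ cluster enables message trace, and that node carries no normal business data.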
 

Enabling message trace when subscribing (consumer side)

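import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

        // enableMsgTrace = true (second argument) turns on message trace for this consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1", true);
        consumer.setNamesrvAddr("XX.XX.XX.XX");
        try {
            consumer.subscribe("TopicTest", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }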
PS: Out of the box, only Push-mode consumers can enable message trace. Pull consumers cannot.
 

Enabling message trace when sending (producer side)

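import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

        // enableMsgTrace = true (second argument) turns on message trace for this producer
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
        producer.setNamesrvAddr("XX.XX.XX.XX");
        try {
            producer.start();
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }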

Where trace data is stored

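By default, trace data is stored in the RMQ_SYS_TRACE_TOPIC topic. The broker creates this topic automatically when traceTopicEnable=true. You can also specify a custom trace topic when creating the producer or consumer:
        // Topic_test11111 must be created by the user in advance to hold the trace data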
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
        ......

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
        ......
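The topic choice above can be modelled in plain Java, together with the check SendMessageTraceHookImpl performs to skip messages whose topic starts with the trace topic name. This is only a sketch: TraceTopicSketch, resolveTraceTopic and shouldTrace are hypothetical names, and trim().isEmpty() merely approximates RocketMQ's UtilAll.isBlank.

```java
// Hypothetical helper, not part of RocketMQ: restates the blank-topic fallback from
// AsyncTraceDispatcher's constructor and the startsWith() guard from SendMessageTraceHookImpl.
final class TraceTopicSketch {
    // Default trace topic (the value of TopicValidator.RMQ_SYS_TRACE_TOPIC)
    static final String DEFAULT_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";

    // Use the customized topic if one is given, otherwise fall back to the default topic.
    // trim().isEmpty() approximates UtilAll.isBlank().
    static String resolveTraceTopic(String customizedTraceTopic) {
        if (customizedTraceTopic == null || customizedTraceTopic.trim().isEmpty()) {
            return DEFAULT_TRACE_TOPIC;
        }
        return customizedTraceTopic;
    }

    // Messages sent to the trace topic are not traced again; otherwise every trace
    // message would produce yet another trace message.
    static boolean shouldTrace(String messageTopic, String traceTopic) {
        return messageTopic != null && !messageTopic.startsWith(traceTopic);
    }

    public static void main(String[] args) {
        String traceTopic = resolveTraceTopic("Topic_test11111");
        System.out.println(traceTopic);                                  // Topic_test11111
        System.out.println(resolveTraceTopic(null));                     // RMQ_SYS_TRACE_TOPIC
        System.out.println(shouldTrace("TopicTest", traceTopic));        // true
        System.out.println(shouldTrace("Topic_test11111", traceTopic));  // false
    }
}
```

The guard uses startsWith(), so a business topic whose name merely begins with the trace topic name would be excluded from tracing as well.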

 

Implementation

Implementation sequence diagram

 

Source code walkthrough

/**
 * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
 * name.
 *
 * @param namespace Namespace for this MQ Producer instance.
 * @param producerGroup Producer group, see the name-sake field.
 * @param rpcHook RPC hook to execute per each remoting command execution.
 * @param enableMsgTrace Switch flag instance for message trace.
 * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
 * trace topic name.
 */
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
    boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.defaultMQProducerImpl);
            traceDispatcher = dispatcher;
            this.defaultMQProducerImpl.registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));
            this.defaultMQProducerImpl.registerEndTransactionHook(
                new EndTransactionTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

 

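Tracing through the DefaultMQProducer source shows that when enableMsgTrace is true, creating the producer also creates three objects: AsyncTraceDispatcher, SendMessageTraceHookImpl and EndTransactionTraceHookImpl. These three classes work together to implement message trace on the producer side. EndTransactionTraceHookImpl comes into play when transactional messages are sent.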
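The constructor wiring follows an ordinary hook (observer) pattern: the producer keeps a list of send hooks and calls each one before and after every send. The sketch below models only that pattern. MiniProducer, SendHook, Context and runDemo are hypothetical names, and the "send" is simulated rather than a real network call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the hook wiring in DefaultMQProducer's constructor.
// MiniProducer, SendHook and Context are illustration names, not RocketMQ classes.
final class HookWiringSketch {
    // Per-send context shared by the "before" and "after" callbacks (like SendMessageContext)
    static final class Context {
        final String topic;
        String sendStatus;      // filled in after the simulated send
        Object traceContext;    // slot a trace hook can use, like setMqTraceContext()

        Context(String topic) {
            this.topic = topic;
        }
    }

    interface SendHook {
        void sendMessageBefore(Context ctx);

        void sendMessageAfter(Context ctx);
    }

    static final class MiniProducer {
        private final List<SendHook> hooks = new ArrayList<>();

        // Counterpart of registerSendMessageHook(...)
        void registerSendMessageHook(SendHook hook) {
            hooks.add(hook);
        }

        void send(String topic) {
            Context ctx = new Context(topic);
            for (SendHook h : hooks) {
                h.sendMessageBefore(ctx);
            }
            ctx.sendStatus = "SEND_OK";  // stands in for the real network call
            for (SendHook h : hooks) {
                h.sendMessageAfter(ctx);
            }
        }
    }

    // Registers a trace hook only if enableMsgTrace is true (mirroring the constructor),
    // sends one message and returns what the hook observed.
    static List<String> runDemo(boolean enableMsgTrace) {
        List<String> events = new ArrayList<>();
        MiniProducer producer = new MiniProducer();
        if (enableMsgTrace) {
            producer.registerSendMessageHook(new SendHook() {
                @Override
                public void sendMessageBefore(Context ctx) {
                    ctx.traceContext = "Pub:" + ctx.topic;  // only prepare data here
                    events.add("before");
                }

                @Override
                public void sendMessageAfter(Context ctx) {
                    events.add("after " + ctx.traceContext + " " + ctx.sendStatus);
                }
            });
        }
        producer.send("TopicTest");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(true));   // [before, after Pub:TopicTest SEND_OK]
        System.out.println(runDemo(false));  // []
    }
}
```

Because tracing lives entirely in a registered hook, a producer created with enableMsgTrace = false follows the same send path without the extra work. That is what makes the feature pluggable.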
 
SendMessageTraceHookImpl
package org.apache.rocketmq.client.trace.hook;

import java.util.ArrayList;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;

public class SendMessageTraceHookImpl implements SendMessageHook {

    private TraceDispatcher localDispatcher;

    public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
        this.localDispatcher = localDispatcher;
    }

    @Override
    public String hookName() {
        return "SendMessageTraceHook";
    }

    /**
     * No trace data is sent here. This only prepares part of the trace data and stores it in the send context.
     */
    @Override
    public void sendMessageBefore(SendMessageContext context) {
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
            return;
        }
        //build the context content of TuxeTraceContext
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
        //build the data bean object of message trace
        TraceBean traceBean = new TraceBean();
        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        tuxeContext.getTraceBeans().add(traceBean);
    }

    @Override
    public void sendMessageAfter(SendMessageContext context) {
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
            || context.getMqTraceContext() == null) {
            return;
        }
        if (context.getSendResult() == null) {
            return;
        }

        if (context.getSendResult().getRegionId() == null
            || !context.getSendResult().isTraceOn()) {
            // if switch is false,skip it
            return;
        }

        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
        tuxeContext.setCostTime(costTime);
        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
            tuxeContext.setSuccess(true);
        } else {
            tuxeContext.setSuccess(false);
        }
        tuxeContext.setRegionId(context.getSendResult().getRegionId());
        traceBean.setMsgId(context.getSendResult().getMsgId());
        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
        // Only at this step is the trace data handed off for sending (it is enqueued, not sent yet); trace data can be lost here
        localDispatcher.append(tuxeContext);
    }
}

 

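As the method names suggest, sendMessageBefore runs before the message is sent, and sendMessageAfter runs after the broker returns the send result. The code shows that the assembled trace data is handed off for delivery to the broker in the localDispatcher.append(tuxeContext) step. Next, let's look at the source of the localDispatcher object.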
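The timing fields in sendMessageAfter are computed entirely on the client. costTime is the time elapsed between creating the TraceContext and receiving the send result, divided by the number of trace beans. storeTime is then set to the midpoint of that interval, so it is an estimate rather than a timestamp reported by the broker. The sketch below isolates that arithmetic. SendTimingSketch, TimingContext and onSendResult are hypothetical names, and the clock is passed in so the result is reproducible.

```java
// Restates the timing arithmetic of sendMessageBefore()/sendMessageAfter().
// TimingContext is a hypothetical stand-in for TraceContext; the clock values are
// passed in explicitly instead of calling System.currentTimeMillis().
final class SendTimingSketch {
    static final class TimingContext {
        final long timeStamp;  // set when the context is created, i.e. before the send
        final int beanCount;   // size of traceBeans (1 for a single message)
        int costTime;
        long storeTime;

        TimingContext(long timeStamp, int beanCount) {
            this.timeStamp = timeStamp;
            this.beanCount = beanCount;
        }
    }

    // Same formulas as sendMessageAfter():
    //   costTime  = (now - timeStamp) / traceBeans.size()
    //   storeTime = timeStamp + costTime / 2   (integer division)
    static TimingContext onSendResult(TimingContext ctx, long nowMillis) {
        ctx.costTime = (int) ((nowMillis - ctx.timeStamp) / ctx.beanCount);
        ctx.storeTime = ctx.timeStamp + ctx.costTime / 2;
        return ctx;
    }

    public static void main(String[] args) {
        TimingContext ctx = onSendResult(new TimingContext(1_000L, 1), 1_030L);
        System.out.println(ctx.costTime + " " + ctx.storeTime);  // 30 1015
    }
}
```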
 
TraceDispatcher
package org.apache.rocketmq.client.trace;

import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException;

/**
 * Interface of asynchronous transfer data
 */
public interface TraceDispatcher {
    enum Type {
        PRODUCE,
        CONSUME
    }
    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Append the transfering data
     * @param ctx data information
     * @return
     */
    boolean append(Object ctx);

    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;

    /**
     * Close the trace Hook
     */
    void shutdown();
}

 

AsyncTraceDispatcher (this class has a lot of code, so only the key methods and fields are listed here; see the RocketMQ source for the full code)
public class AsyncTraceDispatcher implements TraceDispatcher {

private final ArrayBlockingQueue<TraceContext> traceContextQueue;

public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
    // queueSize is greater than or equal to the n power of 2 of value
    this.queueSize = 2048;
    this.batchSize = 100;
    this.maxMsgSize = 128000;
    this.pollingTimeMil = 100;
    this.waitTimeThresholdMil = 500;
    this.discardCount = new AtomicLong(0L);
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    this.taskQueueByTopic = new HashMap();
    this.group = group;
    this.type = type;

    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    if (!UtilAll.isBlank(traceTopicName)) {
        this.traceTopicName = traceTopicName;
    } else {
        this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
    }
    this.traceExecutor = new ThreadPoolExecutor(//
            10, //
            20, //
            1000 * 60, //
            TimeUnit.MILLISECONDS, //
            this.appenderQueue, //
            new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);
}

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

@Override
public boolean append(final Object ctx) {
    boolean result = traceContextQueue.offer((TraceContext) ctx);
    if (!result) {
        log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
    }
    return result;
}

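    // ... other fields and methods omitted ...
}

// AsyncRunnable is an inner class of AsyncTraceDispatcher; it is shown separately here for readability
class AsyncRunnable implements Runnable {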
    private boolean stopped;

    @Override
    public void run() {
        while (!stopped) {
            synchronized (traceContextQueue) {
                long endTime = System.currentTimeMillis() + pollingTimeMil;
                while (System.currentTimeMillis() < endTime) {
                    try {
                        TraceContext traceContext = traceContextQueue.poll(
                                endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
                        );

                        if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {
                            // get the topic which the trace message will send to
                            String traceTopicName = this.getTraceTopicName(traceContext.getRegionId());

                            // get the traceDataSegment which will save this trace message, create if null
                            TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName);
                            if (traceDataSegment == null) {
                                traceDataSegment = new TraceDataSegment(traceTopicName, traceContext.getRegionId());
                                taskQueueByTopic.put(traceTopicName, traceDataSegment);
                            }

                            // encode traceContext and save it into traceDataSegment
                            // NOTE if data size in traceDataSegment more than maxMsgSize,
                            //  a AsyncDataSendTask will be created and submitted
                            TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext);
                            traceDataSegment.addTraceTransferBean(traceTransferBean);
                        }
                    } catch (InterruptedException ignore) {
                        log.debug("traceContextQueue#poll exception");
                    }
                }

                // NOTE send the data in traceDataSegment which the first TraceTransferBean
                //  is longer than waitTimeThreshold
                sendDataByTimeThreshold();

                if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }
        }

    }

    // getTraceTopicName(regionId) and sendDataByTimeThreshold() are not shown in this excerpt
}

 

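The code shows that trace data is delivered asynchronously. append() only offers the TraceContext to the blocking queue traceContextQueue (capacity 1024). The worker thread (AsyncRunnable) later drains that queue and hands the encoded data on for sending.
It also shows what happens when traceContextQueue is full: offer() fails, and the only handling is a "buffer full" log line plus an incremented discard counter. In that situation the trace data is lost.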
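The two halves of this hand-off are a non-blocking offer() with a discard counter on the sending side and a time-boxed poll() loop on the worker side. Both can be reproduced with only java.util.concurrent. This is a minimal sketch: BufferSketch, append, drainFor and discarded are hypothetical names, and a String stands in for TraceContext. One deliberate difference: on interruption this sketch restores the interrupt flag and stops draining, whereas the quoted RocketMQ code logs and keeps polling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Minimal model of AsyncTraceDispatcher.append() and one polling window of AsyncRunnable.
// BufferSketch and drainFor() are hypothetical names; String stands in for TraceContext.
final class BufferSketch {
    private final ArrayBlockingQueue<String> queue;
    private final AtomicLong discardCount = new AtomicLong();

    BufferSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Like append(): offer() never blocks the sending thread. When the queue is full the
    // item is simply dropped and only a counter (plus a log line in RocketMQ) records it.
    boolean append(String ctx) {
        boolean ok = queue.offer(ctx);
        if (!ok) {
            discardCount.incrementAndGet();
        }
        return ok;
    }

    long discarded() {
        return discardCount.get();
    }

    // Like one pass of the worker loop: keep polling until the window (pollingTimeMil) ends.
    List<String> drainFor(long windowMillis) {
        List<String> drained = new ArrayList<>();
        long endTime = System.currentTimeMillis() + windowMillis;
        while (System.currentTimeMillis() < endTime) {
            try {
                String item = queue.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                if (item != null) {
                    drained.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve the interrupt and stop draining
                break;
            }
        }
        return drained;
    }

    public static void main(String[] args) {
        BufferSketch buf = new BufferSketch(2);
        buf.append("t1");
        buf.append("t2");
        buf.append("t3");                       // queue full: dropped
        System.out.println(buf.discarded());    // 1
        System.out.println(buf.drainFor(100));  // [t1, t2]
    }
}
```

The trade-off is visible in append(). Using offer() instead of put() guarantees that tracing never slows the business send, at the price of silently losing trace records under load.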

Data format

TraceContext (holding its TraceBean list in traceBeans) >>> TraceTransferBean >>> TraceDataSegment
public class TraceContext implements Comparable<TraceContext> {
    private TraceType traceType;
    private long timeStamp = System.currentTimeMillis();
    private String regionId = "";
    private String regionName = "";
    private String groupName = "";
    private int costTime = 0;
    private boolean isSuccess = true;
    private String requestId = MessageClientIDSetter.createUniqID();
    private int contextCode = 0;
    private List<TraceBean> traceBeans;
}

 

public class TraceBean {
    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
    private String topic = "";
    private String msgId = "";
    private String offsetMsgId = "";
    private String tags = "";
    private String keys = "";
    private String storeHost = LOCAL_ADDRESS;
    private String clientHost = LOCAL_ADDRESS;
    private long storeTime;
    private int retryTimes;
    private int bodyLength;
    private MessageType msgType;
    private LocalTransactionState transactionState;
    private String transactionId;
    private boolean fromTransactionCheck;
}

 

public class TraceTransferBean {
    private String transData;
    private Set<String> transKey = new HashSet<String>();
}
class TraceDataSegment {
    private long firstBeanAddTime;
    private int currentMsgSize;
    private final String traceTopicName;
    private final String regionId;
    private final List<TraceTransferBean> traceTransferBeanList = new ArrayList();

    TraceDataSegment(String traceTopicName, String regionId) {
        this.traceTopicName = traceTopicName;
        this.regionId = regionId;
    }


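    // Other members (initFirstBeanAddTime(), clear(), ...) are omitted from this excerpt

    /**
     * Records the time of the first add, so the batch is sent once it has waited
     * waitTimeThresholdMil (500 ms), even if it is not full.
     * If the accumulated size reaches the trace producer's max message size, the batch is
     * wrapped in an AsyncDataSendTask and submitted to the thread pool. (The trace producer
     * is configured from maxMsgSize = 128000, so this limit is on the order of 128 KB rather
     * than the 4 MB default of a normal producer.)
     * firstBeanAddTime, currentMsgSize and traceTransferBeanList are then cleared so the
     * segment can be reused.
     */
    public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
        initFirstBeanAddTime();
        this.traceTransferBeanList.add(traceTransferBean);
        this.currentMsgSize += traceTransferBean.getTransData().length();
        if (currentMsgSize >= traceProducer.getMaxMessageSize()) {
            List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
            AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
            traceExecutor.submit(asyncDataSendTask);

            this.clear();
        }
    }
}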
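TraceDataSegment flushes a batch for one of two reasons. Either the accumulated size reaches the limit, or the oldest entry has waited past the time threshold (the worker checks the latter via sendDataByTimeThreshold). The sketch below captures just those two rules. SegmentSketch, add, flushIfExpired and flushed are hypothetical names, the clock is passed in explicitly, and flushed batches are collected in a list instead of being submitted as AsyncDataSendTask.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-creation of TraceDataSegment's two flush rules:
//   1) flush as soon as the accumulated size reaches maxSize;
//   2) flush once the oldest entry has waited at least waitThresholdMillis.
// Flushed batches are collected in a list instead of being submitted to a thread pool.
final class SegmentSketch {
    private final int maxSize;
    private final long waitThresholdMillis;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long firstAddTime;
    private int currentSize;

    SegmentSketch(int maxSize, long waitThresholdMillis) {
        this.maxSize = maxSize;
        this.waitThresholdMillis = waitThresholdMillis;
    }

    void add(String transData, long now) {
        if (pending.isEmpty()) {
            firstAddTime = now;  // like initFirstBeanAddTime()
        }
        pending.add(transData);
        currentSize += transData.length();
        if (currentSize >= maxSize) {
            flush();
        }
    }

    // Called periodically by the worker thread
    void flushIfExpired(long now) {
        if (!pending.isEmpty() && now - firstAddTime >= waitThresholdMillis) {
            flush();
        }
    }

    // Copy the batch, then reset the segment so it can be reused (like clear())
    private void flush() {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        currentSize = 0;
        firstAddTime = 0;
    }

    List<List<String>> flushed() {
        return flushed;
    }

    public static void main(String[] args) {
        SegmentSketch seg = new SegmentSketch(10, 500);
        seg.add("aaaa", 1);       // size 4
        seg.add("bbbbbb", 2);     // size 10 -> flushed by size
        seg.add("cc", 3);         // size 2, waits
        seg.flushIfExpired(400);  // oldest entry is 397 ms old: not yet
        seg.flushIfExpired(503);  // 500 ms old: flushed by time
        System.out.println(seg.flushed());  // [[aaaa, bbbbbb], [cc]]
    }
}
```

The size rule keeps each trace message under the producer's limit. The time rule bounds how long a trace record can sit unsent when traffic is light.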

 

AsyncDataSendTask
The core method is sendTraceDataByMQ.
class AsyncDataSendTask implements Runnable {
    private final String traceTopicName;
    private final String regionId;
    private final List<TraceTransferBean> traceTransferBeanList;

    public AsyncDataSendTask(String traceTopicName, String regionId, List<TraceTransferBean> traceTransferBeanList) {
        this.traceTopicName = traceTopicName;
        this.regionId = regionId;
        this.traceTransferBeanList = traceTransferBeanList;
    }

    @Override
    public void run() {
        StringBuilder buffer = new StringBuilder(1024);
        Set<String> keySet = new HashSet<String>();
        for (TraceTransferBean bean : traceTransferBeanList) {
            keySet.addAll(bean.getTransKey());
            buffer.append(bean.getTransData());
        }
        sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName);
    }

    /**
     * Send message trace data
     *
     * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
     * @param data   the message trace data in this batch
     * @param traceTopic the topic which message trace data will send to
     */
    private void sendTraceDataByMQ(Set<String> keySet, final String data, String traceTopic) {
        final Message message = new Message(traceTopic, data.getBytes());
        // Keyset of message trace includes msgId of or original message
        message.setKeys(keySet);
        try {
            Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
            SendCallback callback = new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {

                }

                @Override
                public void onException(Throwable e) {
                    log.error("send trace data failed, the traceData is {}", data, e);
                }
            };
            if (traceBrokerSet.isEmpty()) {
                // No cross set
                traceProducer.send(message, callback, 5000);
            } else {
                traceProducer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Set<String> brokerSet = (Set<String>) arg;
                        List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
                        for (MessageQueue queue : mqs) {
                            if (brokerSet.contains(queue.getBrokerName())) {
                                filterMqs.add(queue);
                            }
                        }
                        int index = sendWhichQueue.incrementAndGet();
                        int pos = Math.abs(index) % filterMqs.size();
                        if (pos < 0) {
                            pos = 0;
                        }
                        return filterMqs.get(pos);
                    }
                }, traceBrokerSet, callback);
            }

        } catch (Exception e) {
            log.error("send trace data failed, the traceData is {}", data, e);
        }
    }

    private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
        Set<String> brokerSet = new HashSet<String>();
        TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
            producer.getMqClientFactory().updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
                brokerSet.add(queue.getBrokerName());
            }
        }
        return brokerSet;
    }
}
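PS: So trace data is likewise wrapped into a Message and sent to the broker by a producer (traceProducer). In essence it is no different from business data.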
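The anonymous MessageQueueSelector in sendTraceDataByMQ does two things. It keeps only the queues on brokers in traceBrokerSet, then picks among them round-robin with an AtomicInteger. The stand-alone sketch below reproduces that logic. TraceQueueSelectorSketch, Queue and demo are hypothetical names, and Queue stands in for RocketMQ's MessageQueue. Like the original, it assumes at least one queue survives the filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone version of the MessageQueueSelector used in sendTraceDataByMQ().
// Queue is a hypothetical stand-in for org.apache.rocketmq.common.message.MessageQueue.
final class TraceQueueSelectorSketch {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "-" + queueId;
        }
    }

    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    // Keep only queues on brokers in brokerSet, then pick one round-robin.
    // Assumes at least one queue matches; otherwise % 0 would throw ArithmeticException.
    Queue select(List<Queue> mqs, Set<String> brokerSet) {
        List<Queue> filtered = new ArrayList<>();
        for (Queue q : mqs) {
            if (brokerSet.contains(q.brokerName)) {
                filtered.add(q);
            }
        }
        int index = sendWhichQueue.incrementAndGet();
        // After overflow, Math.abs(Integer.MIN_VALUE) is still negative, hence the guard
        int pos = Math.abs(index) % filtered.size();
        if (pos < 0) {
            pos = 0;
        }
        return filtered.get(pos);
    }

    // Three consecutive selections over two matching queues
    static List<String> demo() {
        TraceQueueSelectorSketch selector = new TraceQueueSelectorSketch();
        List<Queue> mqs = Arrays.asList(new Queue("broker-a", 0), new Queue("broker-a", 1), new Queue("broker-b", 0));
        Set<String> traceBrokers = new HashSet<>(Arrays.asList("broker-a"));
        List<String> picks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            picks.add(selector.select(mqs, traceBrokers).toString());
        }
        return picks;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // [broker-a-1, broker-a-0, broker-a-1]
    }
}
```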
 
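Question: the code clearly batches data, merging multiple trace records into one message before sending. Does the broker treat such messages specially? Is the batch ultimately stored as a single message?
Answer: The broker does nothing special; the batch is stored as one ordinary message. It is the console that parses and filters the content when reading it.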
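The idea of "many trace records in one message body, split apart again by the reader" can be illustrated with plain string handling. This sketch is for illustration only. TraceBatchSketch, encode, batch and decode are hypothetical names. The separator characters and field order are assumptions, not a description of TraceDataEncoder's real format. "Pub" and "SubBefore" are borrowed from RocketMQ's TraceType names purely as sample field values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Illustration of storing several trace records in one message body and splitting them
// again on read. The separators and field layout are assumptions for this sketch and are
// not guaranteed to match TraceDataEncoder's real format.
final class TraceBatchSketch {
    static final char FIELD_SEPARATOR = (char) 1;   // between fields of one record (assumed)
    static final char RECORD_SEPARATOR = (char) 2;  // terminates each record (assumed)

    // Encode one record, e.g. type, topic, msgId, costTime
    static String encode(String... fields) {
        return String.join(String.valueOf(FIELD_SEPARATOR), fields) + RECORD_SEPARATOR;
    }

    // Like AsyncDataSendTask.run(): concatenate encoded records into a single message body
    static String batch(List<String> encodedRecords) {
        return String.join("", encodedRecords);
    }

    // What a reader such as the console has to do: split the stored body back into records
    static List<String[]> decode(String body) {
        List<String[]> records = new ArrayList<>();
        for (String rec : body.split(Pattern.quote(String.valueOf(RECORD_SEPARATOR)))) {
            if (!rec.isEmpty()) {
                // limit -1 keeps empty trailing fields
                records.add(rec.split(Pattern.quote(String.valueOf(FIELD_SEPARATOR)), -1));
            }
        }
        return records;
    }

    public static void main(String[] args) {
        String body = batch(Arrays.asList(
                encode("Pub", "TopicTest", "ID1", "3"),
                encode("SubBefore", "TopicTest", "ID1", "")));
        List<String[]> records = decode(body);
        System.out.println(records.size());                   // 2
        System.out.println(Arrays.toString(records.get(0)));  // [Pub, TopicTest, ID1, 3]
    }
}
```

Because the broker only sees one opaque body, the message keys set on the batch (the union of each record's transKey, including the original msgId) are what let a query by message ID find the right trace message. After that, the reader must filter the decoded records itself.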
 
 

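Console operations

Query by message key

Query by message ID

Message trace details

Chart view

PS: The chart clearly shows the production latency, the interval from production to consumption, and the consumption latency.
 

Send-side message details

Consume-side message details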

 

 

 

 

c****n
原创

rocketmq消息轨迹实现原理

2022-12-22 02:36:11
91
0
版本:rocketmq 4.9.4,rocketmq-dashboard(开源控制台)
 

功能介绍

消息轨迹是指一条消息从生产者发送到RocketMQ服务端broker,再到消费者消费,整个过程中的各个相关节点的时间、状态等数据汇聚而成的完整链路信息。该轨迹可作为生产环境中排查问题强有力的数据支持。

消息轨迹数据关键属性

Producer端 Consumer端 Broker端
生产实例信息 消费实例信息 消息的Topic
发送消息时间 投递时间,投递轮次 消息存储位置
消息是否发送成功 消息是否消费成功 消息的Key值
发送耗时 消费耗时 消息的Tag值

成本代价

增加消息存储的成本,消息轨迹数据本质上也是消息,因此都是存储broker上,属于业务的数据的附加数据,用于问题排查定位。
消息轨迹功能在实现了实现了配置化,异步化,插件化,对原有功能没有任何影响。
 

快速启动

Broker端配置文件

## 开启消息轨迹功能,默认为false
traceTopicEnable=true

 

PS:建议专门规划一个broker节点来存储这些轨迹消息数据,即rocketmq集群只有一个broker配置启动消息轨迹功能,且这个节点不承担正常业务数据。
 

订阅消息时开启消息轨迹

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
        producer.setNamesrvAddr("XX.XX.XX.XX1");
        producer.start();
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
ps:原生只支持Push模式的消费者开启消息轨迹,pull的不支持
 

发送消息时开启消息轨迹

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
        producer.setNamesrvAddr("XX.XX.XX.XX");
        producer.start();
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

消息轨迹数据存储

默认情况,消息轨迹数据存储在RMQ_SYS_TRACE_TOPIC这个topic,broker默认会创建,也可以支持在消息发送,消费的时候指定。
        ##其中Topic_test11111需要用户自己预先创建,来保存消息轨迹;
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
        ......

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
        ......

 

实现方案

实现时序图

 

源码跟踪

/**
 * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
 * name.
 *
 * @param namespace Namespace for this MQ Producer instance.
 * @param producerGroup Producer group, see the name-sake field.
 * @param rpcHook RPC hook to execute per each remoting command execution.
 * @param enableMsgTrace Switch flag instance for message trace.
 * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
 * trace topic name.
 */
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
    boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.defaultMQProducerImpl);
            traceDispatcher = dispatcher;
            this.defaultMQProducerImpl.registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));
            this.defaultMQProducerImpl.registerEndTransactionHook(
                new EndTransactionTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

 

跟踪一下DefaultMQProducer的源码,我们可以发现当enableMsgTrace为true时,创建消费者的同时会创建AsyncTraceDispatcherSendMessageTraceHookImplEndTransactionTraceHookImpl3个对象。生产者端的消息轨迹功能就由这3个类搭配实现,其中,EndTransactionTraceHookImpl在发送事务消息时被使用到
 
SendMessageTraceHookImpl
package org.apache.rocketmq.client.trace.hook;

import java.util.ArrayList;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;

public class SendMessageTraceHookImpl implements SendMessageHook {

    private TraceDispatcher localDispatcher;

    public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
        this.localDispatcher = localDispatcher;
    }

    @Override
    public String hookName() {
        return "SendMessageTraceHook";
    }

    /**
     * 这里并没有发送消息轨道数据,只是先准备好一部分数据,然后存储在发送上下文环境中
     */
    @Override
    public void sendMessageBefore(SendMessageContext context) {
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
            return;
        }
        //build the context content of TuxeTraceContext
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
        //build the data bean object of message trace
        TraceBean traceBean = new TraceBean();
        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        tuxeContext.getTraceBeans().add(traceBean);
    }

    @Override
    public void sendMessageAfter(SendMessageContext context) {
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
            || context.getMqTraceContext() == null) {
            return;
        }
        if (context.getSendResult() == null) {
            return;
        }

        if (context.getSendResult().getRegionId() == null
            || !context.getSendResult().isTraceOn()) {
            // if switch is false,skip it
            return;
        }

        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
        tuxeContext.setCostTime(costTime);
        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
            tuxeContext.setSuccess(true);
        } else {
            tuxeContext.setSuccess(false);
        }
        tuxeContext.setRegionId(context.getSendResult().getRegionId());
        traceBean.setMsgId(context.getSendResult().getMsgId());
        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
        // 这一步才真正去发送轨迹数据消息   这一步存在数据丢失的可能
        localDispatcher.append(tuxeContext);
    }
}

 

从方法名就可以得知,sendMessageBefore在消息发送前执行,sendMessageAfter在broker返回消息投递结果后执行。从代码中我们清楚知道,封装好消息轨迹数据投递到broker是在 localDispatcher.append(tuxeContext) 这一步。我们再看看localDispatcher对象的源码。
 
TraceDispatcher
package org.apache.rocketmq.client.trace;

import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException;

/**
 * Interface of asynchronous transfer data
 */
public interface TraceDispatcher {
    enum Type {
        PRODUCE,
        CONSUME
    }
    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Append the transfering data
     * @param ctx data information
     * @return
     */
    boolean append(Object ctx);

    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;

    /**
     * Close the trace Hook
     */
    void shutdown();
}

 

AsyncTraceDispatcher(这个类的源代码比较多,所以只列出关键方式与属性,要看完整代码,请读者自行查看rocketmq源码)
public class AsyncTraceDispatcher implements TraceDispatcher {

private final ArrayBlockingQueue<TraceContext> traceContextQueue;

public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
    // queueSize is greater than or equal to the n power of 2 of value
    this.queueSize = 2048;
    this.batchSize = 100;
    this.maxMsgSize = 128000;
    this.pollingTimeMil = 100;
    this.waitTimeThresholdMil = 500;
    this.discardCount = new AtomicLong(0L);
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    this.taskQueueByTopic = new HashMap();
    this.group = group;
    this.type = type;

    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    if (!UtilAll.isBlank(traceTopicName)) {
        this.traceTopicName = traceTopicName;
    } else {
        this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
    }
    this.traceExecutor = new ThreadPoolExecutor(//
            10, //
            20, //
            1000 * 60, //
            TimeUnit.MILLISECONDS, //
            this.appenderQueue, //
            new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);
}

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

@Override
public boolean append(final Object ctx) {
    boolean result = traceContextQueue.offer((TraceContext) ctx);
    if (!result) {
        log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
    }
    return result;
}

}

class AsyncRunnable implements Runnable {
    private boolean stopped;

    @Override
    public void run() {
        while (!stopped) {
            synchronized (traceContextQueue) {
                long endTime = System.currentTimeMillis() + pollingTimeMil;
                while (System.currentTimeMillis() < endTime) {
                    try {
                        TraceContext traceContext = traceContextQueue.poll(
                                endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
                        );

                        if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {
                            // get the topic which the trace message will send to
                            String traceTopicName = this.getTraceTopicName(traceContext.getRegionId());

                            // get the traceDataSegment which will save this trace message, create if null
                            TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName);
                            if (traceDataSegment == null) {
                                traceDataSegment = new TraceDataSegment(traceTopicName, traceContext.getRegionId());
                                taskQueueByTopic.put(traceTopicName, traceDataSegment);
                            }

                            // encode traceContext and save it into traceDataSegment
                            // NOTE if data size in traceDataSegment more than maxMsgSize,
                            //  a AsyncDataSendTask will be created and submitted
                            TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext);
                            traceDataSegment.addTraceTransferBean(traceTransferBean);
                        }
                    } catch (InterruptedException ignore) {
                        log.debug("traceContextQueue#poll exception");
                    }
                }

                // NOTE send the data in traceDataSegment which the first TraceTransferBean
                //  is longer than waitTimeThreshold
                sendDataByTimeThreshold();

                if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }
        }

    }

 

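As the code shows, trace data is delivered asynchronously. append() first puts each TraceContext into the blocking queue traceContextQueue (default capacity 1024). The worker thread (AsyncRunnable) then polls that queue, groups the encoded data by trace topic into TraceDataSegment objects, and hands full or expired batches to the traceExecutor thread pool.
Note also that append() uses a non-blocking offer(). If traceContextQueue is full, the call fails and only a discard log line is written, with no retry or fallback, so in that case the trace data is lost.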
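The queueing behaviour described above can be illustrated with a small standalone sketch. It is not RocketMQ code: the class BoundedTraceBuffer and its methods are hypothetical, and String stands in for TraceContext. append() uses a non-blocking offer() like AsyncTraceDispatcher.append(), so a full queue drops the item and only increments a counter; drainFor() mimics the worker's time-boxed poll loop.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the append/drain pattern; String stands in for TraceContext.
public class BoundedTraceBuffer {
    private final BlockingQueue<String> queue;
    private final AtomicLong discardCount = new AtomicLong();

    public BoundedTraceBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking append: when the queue is full the item is dropped and only counted.
    public boolean append(String ctx) {
        boolean ok = queue.offer(ctx);
        if (!ok) {
            discardCount.incrementAndGet();
        }
        return ok;
    }

    // Time-boxed drain, like the worker's pollingTimeMil window; returns items taken.
    public int drainFor(long windowMillis) throws InterruptedException {
        int drained = 0;
        long endTime = System.currentTimeMillis() + windowMillis;
        while (System.currentTimeMillis() < endTime) {
            String item = queue.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            if (item != null) {
                drained++;
            }
        }
        return drained;
    }

    public long getDiscardCount() {
        return discardCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedTraceBuffer buffer = new BoundedTraceBuffer(2);
        System.out.println(buffer.append("t1"));      // true
        System.out.println(buffer.append("t2"));      // true
        System.out.println(buffer.append("t3"));      // false: queue full, "t3" is lost
        System.out.println(buffer.getDiscardCount()); // 1
        System.out.println(buffer.drainFor(50));      // 2
        System.out.println(buffer.append("t4"));      // true again after draining
    }
}
```

Because offer() never blocks, the business thread that sends or consumes the message is never slowed down by tracing; the trade-off is exactly the silent data loss noted above.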

Data format

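TraceContext (with its list of TraceBean in traceBeans) >>> TraceTransferBean >>> TraceDataSegment
Each TraceContext is encoded into a TraceTransferBean, and TraceTransferBeans bound for the same trace topic are accumulated in one TraceDataSegment.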
public class TraceContext implements Comparable<TraceContext> {
    private TraceType traceType;
    private long timeStamp = System.currentTimeMillis();
    private String regionId = "";
    private String regionName = "";
    private String groupName = "";
    private int costTime = 0;
    private boolean isSuccess = true;
    private String requestId = MessageClientIDSetter.createUniqID();
    private int contextCode = 0;
    private List<TraceBean> traceBeans;
}

 

public class TraceBean {
    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
    private String topic = "";
    private String msgId = "";
    private String offsetMsgId = "";
    private String tags = "";
    private String keys = "";
    private String storeHost = LOCAL_ADDRESS;
    private String clientHost = LOCAL_ADDRESS;
    private long storeTime;
    private int retryTimes;
    private int bodyLength;
    private MessageType msgType;
    private LocalTransactionState transactionState;
    private String transactionId;
    private boolean fromTransactionCheck;
}

 

public class TraceTransferBean {
    private String transData;
    private Set<String> transKey = new HashSet<String>();
}
class TraceDataSegment {
    private long firstBeanAddTime;
    private int currentMsgSize;
    private final String traceTopicName;
    private final String regionId;
    private final List<TraceTransferBean> traceTransferBeanList = new ArrayList();

    TraceDataSegment(String traceTopicName, String regionId) {
        this.traceTopicName = traceTopicName;
        this.regionId = regionId;
    }


}
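    /**
     * Record the time the first bean was added (used to send the batch after the 500 ms timeout).
     * If the accumulated size reaches the trace producer's max message size (about 128 KB as
     * configured by AsyncTraceDispatcher, not the 4 MB default of a normal producer),
     * wrap the batch in an AsyncDataSendTask and submit it to the thread pool,
     * then reset firstBeanAddTime, currentMsgSize and traceTransferBeanList so the segment can be reused.
     */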
    public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
        initFirstBeanAddTime();
        this.traceTransferBeanList.add(traceTransferBean);
        this.currentMsgSize += traceTransferBean.getTransData().length();
        if (currentMsgSize >= traceProducer.getMaxMessageSize()) {
            List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
            AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
            traceExecutor.submit(asyncDataSendTask);

            this.clear();

        }
    }
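The two flush triggers of TraceDataSegment (size in addTraceTransferBean, age in sendDataByTimeThreshold) can be sketched as follows. TraceBatchSegment, its flush callback and the explicit nowMillis parameter are hypothetical simplifications introduced for illustration; passing the clock in makes the behaviour deterministic to demonstrate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified TraceDataSegment with a size trigger and an age trigger.
public class TraceBatchSegment {
    private final int maxSize;
    private final long waitMillis;
    private final Consumer<List<String>> flusher;
    private final List<String> items = new ArrayList<>();
    private long firstAddTime;
    private int currentSize;

    public TraceBatchSegment(int maxSize, long waitMillis, Consumer<List<String>> flusher) {
        this.maxSize = maxSize;
        this.waitMillis = waitMillis;
        this.flusher = flusher;
    }

    // Size trigger, like addTraceTransferBean: flush once the batch reaches maxSize.
    public void add(String data, long nowMillis) {
        if (items.isEmpty()) {
            firstAddTime = nowMillis; // remember when the oldest entry arrived
        }
        items.add(data);
        currentSize += data.length();
        if (currentSize >= maxSize) {
            flush();
        }
    }

    // Age trigger, like sendDataByTimeThreshold: flush a non-empty batch that waited too long.
    public void flushIfExpired(long nowMillis) {
        if (!items.isEmpty() && nowMillis - firstAddTime >= waitMillis) {
            flush();
        }
    }

    private void flush() {
        // Hand over a copy so that clearing the buffer cannot affect the sender.
        flusher.accept(new ArrayList<>(items));
        items.clear();
        currentSize = 0;
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        TraceBatchSegment seg = new TraceBatchSegment(10, 500, sent::add);
        seg.add("aaaa", 0);      // size 4, kept
        seg.add("bbbbbb", 10);   // size 10 >= 10, flushed by size
        seg.add("cc", 20);       // starts a new batch at t=20
        seg.flushIfExpired(300); // only 280 ms old, kept
        seg.flushIfExpired(520); // 500 ms old, flushed by age
        System.out.println(sent); // [[aaaa, bbbbbb], [cc]]
    }
}
```

Copying the list before handing it off mirrors `new ArrayList(traceTransferBeanList)` in the original, which lets the segment be cleared and reused while the send task still holds its own batch.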

 

AsyncDataSendTask
The core logic is in sendTraceDataByMQ.
class AsyncDataSendTask implements Runnable {
    private final String traceTopicName;
    private final String regionId;
    private final List<TraceTransferBean> traceTransferBeanList;

    public AsyncDataSendTask(String traceTopicName, String regionId, List<TraceTransferBean> traceTransferBeanList) {
        this.traceTopicName = traceTopicName;
        this.regionId = regionId;
        this.traceTransferBeanList = traceTransferBeanList;
    }

    @Override
    public void run() {
        StringBuilder buffer = new StringBuilder(1024);
        Set<String> keySet = new HashSet<String>();
        for (TraceTransferBean bean : traceTransferBeanList) {
            keySet.addAll(bean.getTransKey());
            buffer.append(bean.getTransData());
        }
        sendTraceDataByMQ(keySet, buffer.toString(), traceTopicName);
    }

    /**
     * Send message trace data
     *
     * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
     * @param data   the message trace data in this batch
     * @param traceTopic the topic which message trace data will send to
     */
    private void sendTraceDataByMQ(Set<String> keySet, final String data, String traceTopic) {
        final Message message = new Message(traceTopic, data.getBytes());
        // The keys of the trace message include the msgId of each original message
        message.setKeys(keySet);
        try {
            Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
            SendCallback callback = new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {

                }

                @Override
                public void onException(Throwable e) {
                    log.error("send trace data failed, the traceData is {}", data, e);
                }
            };
            if (traceBrokerSet.isEmpty()) {
                // No cross set
                traceProducer.send(message, callback, 5000);
            } else {
                traceProducer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Set<String> brokerSet = (Set<String>) arg;
                        List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
                        for (MessageQueue queue : mqs) {
                            if (brokerSet.contains(queue.getBrokerName())) {
                                filterMqs.add(queue);
                            }
                        }
                        int index = sendWhichQueue.incrementAndGet();
                        int pos = Math.abs(index) % filterMqs.size();
                        if (pos < 0) {
                            pos = 0;
                        }
                        return filterMqs.get(pos);
                    }
                }, traceBrokerSet, callback);
            }

        } catch (Exception e) {
            log.error("send trace data failed, the traceData is {}", data, e);
        }
    }

    private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
        Set<String> brokerSet = new HashSet<String>();
        TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
            producer.getMqClientFactory().updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
                brokerSet.add(queue.getBrokerName());
            }
        }
        return brokerSet;
    }
}
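PS: as this shows, trace data is also wrapped in a Message and sent to the broker by a producer (the dedicated traceProducer); in essence it is no different from business messages.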
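The queue selection used in sendTraceDataByMQ when traceBrokerSet is not empty (keep only queues on brokers in the set, then round-robin with a shared counter) can be reproduced without the client library. Queue is a hypothetical stand-in for MessageQueue; Math.floorMod is swapped in for the original Math.abs(index) % size plus the pos < 0 guard, and the empty-list check is an extra defensive assumption not present in the original.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the MessageQueueSelector in sendTraceDataByMQ.
public class TraceQueueSelector {
    // Minimal stand-in for RocketMQ's MessageQueue.
    public static final class Queue {
        public final String brokerName;
        public final int queueId;

        public Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "-" + queueId;
        }
    }

    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    // Keep only queues on the given brokers, then pick one round-robin.
    public Queue select(List<Queue> mqs, Set<String> brokerSet) {
        List<Queue> filtered = new ArrayList<>();
        for (Queue q : mqs) {
            if (brokerSet.contains(q.brokerName)) {
                filtered.add(q);
            }
        }
        if (filtered.isEmpty()) {
            return null; // defensive extra: nothing to choose from
        }
        // floorMod stays non-negative even after the counter wraps past Integer.MAX_VALUE
        int pos = Math.floorMod(sendWhichQueue.getAndIncrement(), filtered.size());
        return filtered.get(pos);
    }

    public static void main(String[] args) {
        List<Queue> mqs = Arrays.asList(
                new Queue("broker-a", 0), new Queue("broker-b", 0), new Queue("broker-a", 1));
        Set<String> traceBrokers = new HashSet<>(Collections.singletonList("broker-a"));
        TraceQueueSelector selector = new TraceQueueSelector();
        System.out.println(selector.select(mqs, traceBrokers)); // broker-a-0
        System.out.println(selector.select(mqs, traceBrokers)); // broker-a-1
        System.out.println(selector.select(mqs, traceBrokers)); // broker-a-0
    }
}
```

The sketch starts counting at 0 via getAndIncrement(), whereas the original uses incrementAndGet(), so the first queue chosen may differ; the distribution over the filtered queues is the same.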
 
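Question: the code batches traces, i.e. several trace records are combined into one message before sending. Does the broker treat such a message specially, or is it ultimately stored as a single message?
The broker does nothing special: the batch is stored as one ordinary message. The console parses and filters it when reading the trace data.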
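To illustrate how a reader can recover individual traces from one stored batch, here is a minimal sketch. It assumes, modeled on RocketMQ's TraceConstants, that fields within a record are separated by (char) 1 and each record ends with (char) 2; the three-field record layout (type, timestamp, msgId) is hypothetical and much shorter than the real TraceDataEncoder output. The console can locate the batch by message key because sendTraceDataByMQ puts the original msgIds into the trace message's keys; a filtering step like findByMsgId then keeps only the matching records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split one batched trace body into records and filter by msgId.
public class TracePayloadParser {
    // Assumed separators, modeled on RocketMQ's TraceConstants.
    public static final char CONTENT_SPLITOR = (char) 1; // between fields of one record
    public static final char FIELD_SPLITOR = (char) 2;   // terminates each record

    // Encodes a simplified record: type, timestamp, msgId (hypothetical layout).
    public static String encode(String type, long timestamp, String msgId) {
        return type + CONTENT_SPLITOR + timestamp + CONTENT_SPLITOR + msgId + FIELD_SPLITOR;
    }

    // Splits a batched body into records and keeps those that belong to msgId.
    public static List<String[]> findByMsgId(String body, String msgId) {
        List<String[]> result = new ArrayList<>();
        for (String record : body.split(String.valueOf(FIELD_SPLITOR))) {
            if (record.isEmpty()) {
                continue;
            }
            String[] fields = record.split(String.valueOf(CONTENT_SPLITOR));
            if (fields.length >= 3 && fields[2].equals(msgId)) {
                result.add(fields);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Three traces for two different messages, stored as ONE message body.
        String body = encode("Pub", 1000L, "ID1")
                + encode("Pub", 1005L, "ID2")
                + encode("SubAfter", 1130L, "ID1");
        for (String[] fields : findByMsgId(body, "ID1")) {
            System.out.println(fields[0] + " @ " + fields[1]);
        }
        // Pub @ 1000
        // SubAfter @ 1130
    }
}
```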
 
 

Console operations

Query by message key

Query by message ID

Message trace details

Chart view

PS: the chart clearly shows the send cost time, the interval from sending to consumption, and the consume cost time.
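The three durations in the chart are simple differences of the timestamps and cost times carried by the trace records (TraceContext has timeStamp and costTime). The sketch below is hypothetical: the article does not show how rocketmq-dashboard computes them, and here the send-to-consume interval is assumed to run from the send timestamp to the consume start timestamp.

```java
import java.util.Arrays;

// Hypothetical arithmetic for the trace chart's three durations.
public class TraceTimeline {
    // Returns {send cost, send-to-consume interval, consume cost} in milliseconds.
    public static long[] durations(long pubTimestamp, int pubCostTime,
                                   long consumeStartTimestamp, int consumeCostTime) {
        long sendCost = pubCostTime;                                // cost recorded by the producer
        long sendToConsume = consumeStartTimestamp - pubTimestamp;  // assumed definition
        long consumeCost = consumeCostTime;                         // cost recorded by the consumer
        return new long[] {sendCost, sendToConsume, consumeCost};
    }

    public static void main(String[] args) {
        long[] d = durations(1_000L, 5, 1_120L, 30);
        System.out.println(Arrays.toString(d)); // [5, 120, 30]
    }
}
```

Since producer and consumer timestamps come from different hosts, any interval computed this way also includes clock skew between those machines.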
 

Send-side trace data

Consume-side trace details


This article is from the author's personal column
rocketmq专栏 (RocketMQ column)