searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

rocketmq任意延迟消息实现原理

2023-03-28 03:09:37
383
0

前言

延迟消息就是生产者投递消息,希望其在指定的时间后才能被消费者所消费。
任意时间戳(精度到秒)延迟消息是rocketmq 5.0.0的功能,之前的版本都是只能指定固定延迟的级别(18个)
 

1.快速上手

5.0.0生产者

package org.apache.rocketmq.example.schedule;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class TimerMessageProducer {

    public static final String PRODUCER_GROUP = "TimerMessageProducerGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TimerTopic";

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        // Launch producer
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));
            // This message will be delivered to consumer 10 seconds later.
            //message.setDelayTimeSec(10);
            // The effect is the same as the above
            // message.setDelayTimeMs(10_000L);
            // Set the specific delivery time, and the effect is the same as the above
            message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);
            // Send the message
            SendResult result = producer.send(message);
            System.out.printf(result + "\n");
        }

        // Shutdown producer after use.
        producer.shutdown();
    }
}
从代码中我们可以看到,有3种方法指定延迟的时间,分别是setDelayTimeSec,setDelayTimeMs,setDeliverTimeMs,从方法名我们可以得知前两个方法是指延迟指定的时间后执行,只不过单位不同(一个是秒,一个毫秒),最后一个方法则是直接指定了可被消费的时间戳。
 

4.9.4生产者

package org.apache.rocketmq.example.schedule;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;

import java.nio.charset.StandardCharsets;

public class TimerMessageProducer {

    public static final String PRODUCER_GROUP = "TimerMessageProducerGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TimerTopic";

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.setSendMsgTimeout(30000);

        // Launch producer
        long start = System.currentTimeMillis();
        producer.start();
        System.out.println("start the producer :" + (System.currentTimeMillis() - start));
        int totalMessagesToSend = 10000;
        for (int i = 0; i < totalMessagesToSend; i++) {
            try {
                Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));

                // 不升级客户端的前提下,直接设置属性  延迟10秒后
                // 1.延迟10s
                // message.putUserProperty("TIMER_DELAY_SEC", "10");
                // 2.延迟 10000ms
                // message.putUserProperty("TIMER_DELAY_MS", "10000");
                // 3.在当前时间 + 100000ms 的时刻执行
                message.putUserProperty("__STARTDELIVERTIME",String.valueOf(System.currentTimeMillis() + 10_000L));

                // Send the message
                SendResult result = producer.send(message);
                System.out.println(result);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }

        }

        // Shutdown producer after use.
        producer.shutdown();
    }
}
 

消费者

package org.apache.rocketmq.example.schedule;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class TimerMessageConsumer {
    public static final String CONSUMER_GROUP = "TimerMessageConsumerGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TimerTopic";

    public static void main(String[] args) throws Exception {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        // Subscribe topics
        consumer.subscribe(TOPIC, "*");
        // Register message listener
        consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
            for (MessageExt message : messages) {
                // Print approximate delay time period
                System.out.printf("Receive message[msgId=%s %d  ms later]\n", message.getMsgId(),
                    System.currentTimeMillis() - message.getBornTimestamp());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // Launch consumer
        consumer.start();
        //info:to see the time effect, run the consumer first , it will wait for the msg
        //then start the producer
    }
}
从代码中我们可以得知,对于延迟消息来说,消费者并不需要配置啥,代码只是验证了延迟消息的准确性而已。客户端没有版本限制,4.9.4与5.0.0保持一致.
 

阿里云SDK

生产者
package org.apache.rocketmq.example.schedule;


import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class AliYunTimerProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        //设置发送超时时间,单位毫秒。
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "127.0.0.1:9876");

        Producer producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
        producer.start();

        Message msg = new Message(
                // Message所属的Topic。
                "TopicTestMQ",
                // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                "TagA",
                // Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                "Hello MQ".getBytes());

        // 设置代表消息的业务关键属性,请尽可能全局唯一。以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
        // 注意:不设置也不会影响消息正常收发。
        msg.setKey("ORDERID_100");

        msg.setStartDeliverTime(System.currentTimeMillis() + 10_000L);
        // 异步发送消息,发送结果通过callback返回给客户端。
        producer.sendAsync(msg, new SendCallback() {
            @Override
            public void onSuccess(final SendResult sendResult) {
                // 消息发送成功。
                System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
            }
        });

        // 在callback返回之前即可取得msgId。
        System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

        // 在应用退出前,销毁Producer对象。 注意:如果不销毁也没有问题。
        producer.shutdown();
    }
}
 
 
消费者
package org.apache.rocketmq.example.schedule;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class AliyunConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在控制台创建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // Accesskey Secret阿里云身份验证,在阿里云服RAM控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // 设置TCP接入域名,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "127.0.0.1:9876");
        // 集群订阅方式(默认)。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 广播订阅方式。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                System.out.printf("Receive message[msgId=%s %d  ms later]\n", message.getMsgID(),
                        System.currentTimeMillis() - message.getBornTimestamp());
                return Action.CommitMessage;
            }
        });

        //订阅另外一个Topic,如需取消订阅该Topic,请删除该部分的订阅代码,重新启动消费端即可。
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}

 

小结

可见,延迟消息使用上是简单的,旧版本的客户端也是手动配置相关属性实现兼容,可兼容阿里云rocketmq的java sdk。
 
 

2.实现原理

Rocketmq的架构中,producer和consumer都是客户端,nameserver提供服务发现和管理的功能,真正数据处理在broker,我们一起来看看broker是如何支持任意时间维度延迟消息的。
 

1.初始化

BrokerController
1.初始化方法中初始化相关对象
if (messageStoreConfig.isTimerWheelEnable()) {
    this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
    TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
    this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
    this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
    this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
从代码中,我们可以看到TimerCheckpoint,TimerMetrics,TimerMessageStore几个关键的类,任意时间延迟消息的实现主要就是由这几个类实现。
 
2.通过匿名内部类的方式注册到putMessageHookList中,最终是由HookUtils.handleScheduleMessage执行。
putMessageHookList.add(new PutMessageHook() {
    @Override
    public String hookName() {
        return "handleScheduleMessage";
    }

    @Override
    public PutMessageResult executeBeforePutMessage(MessageExt msg) {
        if (msg instanceof MessageExtBrokerInner) {
            return HookUtils.handleScheduleMessage(BrokerController.this, (MessageExtBrokerInner) msg);
        }
        return null;
    }
});
 

2.消息写入过程中的处理

 

3.broker内部处理

4.关键数据结构解析

TimerLog

UNIT_SIZE
prev pos
 
magic value
 
curr write time, for trace
delayed time, for check
 
offsetPy
sizePy
 
hash code of real topic
reserved value, just in case of
4(默认52)
 
同一个槽(同一秒)前一个timerlog记录的offset
3种情况
1.普通消息
2.回滚
3.删除
写入文件的时间
 
距离写入时间的延迟时间
原消息在commitlog中的物理offset
 
原消息在commitlog中的长度
 
真实topic的hashcode值
 
8(默认0)
4(默认52)
8
4
8
4
8
4
4
8(默认0)
可见,TimerLog 跟commitLog的设计都是一样的,消息时混合写入的,不按时间区分。
 

TimerWheel

是用mmap技术将文件映射成一个 7 * 24 * 3600 * 2 * 32 的数组,用于存放Slot的数据(并没有直接的关系,而且通过代码映射起来)
能支持最长的14天定时任务(实际上还有其他地方限制,所以默认最多设置的是3天内的任意延迟消息)
 

 

 

Slot

timeMs
firstPos
lastPos
num
magic
8
8
8
4
4
执行的时间戳
该槽第一个timerLog记录的offset
该槽最后一个timerLog记录的offset
该槽timerlog数量
目前没用
 

3.使用限制

1.定时消息的精度会有1s~2s的延迟误差。
2.定时和延时消息的TIMER_DELIVER_MS参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
3.定时和延时消息的TIMER_DELIVER_MS参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败(TIMER_DELAY_SEC < 40*24*3600,TIMER_DELAY_MS<40*24*3600*1000)。
4.StartDeliverTime是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
5.由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差。
6.设置定时和延时消息的投递时间后,依然受7天的消息保存时长限制(与线上环境消息保存时长保持一致)。
例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第12天被删除。
7.任意延迟消息的broker处理进度会持久化,重启过后会把继续处理尚未处理但是超时的数据,保持消息可以被消费,就是精度会受影响。
 

使用建议

避免大量相同定时时刻的消息,一个broker节点不能超过10w。
定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。
 
 
 
 
 

 

 

 

0条评论
0 / 1000
c****n
4文章数
2粉丝数
c****n
4 文章 | 2 粉丝
c****n
4文章数
2粉丝数
c****n
4 文章 | 2 粉丝
原创

rocketmq任意延迟消息实现原理

2023-03-28 03:09:37
383
0

前言

延迟消息就是生产者投递消息,希望其在指定的时间后才能被消费者所消费。
任意时间戳(精度到秒)延迟消息是rocketmq 5.0.0的功能,之前的版本都是只能指定固定延迟的级别(18个)
 

1.快速上手

5.0.0生产者

package org.apache.rocketmq.example.schedule;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class TimerMessageProducer {

    public static final String PRODUCER_GROUP = "TimerMessageProducerGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TimerTopic";

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        // Launch producer
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));
            // This message will be delivered to consumer 10 seconds later.
            //message.setDelayTimeSec(10);
            // The effect is the same as the above
            // message.setDelayTimeMs(10_000L);
            // Set the specific delivery time, and the effect is the same as the above
            message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);
            // Send the message
            SendResult result = producer.send(message);
            System.out.printf(result + "\n");
        }

        // Shutdown producer after use.
        producer.shutdown();
    }
}
从代码中我们可以看到,有3种方法指定延迟的时间,分别是setDelayTimeSec,setDelayTimeMs,setDeliverTimeMs,从方法名我们可以得知前两个方法是指延迟指定的时间后执行,只不过单位不同(一个是秒,一个毫秒),最后一个方法则是直接指定了可被消费的时间戳。
 

4.9.4生产者

package org.apache.rocketmq.example.schedule;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;

import java.nio.charset.StandardCharsets;

public class TimerMessageProducer {

    public static final String PRODUCER_GROUP = "TimerMessageProducerGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TimerTopic";

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.setSendMsgTimeout(30000);

        // Launch producer
        long start = System.currentTimeMillis();
        producer.start();
        System.out.println("start the producer :" + (System.currentTimeMillis() - start));
        int totalMessagesToSend = 10000;
        for (int i = 0; i < totalMessagesToSend; i++) {
            try {
                Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));

                // 不升级客户端的前提下,直接设置属性  延迟10秒后
                // 1.延迟10s
                // message.putUserProperty("TIMER_DELAY_SEC", "10");
                // 2.延迟 10000ms
                // message.putUserProperty("TIMER_DELAY_MS", "10000");
                // 3.在当前时间 + 100000ms 的时刻执行
                message.putUserProperty("__STARTDELIVERTIME",String.valueOf(System.currentTimeMillis() + 10_000L));

                // Send the message
                SendResult result = producer.send(message);
                System.out.println(result);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }

        }

        // Shutdown producer after use.
        producer.shutdown();
    }
}
 

消费者

package org.apache.rocketmq.example.schedule;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class TimerMessageConsumer {
    public static final String CONSUMER_GROUP = "TimerMessageConsumerGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TimerTopic";

    public static void main(String[] args) throws Exception {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        // Subscribe topics
        consumer.subscribe(TOPIC, "*");
        // Register message listener
        consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
            for (MessageExt message : messages) {
                // Print approximate delay time period
                System.out.printf("Receive message[msgId=%s %d  ms later]\n", message.getMsgId(),
                    System.currentTimeMillis() - message.getBornTimestamp());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // Launch consumer
        consumer.start();
        //info:to see the time effect, run the consumer first , it will wait for the msg
        //then start the producer
    }
}
从代码中我们可以得知,对于延迟消息来说,消费者并不需要配置啥,代码只是验证了延迟消息的准确性而已。客户端没有版本限制,4.9.4与5.0.0保持一致.
 

阿里云SDK

生产者
package org.apache.rocketmq.example.schedule;


import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class AliYunTimerProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        //设置发送超时时间,单位毫秒。
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "127.0.0.1:9876");

        Producer producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
        producer.start();

        Message msg = new Message(
                // Message所属的Topic。
                "TopicTestMQ",
                // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                "TagA",
                // Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                "Hello MQ".getBytes());

        // 设置代表消息的业务关键属性,请尽可能全局唯一。以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
        // 注意:不设置也不会影响消息正常收发。
        msg.setKey("ORDERID_100");

        msg.setStartDeliverTime(System.currentTimeMillis() + 10_000L);
        // 异步发送消息,发送结果通过callback返回给客户端。
        producer.sendAsync(msg, new SendCallback() {
            @Override
            public void onSuccess(final SendResult sendResult) {
                // 消息发送成功。
                System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
            }
        });

        // 在callback返回之前即可取得msgId。
        System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

        // 在应用退出前,销毁Producer对象。 注意:如果不销毁也没有问题。
        producer.shutdown();
    }
}
 
 
消费者
package org.apache.rocketmq.example.schedule;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class AliyunConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在控制台创建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // Accesskey Secret阿里云身份验证,在阿里云服RAM控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // 设置TCP接入域名,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "127.0.0.1:9876");
        // 集群订阅方式(默认)。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 广播订阅方式。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                System.out.printf("Receive message[msgId=%s %d  ms later]\n", message.getMsgID(),
                        System.currentTimeMillis() - message.getBornTimestamp());
                return Action.CommitMessage;
            }
        });

        //订阅另外一个Topic,如需取消订阅该Topic,请删除该部分的订阅代码,重新启动消费端即可。
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}

 

小结

可见,延迟消息使用上是简单的,旧版本的客户端也是手动配置相关属性实现兼容,可兼容阿里云rocketmq的java sdk。
 
 

2.实现原理

Rocketmq的架构中,producer和consumer都是客户端,nameserver提供服务发现和管理的功能,真正数据处理在broker,我们一起来看看broker是如何支持任意时间维度延迟消息的。
 

1.初始化

BrokerController
1.初始化方法中初始化相关对象
if (messageStoreConfig.isTimerWheelEnable()) {
    this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
    TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
    this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
    this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
    this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
从代码中,我们可以看到TimerCheckpoint,TimerMetrics,TimerMessageStore几个关键的类,任意时间延迟消息的实现主要就是由这几个类实现。
 
2.通过匿名内部类的方式注册到putMessageHookList中,最终是由HookUtils.handleScheduleMessage执行。
putMessageHookList.add(new PutMessageHook() {
    @Override
    public String hookName() {
        return "handleScheduleMessage";
    }

    @Override
    public PutMessageResult executeBeforePutMessage(MessageExt msg) {
        if (msg instanceof MessageExtBrokerInner) {
            return HookUtils.handleScheduleMessage(BrokerController.this, (MessageExtBrokerInner) msg);
        }
        return null;
    }
});
 

2.消息写入过程中的处理

 

3.broker内部处理

4.关键数据结构解析

TimerLog

UNIT_SIZE
prev pos
 
magic value
 
curr write time, for trace
delayed time, for check
 
offsetPy
sizePy
 
hash code of real topic
reserved value, just in case of
4(默认52)
 
同一个槽(同一秒)前一个timerlog记录的offset
3种情况
1.普通消息
2.回滚
3.删除
写入文件的时间
 
距离写入时间的延迟时间
原消息在commitlog中的物理offset
 
原消息在commitlog中的长度
 
真实topic的hashcode值
 
8(默认0)
4(默认52)
8
4
8
4
8
4
4
8(默认0)
可见,TimerLog 跟commitLog的设计都是一样的,消息时混合写入的,不按时间区分。
 

TimerWheel

是用mmap技术将文件映射成一个 7 * 24 * 3600 * 2 * 32 的数组,用于存放Slot的数据(并没有直接的关系,而且通过代码映射起来)
能支持最长的14天定时任务(实际上还有其他地方限制,所以默认最多设置的是3天内的任意延迟消息)
 

 

 

Slot

timeMs
firstPos
lastPos
num
magic
8
8
8
4
4
执行的时间戳
该槽第一个timerLog记录的offset
该槽最后一个timerLog记录的offset
该槽timerlog数量
目前没用
 

3.使用限制

1.定时消息的精度会有1s~2s的延迟误差。
2.定时和延时消息的TIMER_DELIVER_MS参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
3.定时和延时消息的TIMER_DELIVER_MS参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败(TIMER_DELAY_SEC < 40*24*3600,TIMER_DELAY_MS<40*24*3600*1000)。
4.StartDeliverTime是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
5.由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差。
6.设置定时和延时消息的投递时间后,依然受7天的消息保存时长限制(与线上环境消息保存时长保持一致)。
例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第12天被删除。
7.任意延迟消息的broker处理进度会持久化,重启过后会把继续处理尚未处理但是超时的数据,保持消息可以被消费,就是精度会受影响。
 

使用建议

避免大量相同定时时刻的消息,一个broker节点不能超过10w。
定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。
 
 
 
 
 

 

 

 

文章来自个人专栏
rocketmq专栏
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0