前言
延迟消息就是生产者投递消息,希望其在指定的时间后才能被消费者所消费。
任意时间戳(精度到秒)延迟消息是rocketmq 5.0.0的功能,之前的版本都是只能指定固定延迟的级别(18个)
1.快速上手
5.0.0生产者
package org.apache.rocketmq.example.schedule;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class TimerMessageProducer {
public static final String PRODUCER_GROUP = "TimerMessageProducerGroup";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TimerTopic";
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// Launch producer
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));
// This message will be delivered to consumer 10 seconds later.
//message.setDelayTimeSec(10);
// The effect is the same as the above
// message.setDelayTimeMs(10_000L);
// Set the specific delivery time, and the effect is the same as the above
message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);
// Send the message
SendResult result = producer.send(message);
System.out.printf(result + "\n");
}
// Shutdown producer after use.
producer.shutdown();
}
}
从代码中我们可以看到,有3种方法指定延迟的时间,分别是setDelayTimeSec,setDelayTimeMs,setDeliverTimeMs,从方法名我们可以得知前两个方法是指延迟指定的时间后执行,只不过单位不同(一个是秒,一个毫秒),最后一个方法则是直接指定了可被消费的时间戳。
4.9.4生产者
package org.apache.rocketmq.example.schedule;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import java.nio.charset.StandardCharsets;
public class TimerMessageProducer {
public static final String PRODUCER_GROUP = "TimerMessageProducerGroup";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TimerTopic";
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.setSendMsgTimeout(30000);
// Launch producer
long start = System.currentTimeMillis();
producer.start();
System.out.println("start the producer :" + (System.currentTimeMillis() - start));
int totalMessagesToSend = 10000;
for (int i = 0; i < totalMessagesToSend; i++) {
try {
Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));
// 不升级客户端的前提下,直接设置属性 延迟10秒后
// 1.延迟10s
// message.putUserProperty("TIMER_DELAY_SEC", "10");
// 2.延迟 10000ms
// message.putUserProperty("TIMER_DELAY_MS", "10000");
// 3.在当前时间 + 100000ms 的时刻执行
message.putUserProperty("__STARTDELIVERTIME",String.valueOf(System.currentTimeMillis() + 10_000L));
// Send the message
SendResult result = producer.send(message);
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
// Shutdown producer after use.
producer.shutdown();
}
}
消费者
package org.apache.rocketmq.example.schedule;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class TimerMessageConsumer {
public static final String CONSUMER_GROUP = "TimerMessageConsumerGroup";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TimerTopic";
public static void main(String[] args) throws Exception {
// Instantiate message consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// Subscribe topics
consumer.subscribe(TOPIC, "*");
// Register message listener
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.printf("Receive message[msgId=%s %d ms later]\n", message.getMsgId(),
System.currentTimeMillis() - message.getBornTimestamp());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// Launch consumer
consumer.start();
//info:to see the time effect, run the consumer first , it will wait for the msg
//then start the producer
}
}
从代码中我们可以得知,对于延迟消息来说,消费者并不需要配置啥,代码只是验证了延迟消息的准确性而已。客户端没有版本限制,4.9.4与5.0.0保持一致.
阿里云SDK
生产者
package org.apache.rocketmq.example.schedule;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class AliYunTimerProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
properties.put(PropertyKeyConst.AccessKey, "XXX");
// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
properties.put(PropertyKeyConst.SecretKey, "XXX");
//设置发送超时时间,单位毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "127.0.0.1:9876");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
Message msg = new Message(
// Message所属的Topic。
"TopicTestMQ",
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
"TagA",
// Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_100");
msg.setStartDeliverTime(System.currentTimeMillis() + 10_000L);
// 异步发送消息,发送结果通过callback返回给客户端。
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 消息发送成功。
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// 在callback返回之前即可取得msgId。
System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
// 在应用退出前,销毁Producer对象。 注意:如果不销毁也没有问题。
producer.shutdown();
}
}
消费者
package org.apache.rocketmq.example.schedule;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class AliyunConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台创建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.AccessKey, "XXX");
// Accesskey Secret阿里云身份验证,在阿里云服RAM控制台创建。
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置TCP接入域名,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "127.0.0.1:9876");
// 集群订阅方式(默认)。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 广播订阅方式。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
System.out.printf("Receive message[msgId=%s %d ms later]\n", message.getMsgID(),
System.currentTimeMillis() - message.getBornTimestamp());
return Action.CommitMessage;
}
});
//订阅另外一个Topic,如需取消订阅该Topic,请删除该部分的订阅代码,重新启动消费端即可。
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
小结
可见,延迟消息使用上是简单的,旧版本的客户端也是手动配置相关属性实现兼容,可兼容阿里云rocketmq的java sdk。
2.实现原理
Rocketmq的架构中,producer和consumer都是客户端,nameserver提供服务发现和管理的功能,真正数据处理在broker,我们一起来看看broker是如何支持任意时间维度延迟消息的。
1.初始化
BrokerController
1.初始化方法中初始化相关对象
if (messageStoreConfig.isTimerWheelEnable()) {
this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
从代码中,我们可以看到TimerCheckpoint,TimerMetrics,TimerMessageStore几个关键的类,任意时间延迟消息的实现主要就是由这几个类实现。
2.通过匿名内部类的方式注册到putMessageHookList中,最终是由HookUtils.handleScheduleMessage执行。
putMessageHookList.add(new PutMessageHook() {
@Override
public String hookName() {
return "handleScheduleMessage";
}
@Override
public PutMessageResult executeBeforePutMessage(MessageExt msg) {
if (msg instanceof MessageExtBrokerInner) {
return HookUtils.handleScheduleMessage(BrokerController.this, (MessageExtBrokerInner) msg);
}
return null;
}
});
2.消息写入过程中的处理
3.broker内部处理
4.关键数据结构解析
TimerLog
UNIT_SIZE
|
prev pos
|
magic value
|
curr write time, for trace
|
delayed time, for check
|
offsetPy
|
sizePy
|
hash code of real topic
|
reserved value, just in case of
|
4(默认52)
|
同一个槽(同一秒)前一个timerlog记录的offset
|
3种情况
1.普通消息
2.回滚
3.删除
|
写入文件的时间
|
距离写入时间的延迟时间
|
原消息在commitlog中的物理offset
|
原消息在commitlog中的长度
|
真实topic的hashcode值
|
8(默认0)
|
4(默认52)
|
8
|
4
|
8
|
4
|
8
|
4
|
4
|
8(默认0)
|
可见,TimerLog 跟commitLog的设计都是一样的,消息时混合写入的,不按时间区分。
TimerWheel
是用mmap技术将文件映射成一个 7 * 24 * 3600 * 2 * 32 的数组,用于存放Slot的数据(并没有直接的关系,而且通过代码映射起来)
能支持最长的14天定时任务(实际上还有其他地方限制,所以默认最多设置的是3天内的任意延迟消息)
Slot
timeMs
|
firstPos
|
lastPos
|
num
|
magic
|
8
|
8
|
8
|
4
|
4
|
执行的时间戳
|
该槽第一个timerLog记录的offset
|
该槽最后一个timerLog记录的offset
|
该槽timerlog数量
|
目前没用
|
3.使用限制
1.定时消息的精度会有1s~2s的延迟误差。
2.定时和延时消息的TIMER_DELIVER_MS参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
3.定时和延时消息的TIMER_DELIVER_MS参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败(TIMER_DELAY_SEC < 40*24*3600,TIMER_DELAY_MS<40*24*3600*1000)。
4.StartDeliverTime是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
5.由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差。
6.设置定时和延时消息的投递时间后,依然受7天的消息保存时长限制(与线上环境消息保存时长保持一致)。
例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第12天被删除。
7.任意延迟消息的broker处理进度会持久化,重启过后会把继续处理尚未处理但是超时的数据,保持消息可以被消费,就是精度会受影响。
使用建议
避免大量相同定时时刻的消息,一个broker节点不能超过10w。
定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。