分布式消息服务RocketMQ版支持任意时间的定时消息,最大推迟时间可达到40天。
定时消息即生产者生产消息到分布式消息服务RocketMQ版后,消息不会立即被消费,而是延迟到设定的时间点后才会发送给消费者进行消费。
发送定时消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
适用场景
定时消息适用于以下场景:
- 消息对应的业务逻辑有时间窗口要求,如电商交易中超时未支付关闭订单的场景。在订单创建时发送一条定时消息,5分钟以后投递给消费者,消费者收到此消息后需要判断对应订单是否完成支付,如果未完成支付,则关闭订单。如果已完成,则忽略。
- 通过消息触发定时任务的场景,如在某些固定时间点向用户发送提醒消息。
注意
定时消息的最大延迟时间为40天,延迟超过40天的消息将会发送失败。
定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
定时消息的精度有1s~2s的延迟误差
无法确保定时消息仅投递一次,定时消息可能会重复投递。
定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。
设置定时消息的投递时间后,依然受消息老化时间限制,默认消息过期时间为7天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第12天被删除。
定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为4.9.7。
通过以下任意一种方式引入依赖:
-
使用Maven方式引入依赖。
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.7</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.9.7</version> </dependency> </dependencies>
-
点击下载依赖JAR包:rocketmq-all-4.9.7-bin-release.zip
发送定时/延时消息
发送定时/延时消息的示例代码如下:
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerTimerDelayExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
/*
* 创建Producer,如果想开启消息轨迹,可以按照如下方式创建:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制台NAMESRV接入点地址
producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("TopicTest",
"YOUR MESSAGE TAG",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
/*
* 发送延时消息,需要设置延时时间,单位毫秒(ms),消息将在指定延时时间后投递,例如消息将在3秒后投递。
*/
long delayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
/*
* 若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2022-10-10 10:10:00投递。
* 定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。
* long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2022-10-10 10:10:00").getTime();
* msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
消费定时/延时消息
消费定时/延时消息的示例代码如下
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class ConsumerDelayExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
/*
* 创建Consumer,如果想开启消息轨迹,可以按照如下方式创建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely(), true, null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely());
// 填入控制台NAMESRV接入点地址
consumer.setNamesrvAddr("192.168.0.1:9876");
// consumer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码
/*
* 如果想要消费指定TAG的消息,可以按照如下方式订阅:* 为订阅所有的TAG
* pushConsumer.subscribe(TOPIC_NAME, "Tag1");
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer Started.");
}
}