顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。
顺序消息分为全局顺序消息和分区顺序消息:
- 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
- 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者指定分区选择算法,保证需要按顺序消费的消息被分配到同一个队列。
全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。
收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为4.9.7。
通过以下任意一种方式引入依赖:
-
使用Maven方式引入依赖。
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.7</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.9.7</version> </dependency> </dependencies>
-
点击下载依赖JAR包:rocketmq-all-4.9.7-bin-release.zip
发送顺序消息
参考如下示例代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerFifoExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
/*
* 创建Producer,如果想开启消息轨迹,可以按照如下方式创建:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制台NAMESRV接入点地址
producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码
producer.start();
for (int i = 0; i < 128; i++) {
try {
int orderId = i % 10;
Message msg = new Message("TopicTest",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
// 选择适合自己的分区选择算法,保证同一个参数得到的结果相同。
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}, orderId);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
注意上述代码中,相同id的消息需要保证顺序,不同id的消息不需要保证顺序,所以在分区选择算法中以“id/队列个数的余数”作为消息发送的队列。
订阅顺序消息
参考如下示例代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class ConsumerFifoExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
/*
* 创建Consumer,如果想开启消息轨迹,可以按照如下方式创建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely(), true, null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely());
// 填入控制台NAMESRV接入点地址
consumer.setNamesrvAddr("192.168.0.1:9876");
// consumer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码
/*
* 如果想要消费指定TAG的消息,可以按照如下方式订阅:* 为订阅所有的TAG
* pushConsumer.subscribe(TOPIC_NAME, "Tag1");
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer Started.");
}
}