本章节介绍普通消息的收发方法和示例代码。其中,普通消息发送方式分为同步发送、异步发送、单向发送。
- 同步发送:同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
- 异步发送:异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
- 单向发送:发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发。
- 收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为4.9.7。
通过以下任意一种方式引入依赖:
-
使用Maven方式引入依赖。
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.7</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.9.7</version> </dependency> </dependencies>
-
点击下载依赖JAR包:rocketmq-all-4.9.7-bin-release.zip
同步发送
同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
参考如下示例代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerNormalExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制台获取NAMESRV接入点地址
producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
异步发送
消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
参考如下示例代码
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncProducerNormalExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制台获取NAMESRV接入点地址
producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true); //如果需要开启SSL,请增加此行代码
producer.start();
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
Message msg = new Message("TopicTest",
"TagA", // 设置消息的TAG,若无可设置为空
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.println("send message success. msgId= " + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println("send message failed.");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
单向发送
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
参考如下示例代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducerNormalExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制台获取NAMESRV接入点地址
producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码
producer.start();
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest",
"TagA", // 设置消息的TAG,若无可设置为空
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
订阅普通消息
参考如下示例代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class ConsumerNormalExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 控制台角色控制创建的用户ID
"accessSecret" // 控制台角色控制创建的密钥
));
}
public static void main(String[] args) throws Exception {
/*
* 创建Consumer,如果想开启消息轨迹,可以按照如下方式创建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely(), true, null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely());
// 填入控制台NAMESRV接入点地址
consumer.setNamesrvAddr("192.168.0.1:9876");
// consumer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码
/*
* 如果想要消费指定TAG的消息,可以按照如下方式订阅:*为订阅所有的TAG
* pushConsumer.subscribe(TOPIC_NAME, "Tag1");
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer Started.");
}
}