消息收发
更新时间 2026-03-31 15:45:05
最近更新时间: 2026-03-31 15:45:05
本文为您分布式消息服务MQTT消息收发内容。
会话机制
终端 clean session=true,断线后会话信息清除,再次上线后之前所有的订阅关系以及离线消息丢失。 clean session=false断开连接的情况下,MQTT Broker也会为断连客户端保存一个会话,默认2小时,超期未重连订购关系清除;对于clean session=false的客户端断线重连后可接收Qos>0的离线消息。对于客户端因网络等各种原因断线,需要加上重连和订购关系重新订购机制。
离线消息
对于clean session=false的客户端,在未超出会话失效期,断线重连后可接收Qos>0的离线消息。
系统主题
| 系统主题 | 说明 |
|---|---|
| mq2mqtt | 用于云端服务向终端发送消息。发往该主题消息会转发至MQTT Broker实现云端与移动端互通 |
| mqtt-device-connect | 设备上线主题,内容 {"clientid":客户端ID,"ts":上线时间戳 } |
| mqtt-device-disconnect | 设备下线主题,内容 {"clientid":客户端ID,"ts":下线时间戳 } |
SDK支持
分布式消息服务MQTT支持标准的MQTT协议,理论上适配所有的MQTT客户端SDK。
推荐对应的第三方 SDK 如下表:
| 语言/平台 | 推荐的第三方SDK |
|---|---|
| Java | Eclipse Paho SDK |
| iOS | MQTT-Client-Framework |
| Android | Eclipse Paho SDK |
| JavaScript | Eclipse Paho JavaScript |
| Python | Eclipse Paho Python SDK |
| C | Eclipse Paho C SDK |
| C# | Eclipse Paho C# SDK |
| Golang | Eclipse Paho Golang SDK |
| Node.js | MQTT-JS |
| PHP | Mosquitto-PHP |
生产消费消息
MQTT客户端收发
使用MQTT SDK接入终端连接地址进行消息生产消费。
生产消息代码示例:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class PubMsgTest {
// 填入您在mqtt控制台创建的ACL账号密码。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
public static void main(String[] args) {
String topic = "topic-1/a/b/c";
String content = "hello ctg-mqtt service";
int qos = 2;
// 填写mqtt云消息服务的接入点。
String broker = "tcp://localhost:1883";
// 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
String clientId = "ctg-mqtt-client-pub-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient myClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
System.out.println("Connecting to broker: " + broker);
myClient.connect(connOpts);
System.out.println("Connected");
for (int i = 0; i < 10; i++) {
System.out.println("Publishing message: " + content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
myClient.publish(topic, message);
System.out.println("Message published");
}
myClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch (MqttException me) {
// 打印详细的错误信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
接收消息代码示例:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class SubMsgTest {
// 填入您在mqtt控制台创建的ACL账号密码。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
static String topic = "topic-1/a/b/c";
static int qos = 2;
public static void main(String[] args) {
// 填写mqtt云消息服务的接入点。
String broker = "tcp://localhost:1883";
// 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
String clientId = "ctg-mqtt-client-sub-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient myClient = getMqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
myClient.connect(connOpts);
} catch (MqttException me) {
// 打印详细的错误信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
private static MqttClient getMqttClient(String broker, String clientId, MemoryPersistence persistence) throws MqttException {
MqttClient myClient = new MqttClient(broker, clientId, persistence);
myClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connected to broker: " + broker);
try {
myClient.subscribe(topic, qos);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("connection lost");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("message is :" + message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
return myClient;
}
}