消息数据存储
终端消息数据按父topic存储至kafka队列,需先在控制台创建父topic;对未创建父topic的消息可正常收发,但不会存储至kafka队列。
Kafka存储内容格式:
{
"clientId": 设备clientId,
"topic": 主题,
"payload": 消息内容,
"ts": 发送的时间戳
}
会话机制
终端 clean session=true,断线后会话信息清除,再次上线后之前所有的订阅关系以及离线消息丢失。 clean session=false断开连接的情况下,MQTT Broker也会为断连客户端保存一个会话,默认2小时,超期未重连订购关系清除;对于clean session=false的客户端断线重连后可接收Qos>0的离线消息。对于客户端因网络等各种原因断线,需要加上重连和订购关系重新订购机制。
离线消息
对于clean session=false的客户端,在未超出会话失效期,断线重连后可接收Qos>0的离线消息。
系统主题
系统主题 | 说明 |
---|---|
mq2mqtt | 用于云端服务向终端发送消息。发往该主题消息会转发至MQTT Broker实现云端与移动端互通 |
mqtt-device-connect | 设备上线主题,内容 {"clientid":客户端ID,"ts":上线时间戳 } |
mqtt-device-disconnect | 设备下线主题,内容 {"clientid":客户端ID,"ts":下线时间戳 } |
SDK支持
分布式消息服务MQTT支持标准的MQTT协议,理论上适配所有的MQTT客户端SDK。
推荐对应的第三方 SDK 如下表:
语言/平台 | 推荐的第三方SDK |
---|---|
Java | Eclipse Paho SDK |
iOS | MQTT-Client-Framework |
Android | Eclipse Paho SDK |
JavaScript | Eclipse Paho JavaScript |
Python | Eclipse Paho Python SDK |
C | Eclipse Paho C SDK |
C# | Eclipse Paho C# SDK |
Golang | Eclipse Paho Golang SDK |
Node.js | MQTT-JS |
PHP | Mosquitto-PHP |
主题规则
主题形式:父topic/子级topic1/子级topic2…。(父topic需要先创建)
使用MQTT消息队列发消息,会把消息以父Topic主题分类保存在kafka上,应用服务可通过kafka 客户端以父Topic为主题消费消息。
云端应用服务统一发送到kafka topic为mq2mqtt的主题队列上,移动端topic、会话属性Qos和cleansession保存在Record Header中,MQTT设备通过订阅移动端topic,实现云端到移动端通讯。
Kakfa header与mqtt属性映射如下表:
Kafka Header | MQTT属性 |
---|---|
qoslevel | Qos |
cleanSession | cleanSession |
mqttTopic | 主题 |
生产消费消息
MQTT客户端收发
使用MQTT SDK接入终端连接地址进行消息生产消费。
生产消息代码示例:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class PubMsgTest {
// 填入您在mqtt控制台创建的ACL账号密码。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
public static void main(String[] args) {
String topic = "topic-1/a/b/c";
String content = "hello ctg-mqtt service";
int qos = 2;
// 填写mqtt云消息服务的接入点。
String broker = "tcp://localhost:1883";
// 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
String clientId = "ctg-mqtt-client-pub-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient myClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
System.out.println("Connecting to broker: " + broker);
myClient.connect(connOpts);
System.out.println("Connected");
for (int i = 0; i < 10; i++) {
System.out.println("Publishing message: " + content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
myClient.publish(topic, message);
System.out.println("Message published");
}
myClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch (MqttException me) {
// 打印详细的错误信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
接收消息代码示例:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class SubMsgTest {
// 填入您在mqtt控制台创建的ACL账号密码。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
static String topic = "topic-1/a/b/c";
static int qos = 2;
public static void main(String[] args) {
// 填写mqtt云消息服务的接入点。
String broker = "tcp://localhost:1883";
// 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
String clientId = "ctg-mqtt-client-sub-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient myClient = getMqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
myClient.connect(connOpts);
} catch (MqttException me) {
// 打印详细的错误信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
private static MqttClient getMqttClient(String broker, String clientId, MemoryPersistence persistence) throws MqttException {
MqttClient myClient = new MqttClient(broker, clientId, persistence);
myClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connected to broker: " + broker);
try {
myClient.subscribe(topic, qos);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("connection lost");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("message is :" + message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
return myClient;
}
}
MQTT发送Kafka接收
终端设备使用MQTT SDK接入终端连接地址进行消息发布,云端应用服务使用Kafka sdk接入
服务端连接地址按父主题进行数据消费。
MQTT发送顺序消息kafka接收顺序消息
创建父主题,类型分区顺序,父主题以orderMsg2mq-开头。
终端设备使用MQTT SDK接入终端连接地址进行消息发布,云端应用服务使用kafka sdk接入
服务端连接地址按父主题进行分区顺序数据消费
Kafka发送MQTT接收
云端应用服务使用kafka sdk接入服务端连接地址往系统主题mq2mqtt下发指令,终端设备使用MQTT SDK接入终端连接地址接收;
示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import java.util.Properties;
public class PubMsgTest {
public static void main(String[] args) {
Properties props = new Properties();
// 填入您在mqtt控制台查看到的kafka接入点信息。
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<Object, String> producer = new KafkaProducer<>(props);
// 这里需要指定系统内置的流转到mqtt服务端的特殊kafka主题,一般命名为:mq2mqtt。
String topic = "mq2mqtt";
// kafka消息需要在header中指定需要发往mqtt订阅的主题以及一些会话属性、qos等级等信息。
RecordHeaders headers = new RecordHeaders();
headers.add(new RecordHeader("qosLevel", "2".getBytes()));
headers.add(new RecordHeader("cleanSession", "true".getBytes()));
// 这里需要指定您的mqtt客户端订阅的主题,支持topic filter。
headers.add(new RecordHeader("mqttTopic", "topic-1/a/b/c".getBytes()));
byte[] payload = new byte[1026 * 1024];
for (int i = 0; i < 1026 * 1024; i++) {
payload[i] = 'x';
}
try {
for (int i = 0; i < 1000; i++) {
String message = "Message- " + i + " " + new String(payload);
producer.send(new ProducerRecord<>(topic, null, null, null, message, headers));
// System.out.println("Sent: " + message);
System.out.println("sent successfully");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}