分布式消息服务MQTT支持和云上消息产品进行消息互通,当前支持的云产品有分布式消息服务Kafka。如果您的云端应用需要回溯消费mqtt服务的历史消息或希望将您的mqtt消息使用kafka进行持久化供其他云上应用进行消费使用,您可以将分布式消息服务MQTT和分布式消息服务Kafka进行数据流转。
本文以Java SDK为例说明如何将mqtt消息流入云上kafka消息队列服务。
首先你需要创建一个mqtt实例。
然后你需要创建一个主题:
注意该主题对应的是您kafka云消息队列的主题,对于您的mqtt服务而言,该主题为一级父主题。您可以以该父主题发送任意具体多级主题的消息最终都会流转到kafka以您创建主题名字一致的主题中。
以java语言为例的示例代码:
依赖准备:
首先添加如下的maven依赖(具体版本请自行决定):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
向mqtt服务发送消息:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class PubMsgTest {
// 填入您在mqtt控制台创建的ACL账号密码。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
public static void main(String[] args) {
String topic = "topic-1/a/b/c";
String content = "hello ctg-mqtt service";
int qos = 2;
// 填写mqtt云消息服务的接入点。
String broker = "tcp://localhost:1883";
// 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
String clientId = "ctg-mqtt-client-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient myClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
System.out.println("Connecting to broker: " + broker);
myClient.connect(connOpts);
System.out.println("Connected");
for (int i = 0; i < 10; i++) {
System.out.println("Publishing message: " + content);
运行结果:
Connecting to broker: tcp://localhost:1883
Connected
Publishing message: hello ctg-mqtt service
Message published
Publishing message: hello ctg-mqtt service
Message published
Publishing message: hello ctg-mqtt service
Message published
Publishing message: hello ctg-mqtt service
Message published
Publishing message: hello ctg-mqtt service
Message published
Publishing message: hello ctg-mqtt service
Message published
Publishing message: hello ctg-mqtt service
Message published
Publishing message: hello ctg-mqtt service
Message published
Publishing message: hello ctg-mqtt service
Message published
Publishing message: hello ctg-mqtt service
Message published
Disconnected
向kafka对应主题订阅并消费从mqtt流转过去的消息:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SubMsgTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka broker address
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "my-group-1"); // Consumer group ID
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "topic-1";
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received: " + record.value());
}
}
}
}
运行结果:
Received: {"clientId":"mqttx_a7ed9cde","topic":"topic-1/a/b/c","payload":"test","ts":1694798079027}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094006}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094010}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094011}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094012}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094013}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094015}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094017}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094018}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094021}
Received: {"clientId":"ctg-mqtt-client-test","topic":"topic-1/a/b/c","payload":"hello ctg-mqtt service","ts":1694798094022}