您可以将云上应用kafka上发布的消息流转到mqtt消息服务中,即您可以在mqtt服务中订阅并消费来自云上kafka的消息。
具体操作步骤如下:
1、 开通mqtt实例
2、 在mqtt管理控制台中创建一个主题,注意该主题为您在mqtt客户端订阅主题的父级主题。
3、 在mqtt管理控制台查询云上kafka消息队列的内网接入信息:
4、 向kafka云消息队列发送消息,注意必须将发送投递至kafka的特殊内置队列(参考示例代码),以及加上必要的消息头部信息。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import java.util.Properties;
public class PubMsgTest {
public static void main(String[] args) {
Properties props = new Properties();
// 填入您在mqtt控制台查看到的kafka接入点信息。
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<Object, String> producer = new KafkaProducer<>(props);
// 这里需要指定系统内置的流转到mqtt服务端的特殊kafka主题,一般命名为:mq2mqtt。
String topic = "mq2mqtt";
// kafka消息需要在header中指定需要发往mqtt订阅的主题以及一些会话属性、qos等级等信息。
RecordHeaders headers = new RecordHeaders();
headers.add(new RecordHeader("qosLevel", "2".getBytes()));
headers.add(new RecordHeader("cleanSession", "true".getBytes()));
// 这里需要指定您的mqtt客户端订阅的主题,支持topic filter。
headers.add(new RecordHeader("mqttTopic", "topic-1/a/b/c".getBytes()));
try {
for (int i = 0; i < 10; i++) {
String message = "Message- " + i;
producer.send(new ProducerRecord<>(topic, null, null, null, message, headers));
System.out.println("Sent: " + message);
这里必要的headers信息主要包括:
qosLevel: 消息投递到mqtt服务的qos等级
cleanSession: 消息发布的清除会话状态选项
mqttTopic: 消息发布的mqtt主题,需要跟您在mqtt订阅端订阅的主题对应。
5、 订阅mqtt服务并消费从kafka服务端流转过来的消息:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class SubMsgTest {
// 填入您在mqtt控制台创建的ACL账号密码。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
public static void main(String[] args) {
String topic = "topic-1/a/b/c";
int qos = 2;
// 填写mqtt云消息服务的接入点。
String broker = "tcp://localhost:1883";
// 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
String clientId = "ctg-mqtt-client-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient myClient = getMqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
myClient.connect(connOpts);
System.out.println("connected to broker: " + broker);
myClient.subscribe(topic, qos);
} catch (MqttException me) {
// 打印详细的错误信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
运行结果如下:
connected to broker: tcp://localhost:1883
message is :Message- 0
message is :Message- 1
message is :Message- 2
message is :Message- 3
message is :Message- 4
message is :Message- 5
message is :Message- 6
message is :Message- 7
message is :Message- 8
message is :Message- 9