Java MqttServer应用的示例代码:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttServer {
public static void main(String[] args) {
String broker = "tcp://localhost:1883";
String clientId = "MqttServer";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setCleanSession(true);
mqttClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
// Not used in this example
}
});
mqttClient.connect(connectOptions);
mqttClient.subscribe("mytopic");
System.out.println("MqttServer is running");
} catch (MqttException e) {
System.out.println("Error: " + e.getMessage());
}
}
}
这个示例代码使用了Eclipse Paho MQTT库来实现MqttServer。首先,创建了一个MqttClient
对象,使用指定的broker和client ID。然后,我们设置了连接选项,并为MqttClient
对象设置了回调函数。
回调函数由MqttCallback
接口定义,用于处理连接丢失、消息到达和消息投递完成等事件。在回调函数中,我们简单地打印出了相应的信息。
接下来,连接到broker并订阅名为"mytopic"的主题。最后,我们打印出"MqttServer is running"表示MqttServer已经开始运行。
使用Java的MqttClient类实现基本功能的示例代码:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttClientExample {
public static void main(String[] args) {
String broker = "tcp://:1883";
String clientId = "JavaMqttClient";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected");
String topic = "test/topic";
int qos = 1;
System.out.println("Subscribing to topic: " + topic);
client.subscribe(topic, qos);
System.out.println("Publishing message: Hello, MQTT!");
String message = "Hello, MQTT!";
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
client.publish(topic, mqttMessage);
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("Connection lost");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Received message: " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered");
}
});
// Wait for some time to receive messages
Thread.sleep(5000);
client.disconnect();
System.out.println("Disconnected");
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}
}
在上述示例代码中,首先创建一个MqttClient对象,并设置要连接的broker地址、客户端ID和持久化实例。接下来,设置连接选项并连接到broker。然后,订阅一个主题并发布一条消息。最后,我们设置一个MqttCallback对象来处理连接丢失、消息到达和消息传递完成的事件。最后,通过调用disconnect()方法来断开连接。