引入依赖//版本号按需调整
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
连接mqtt云消息服务的示例代码如下:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ConnTest {
// 填入您在mqtt控制台创建的ACL账号密码。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
public static void main(String[] args) {
// 填写mqtt云消息服务的接入点。
String broker = "tcp://localhost:1883";
// 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
String clientId = "ctg-mqtt-client-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
final MqttClient myClient = new MqttClient(broker, clientId, persistence);
myClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// 连接建立成功
}
@Override
public void connectionLost(Throwable cause) {
// 连接丢失,建议记录日志,做好监控
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 收到消息的回调,这里不要进行阻塞操作,以免卡住导致连接断开
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 成功发送消息到服务端
}
});
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
// 设置心跳间隔(这里示例为2分钟)
connOpts.setKeepAliveInterval(120);
// 设置自动重连
connOpts.setAutomaticReconnect(true);
System.out.println("Connecting to broker: " + broker);
myClient.connect(connOpts);
System.out.println("Connected");
// 这里编写您的消息收发逻辑
myClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch (MqttException me) {
// 打印详细的错误信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}