Java客户端连接配置
更新时间 2025-06-16 11:59:53
最近更新时间: 2025-06-16 11:59:53
本文为您介绍分布式消息服务MQTT客户端连接配置。
引入依赖//版本号按需调整
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
连接mqtt云消息服务的示例代码如下:
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ConnTest {
// 填入您在mqtt控制台创建的ACL账号密码。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
// 是否使用tls加密传输
private static final Boolean isTls = true;
public static void main(String[] args) {
// 填写mqtt云消息服务的接入点。接入点分为tls以及非tls两种接入。tls接入格式为:ssl://{ip}:8085
String broker = "tcp://localhost:1883";
// 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。
String clientId = "ctg-mqtt-client-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
final MqttClient myClient = getMqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
// 设置心跳间隔(这里示例为2分钟)
connOpts.setKeepAliveInterval(120);
// 设置自动重连
connOpts.setAutomaticReconnect(true);
// 设置tls相关配置(可选)
// 目前暂未支持自动配置ssl证书,默认的ssl证书需要客户端进行默认证书信任。不影响正常的tls链路加密
if (isTls) {
SSLContext sslContext = SSLContext.getInstance("TLS");
// 默认信任服务端ssl证书
sslContext.init(null, new TrustManager[]{new X509TrustManager() {
public void checkClientTrusted(X509Certificate[] chain, String authType) {
}
public void checkServerTrusted(X509Certificate[] chain, String authType) {
}
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}}, new SecureRandom());
// 可以按照自定义的方式进行ssl证书的主机名验证
connOpts.setHttpsHostnameVerificationEnabled(false);
connOpts.setSSLHostnameVerifier((hostname, session) -> true);
connOpts.setSocketFactory(sslContext.getSocketFactory());
}
System.out.println("Connecting to broker: " + broker);
myClient.connect(connOpts);
System.out.println("Connected");
// 这里编写您的消息收发逻辑
myClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch (MqttException me) {
// 打印详细的错误信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static MqttClient getMqttClient(String broker, String clientId, MemoryPersistence persistence) throws MqttException {
final MqttClient myClient = new MqttClient(broker, clientId, persistence);
myClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// 连接建立成功
}
@Override
public void connectionLost(Throwable cause) {
// 连接丢失,建议记录日志,做好监控
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 收到消息的回调,这里不要进行阻塞操作,以免卡住导致连接断开
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 成功发送消息到服务端
}
});
return myClient;
}
}