安全接入点(PLAIN、AMQPLAIN授权机制)
import com.rabbitmq.client.*;
import java.io.IOException;
public class RabbitmqAmqpDemo {
public static void main(String[] args) throws Exception {
String host = "192.168.0.0"; //安全接入点ip
Integer port = 5672; //安全接入点port
String username = "xxx"; //集群管理用户列表的用户名
String password = "xxx"; //集群管理用户列表的密码
String vhost = "/";
String exchangeName = "ex_test";
String queueName = "qu_test";
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, "test");
String message = "Hello Aop";
for (int i = 0; i < 10; i++) {
channel.basicPublish(exchangeName, "test", null, message.getBytes());
System.out.println("消息发送成功");
}
Channel consumeChannel = connection.createChannel();
Consumer consumer = new DefaultConsumer(consumeChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String messageGet = new String(body, "UTF-8");
if (messageGet.equals(message)) {
System.out.println("消息消费成功");
}
}
};
consumeChannel.setDefaultConsumer(consumer);
consumeChannel.basicConsume(queueName, false, consumer);
Thread.sleep(10000);
}
}
SSL接入点(EXTERNAL授权机制)
import com.rabbitmq.client.*;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
public class RabbitmqExternalDemo {
public static void main(String[] args) throws Exception {
String host = "192.168.0.0"; //SSL接入点ip
int port = 5671; //SSL接入点port
//以下2个ssl文件可通过控制台获取安装包, 具体的获取方式可以查看2.2.1接入步骤的第二小节
String ksFile = "D:\\tmp\\ssl\\client_rabbitmq_key.p12";
String tksFile = "D:\\tmp\\ssl\\truststore";
String vhost = "/";
String exchangeName = "ex_test";
String queueName = "qu_test";
char[] keyPassphrase = "W3zT_98Zz9Io".toCharArray();
KeyStore ks = KeyStore.getInstance("PKCS12");
ks.load(new FileInputStream(ksFile), keyPassphrase);
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
kmf.init(ks, keyPassphrase);
char[] trustPassphrase = null;
trustPassphrase = "W3zT_98Zz9Io".toCharArray();
KeyStore tks = KeyStore.getInstance("JKS");
tks.load(new FileInputStream(tksFile), trustPassphrase);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
tmf.init(tks);
SSLContext c = SSLContext.getInstance("tlsv1.2");
c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
connectionFactory.useSslProtocol(c);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, "test");
String message = "Hello Aop";
for (int i = 0; i < 10; i++) {
channel.basicPublish(exchangeName, "test", null, message.getBytes());
System.out.println("消息发送成功");
}
Channel consumeChannel = connection.createChannel();
Consumer consumer = new DefaultConsumer(consumeChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String messageGet = new String(body, "UTF-8");
if (messageGet.equals(message)) {
System.out.println("消息消费成功");
}
}
};
consumeChannel.setDefaultConsumer(consumer);
consumeChannel.basicConsume(queueName, false, consumer);
Thread.sleep(10000);
}
}