创建实例时开启SASL_SSL访问,则数据加密传输,安全性更高。
DMS的RabbitMQ实例兼容开源协议,请参考RabbitMQ官网提供的不同语言的连接和使用向导:https://www.rabbitmq.com/getstarted.html。
本节以DMS提供的demo为例,介绍VPC内访问与使用RabbitMQ的方法,假设RabbitMQ客户端部署在弹性云主机上。
前提条件
- 参考创建RabbitMQ实例章节创建RabbitMQ示例,并记录创建时输入的用户名和密码。
- 创建完成后,单击实例名称,查看并记录实例详情中的“连接地址”。
- 已创建弹性云主机,并且弹性云主机的VPC、子网、安全组与RabbitMQ实例的VPC、子网、安全组保持一致。
- 已完成JDK安装及环境变量配置,具体操作请参考准备环境。
命令行模式连接实例
以下操作命令以Linux系统为例进行说明。
1、下载RabbitMQ-Tutorial-SSL.zip示例工程代码。
2、解压RabbitMQ-Tutorial-SSL.zip压缩包。
$ unzip RabbitMQ-Tutorial-SSL.zip
3、进入RabbitMQ-Tutorial-SSL目录,该目录下包含预编译好的jar文件。
$ cd RabbitMQ-Tutorial-SSL
4、运行生产消息示例。
$ java -cp .:rabbitmq-tutorial-sll.jar Send host port user password
其中,host表示RabbitMQ实例的连接地址,port为RabbitMQ实例的监听端口(默认为5671),user表示RabbitMQ用户名,password表示用户名对应的密码。
图1 生产消息示例
使用Ctrl+C命令退出。
5、运行消费消息示例。
$ java -cp .:rabbitmq-tutorial-sll.jar Recv host port user password
其中,host表示RabbitMQ实例的连接地址,port为RabbitMQ实例的监听端口(默认为5671),user表示RabbitMQ用户名,password表示用户名对应的密码。
图2 消费消息示例
如需停止消费使用Ctrl+C命令退出。
示例代码(Java)
连接实例并生产消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(user);
factory.setPassword(password);
factory.useSslProtocol();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
连接实例并消费消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(user);
factory.setPassword(password);
factory.useSslProtocol();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);