前提条件
创建分布式消息服务RabbitMQ相应环境。
操作步骤
RabbitMQ是一个开源的消息队列中间件,支持生产者和消费者之间的异步通信。在上述资源准备完成后,接下来需要编译工程生产消费,主要分以下几个步骤:
1、编写生产者代码:使用编程语言编写一个生产者程序。该程序将连接到RabbitMQ服务器,并将消息发送到队列中。
2、编写消费者代码:同样使用编程语言编写一个消费者程序。该程序将连接到RabbitMQ服务器,并从队列中接收消息。
3、运行生产者和消费者:运行生产者程序,它将发送消息到队列中。然后运行消费者程序,它将从队列中接收并处理消息。
4、验证结果:检查生产者和消费者程序的输出,确保消息被正确发送和接收。
引入依赖
在使用RabbitMQ时,你需要在你的项目中引入相应的依赖。具体的依赖项可能会因你的项目和需求而有所不同。在使用RabbitMQ之前,请确保查阅官方文档以获取最新的依赖项和使用说明。
以Java编程语言为例,可以使用RabbitMQ的Java客户端库。你可以在Maven或Gradle构建工具中添加以下依赖项:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.0</version>
</dependency>
可以通过下载JAR包来引入依赖。
绑定BindingKey
在RabbitMQ中,绑定键(Binding Key)是用于绑定交换机(Exchange)和队列(Queue)的关键字。当一个消息被发送到交换机时,交换机会根据绑定键将消息路由到相应的队列中。
绑定键是在创建绑定(Binding)时指定的,它定义了消息应该如何被路由到队列。绑定键通常与消息的属性或内容进行匹配,以确定消息应该发送到哪个队列。
绑定键可以具有不同的形式,取决于使用的交换机类型。以下是一些常见的绑定键形式:
-
接匹配(Direct Match):绑定键与消息的路由键(Routing Key)完全匹配时,消息会被路由到相应的队列。
-
通配符匹配(Wildcard Match):绑定键可以使用通配符进行模式匹配。常见的通配符有和#,其中表示匹配一个单词,#表示匹配零个或多个单词。
-
主题匹配(Topic Match):绑定键可以使用主题模式进行匹配。主题模式使用.分隔的单词,可以包含*和#通配符。例如,stock.#可以匹配stock.price、stock.quantity等。
绑定键的选择取决于你的需求和消息的路由策略。通过正确设置绑定键,你可以确保消息被正确地路由到相应的队列中,以便消费者进行处理。
代码示例:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitmqBindingKey {
private final static String EXCHANGE_NAME = "exchangeTest";
private final static String QUEUE_NAME = "helloMQ";
private final static String ROUTING_KEY = "test";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机ip
factory.setHost("192.168.3.113");
// 设置amqp的端口号
factory.setPort(5672);
// 设置用户名密码
factory.setUsername("rabbitmq");
factory.setPassword("r@bb!tMQ#3333323");
// 设置Vhost,需要在控制台先创建
factory.setVirtualHost("vhost");
//基于网络环境合理设置超时时间
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 创建 ${QueueName}。Queue 可以在控制台创建,也可以用API创建
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// Queue 与 Exchange进行绑定,注册 BindingKeyTest
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
connection.close();
}
}
完成后,可以在实例列表的交换器选项卡和队列选项卡查看结果。
生产消息
生产者需要创建一个连接到RabbitMQ服务器,然后创建一个通道(Channel)来进行消息的发布。在发布消息之前,生产者通常需要先声明一个队列,以确保消息能够被正确地路由和接收。
一旦连接和通道建立完成,生产者可以使用basicPublish()方法将消息发布到指定的队列。在发布消息时,需要指定目标队列的名称、消息内容以及其他的属性。
发布消息后,RabbitMQ将会将消息存储在队列中,等待消费者来接收。消费者可以使用相同的客户端库来创建连接和通道,并使用basicConsume()方法来订阅队列并接收消息。一旦有消息到达队列,消费者就会收到消息并进行相应的处理。
通过使用RabbitMQ,生产者和消费者可以实现解耦,即它们可以独立地进行开发和部署。生产者可以按照自己的节奏和需求发布消息,而消费者可以根据自己的处理能力和负载来接收和处理消息。
代码示例:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class RabbitmqProducer {
// private final static String EXCHANGE_NAME = "exchangeTest";
private final static String QUEUE_NAME = "helloMQ";
// private final static String ROUTING_KEY = "test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机ip
factory.setHost("192.168.3.113");
// 设置amqp的端口号
factory.setPort(5672);
// 设置用户名密码
factory.setUsername("username");
factory.setPassword("password");
// 设置Vhost,需要在控制台先创建
factory.setVirtualHost("test");
//基于网络环境合理设置超时时间
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 发送方消息确认,channel.confirmSelect();
// 启用发送方事务机制,channel.txSelect();
// 指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
// 发送的消息
String message = "Hello rabbitMQ!_" + i;
// 往队列中发送一条消息,使用默认的交换器
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
// 使用自定义交换器,需要在管理台预先建好,并设置routing key
// channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
TimeUnit.MILLISECONDS.sleep(100);
}
//关闭频道和连接
channel.close();
connection.close();
}
}
消息发送后,可以进入控制台,在实例列表的队列选项卡查看消息发送状消息
消费消息
消费者需要创建一个连接到RabbitMQ服务器,然后创建一个通道(Channel)来进行消息的订阅。在订阅消息之前,消费者通常需要先声明一个队列,以确保能够正确地接收和处理消息。
一旦连接和通道建立完成,消费者可以使用basicConsume()方法来订阅指定的队列,并注册一个回调函数来处理接收到的消息。当有消息到达队列时,RabbitMQ会将消息推送给消费者,消费者的回调函数将被调用,从而可以对消息进行处理。
消费者可以根据自己的需求设置消息的确认机制。在默认情况下,消费者在接收到消息后,会自动向RabbitMQ发送一个确认(ack)消息,表示已成功接收并处理该消息。如果消费者在处理消息时发生错误,可以选择不发送确认消息,从而使消息重新进入队列,以便其他消费者重新处理。
通过使用RabbitMQ,消费者可以实现解耦,即它们可以独立地进行开发和部署。消费者可以根据自己的处理能力和负载来接收和处理消息,从而实现负载均衡和水平扩展。
代码示例:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RabbitmqConsumer {
//队列名称
private final static String QUEUE_NAME = "helloMQ";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机ip
factory.setHost("192.168.3.113");
//设置amqp的端口号
factory.setPort(5672);
//设置用户名密码
factory.setUsername("username");
factory.setPassword("password");
//设置Vhost,需要在控制台先创建
factory.setVirtualHost("test");
//基于网络环境合理设置超时时间
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
完成上述步骤后,可以在控制台查看消费者是否启动成功。
完成以上所有步骤后,就成功接入了RabbitMQ服务,可以用消息队列进行消息发送和订阅了。