简介
🍀RabbitMQ中的工作队列模式是指将任务分配给多个消费者并行处理。在工作队列模式中,生产者将任务发送到RabbitMQ交换器,然后交换器将任务路由到一个或多个队列。消费者从队列中获取任务并进行处理。处理完成后,消费者可以向RabbitMQ发送一个确认消息,表示任务已完成。
优点:
🍀工作队列模式的主要优点是能够实现负载均衡和并行处理。通过将任务分配给多个消费者,可以提高系统的处理能力和吞吐量。此外,工作队列模式还具有很好的扩展性,可以根据需要动态添加或删除消费者。
任务流程:
- 生产者(Producer)将任务发送到RabbitMQ交换器(Exchange)。
- 交换器根据路由键(Routing Key)将任务路由到一个或多个队列(Queue)。
- 消费者(Consumer)从队列中获取任务并进行处理。
- 处理完成后,消费者向RabbitMQ发送一个确认消息,表示任务已完成。
生产者代码
在这个代码中我们声明消息队列时第二个参数设置为true,表示这个队列是持久化的。接着使用while做一个循环,不断读取用户输入的消息内容,然后将其转换为字节数组后发布到"hello"队列中。
class MyClass
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost"; //RabbitMQ服务在本地运行
factory.UserName = "guest"; //用户名
factory.Password = "guest"; //密码
//创建连接
using (var connection = factory.CreateConnection())
{
//创建通道
using (var channel = connection.CreateModel())
{
//声明一个名称为hello的消息队列
channel.QueueDeclare("hello", true, false, false, null);
string msg = null;
int i = 1;
Console.WriteLine("请输入要发送的消息内容:");
while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
{
string message = $"Hello {msg} ! " + i++; //传递的消息内容
var body = Encoding.UTF8.GetBytes(message);
//此处的参数"hello" 就对应的就是上面声明的消息队列的路由键
channel.BasicPublish("", "hello", null, body); //开始传递
Console.WriteLine("已发送: {0}", message);
}
}
}
}
}
消费者代码
🍀这里最关键的一行代码就是channel.BasicQos(0, 1, false);
BasicQos方法用于设置消费者的预取计数(prefetch count)。消费者从队列中获取消息的方式是通过预取计数来控制的。预取计数决定了消费者在没有发送确认信号的情况下可以同时处理多少条未确认的消息。
在Channel.BasicQos()方法中三个参数作用如下:
prefetchSize
:这个参数表示每次从队列中获取的消息的最大大小,单位是字节。设置为0表示没有限制。prefetchCount
:这个参数表示每个消费者同时可以处理的最大未确认消息的数量。设置为1表示每个消费者只能处理一个未确认消息。global
:这个布尔值表示是否将这两个参数应用于所有的消费者。如果设置为true
,则这两个参数将应用于所有的消费者;如果设置为false
,则这两个参数仅适用于当前的消费者。
channel.BasicQos(0, 1, false);
这行代码设置了消费者的预取计数为1。这意味着消费者在没有发送确认信号的情况下,最多只会处理一条未确认的消息。
这样可以提高消费者处理消息的效率,因为消费者不需要等待其他消费者发送确认信号后再处理消息。这样可以在一定程度上提高系统的吞吐量。
class MyClass
{
static void Main(string[] args)
{
//创建连接工厂
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "guest";
factory.Password = "guest";
//创建连接
using (var connection = factory.CreateConnection())
{
//创建通道
using (var channel = connection.CreateModel())
{
//声明队列
channel.QueueDeclare("hello", true, false, false, null);
channel.BasicQos(0, 1, false);
//事件的基本消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//这里加上睡眠时间,模拟耗时任务
Thread.Sleep(1000);
Console.WriteLine("已接收: {0}", message);
//发送消息确认信号(手动确认)
channel.BasicAck(ea.DeliveryTag,false);
};
//当 autoAck设置为true时,也就是自动确认模式,一旦消息队列将消息发送给消息消费者后,就会从内存中将这个消息删除。
//当autoAck设置为false时,也就是手动模式,如果此时的有一个消费者宕机,消息队列就会将这条消息继续发送给其他的消费者,这样数据在消息消费者集群的环境下,就不会不丢失了。
channel.BasicConsume("hello", false, consumer);
Console.ReadKey();
}
}
}
}
代码演示
🍀首先我们将消费者代码发布到本地文件夹中
🍀发布完成后我们找到打包好的程序集,双击两次.exe文件,运行两个消费者
🍀接着我们运行生产者代码,在控制台随意发送6条消息。
🍀再回到我们刚刚运行的两个消费者程序,可以看到, 消息被分发给两个消费者了