简介
主题模式允许发送者根据主题发布消息,而订阅者可以订阅特定的主题。
在主题模式中,生产者发送的消息被发送到一个交换机(Exchange),该交换机根据消息的路由键(Routing Key)和绑定(Binding)规则将消息路由到一个或多个队列。消费者随后从队列中接收并消费这些消息。以下是主题模式的一些关键要点:
- 路由键的设计:路由键是由点(.)分隔的字符串,例如 "stock.usd.nyse"。这些字符串通常定义了消息的某些属性或分类。
- 通配符的使用:队列绑定时可以使用通配符 "*" 和 "#"。其中,星号可以替代一个单词,井号可以替代零个或多个单词。这增加了灵活性,允许使用模糊匹配来定义哪个队列应该接收具有特定路由键的消息。
- 消息的路由过程:当消息到达交换机时,交换机会查找所有绑定的队列,检查它们的绑定键,并确定哪些队列的绑定键与消息的路由键相匹配。匹配成功的队列会接收到消息。
- 灵活性和复杂性:与直接模式相比,主题模式提供了更大的灵活性,因为它允许基于多个标准进行路由。然而,这种灵活性也带来了额外的复杂性,因为需要正确设计路由键和绑定键以实现期望的路由行为。
- 消息的丢失风险:如果没有任何队列的绑定键与消息的路由键匹配,那么消息将会丢失。因此,正确配置交换机、队列的绑定以及路由键非常重要。
通配符
在主题模式中的通配符其实就像我们平时写的正则表达式,比如在消费者中使用 "stock.#"
- 星号(*):星号可以代替路由键中的一个单词。例如,如果有一个路由键为 "stock.usd.nyse" 的消息,那么绑定键 "stock.*.nyse" 或 "stock.usd.#" 都可以匹配到这个消息。星号可以匹配零个或多个单词,但不会跨越点(.)进行匹配。
- 井号(#):井号可以代替路由键中的零个或多个单词,且可以跨越点进行匹配。这意味着,如果有一个路由键为 "stock.usd.nyse" 的消息,那么绑定键 "stock.#" 将匹配到所有以 "stock" 开头的路由键,不论后面跟随什么单词。
生产者
在之前的模式中,我们使用的都是路由键RoutingKey,而主题模式中的是绑定键 BindingKey。主题模式可以看作是一种特殊的路由模式,它允许更复杂的路由策略,通过使用通配符 "*" 和 "#" 来实现模糊匹配。从而实现处理更加复杂的消息路由需求。
RoutingKey 主要用于生产者发布消息时定义消息的路由路径,而 BindingKey 用于定义交换机与队列之间的绑定关系。
class MyClass
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost"; //RabbitMQ服务在本地运行
factory.UserName = "guest"; //用户名
factory.Password = "guest"; //密码
//创建连接
using (var connection = factory.CreateConnection())
{
//创建通道
using (var channel = connection.CreateModel())
{
//声明了一个主题交换机(topic),命名为"hello"
channel.ExchangeDeclare("hello", "topic");
Console.WriteLine("生产者:请输入绑定key");
var bindingKey = Console.ReadLine();
string msg;
Console.WriteLine("请输入要发送的消息内容:");
while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
{
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish("hello", bindingKey, null, body); //开始传递
Console.WriteLine("已发送: {0}", msg);
}
}
}
}
}
消费者
class MyClass
{
static void Main(string[] args)
{
//创建连接工厂
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "guest";
factory.Password = "guest";
//创建连接
using (var connection = factory.CreateConnection())
{
//创建通道
using (var channel = connection.CreateModel())
{
//声明了一个交换机
channel.ExchangeDeclare("hello", "topic");
//声明一个新的队列,并将这个队列的名称赋值给变量 queueName
var queueName = channel.QueueDeclare().QueueName;
//从控制台获取一个绑定键
Console.WriteLine("消费者:请输入BindingKey");
var bindingKey = Console.ReadLine();
channel.QueueBind(queueName, "hello", bindingKey);
//事件的基本消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Thread.Sleep(1000);
Console.WriteLine("已接收: {0}", message);
};
channel.BasicConsume(queueName, true, consumer);
Console.ReadKey();
}
}
}
}
演示
还是老样子,我们将生产者和消费者都发布打包,分别运行三次,然后生产者分别发送一条消息,如下图。
#
用于匹配多个单词。因此com.#三条消息都收到了。
*
用于匹配一个单词,因为有两个生产者的第三个单词是2,所以*.*.2
的消费者收到了两条消息。
而只有一个生产者的第二个单词是B,所以*.B.*
的消费者只收到了一条消息