RabbitMQ路由模式_概念
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销 活动为了节约成本,只发布到站内信队列。此时需要使用路由模式 (Routing)完成这一需求。
特点:
1、每个队列绑定路由关键字RoutingKey。
2、生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模 式使用direct交换机。
实时效果反馈
1. RabbitMQ路由模式的特点为
A 能将消息发送给一条队列
B 能将消息发送给多条队列
C 能按照路由键将消息发送给指定队列
D 能按照通配符规则将消息发送给指定队列
RabbitMQ路由模式_编写生产者
接下来我们编写路由模式的生产者:
// 生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itxiaotong");
connectionFactory.setPassword("itxiaotong");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建交换机
channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
// 5.创建队列
channel.queueDeclare("SEND_MAIL2",true,false,false,null);
channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
channel.queueDeclare("SEND_STATION2",true,false,false,null);
// 6.交换机绑定队列
channel.queueBind("SEND_MAIL2","exchange_routing","import");
channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
channel.queueBind("SEND_STATION2","exchange_routing","import");
channel.queueBind("SEND_STATION2","exchange_routing","normal");
// 7.发送消息
channel.basicPublish("exchange_routing","import",null,
"双十一大促活动".getBytes());
channel.basicPublish("exchange_routing","normal",null,
"小心促销活动".getBytes());
// 8.关闭资源
channel.close();
connection.close();
}
}
实时效果反馈
1. RabbitMQ路由模式使用的交换机为
A direct
B fanout
C routing
D topic
2. 关于RabbitMQ路由模式,说法正确的是
A 队列需要绑定路由关键字
B 一个队列可以绑定多个路由关键字
C 交换机根据路由关键字将消息转发到指定队列
D 以上说法都正确
RabbitMQ路由模式_编写消费者
接下来我们编写路由模式的消费者:
// 站内信消费者
public class Customer_Station {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itxiaotong");
connectionFactory.setPassword("itxiaotong");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送站内信:"+message);
}
});
}
}
// 邮件消费者
public class Customer_Mail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itxiaotong");
connectionFactory.setPassword("itxiaotong");
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MAIL2",true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送邮件:"+message);
}
});
}
}
// 短信消费者
public class Customer_Message {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itxiaotong");
connectionFactory.setPassword("itxiaotong");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送短信:"+message);
}
});
}
}
运行生产者和消费者,效果如下:
RabbitMQ通配符模式_概念
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。
通配符规则:
1、消息设置RoutingKey时,RoutingKey由多个单词构成,中间以 . 分割。
2、队列设置RoutingKey时, # 可以匹配任意多个单词, * 可以匹配任意一个单词。
实时效果反馈
1. RabbitMQ通配符模式的特点为
A 能将消息发送给一条队列
B 能将消息发送给多条队列
C 能按照路由键将消息发送给指定队列
D 能按照通配符规则将消息发送给指定队列
2. 关于RabbitMQ通配符模式的匹配规则,说法正确的是
A # 可以匹配任意多个单词, * 可以匹配任意一个单词
B # 可以匹配任意一个单词, * 可以匹配任意多个单词
C # 和 * 都可以匹配任意多个单词
D # 和 * 都可以匹配任意一个单词
RabbitMQ通配符模式_编写生产者
接下来我们编写通配符模式的生产者:
// 生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itxiaotong");
connectionFactory.setPassword("itxiaotong");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建交换机
channel.exchangeDeclare("exchange_topic",BuiltinExchangeType.TOPIC,true);
// 5.创建队列
channel.queueDeclare("SEND_MAIL3",true,false,false,null);
channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
channel.queueDeclare("SEND_STATION3",true,false,false,null);
// 6.交换机绑定队列
channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
// 7.发送消息
channel.basicPublish("exchange_topic","mail.message.station",null,
"双十一大促活动".getBytes());
channel.basicPublish("exchange_topic","station",null,
"小型促销活动".getBytes());
// 8.关闭资源
channel.close();
connection.close();
}
}
实时效果反馈
1. RabbitMQ通配符模式发送消息时,消息的的特点为
A 由一个单词构成
B 由多个单词构成,中间以 . 分割
C 由多个单词构成,中间以 _ 分割
D 由多个单词构成,中间以 , 分割
2. RabbitMQ通配符模式使用的交换机为
A direct
B fanout
C routing
D topic
RabbitMQ通配符模式_编写消费者
接下来我们编写通配符模式的消费者:
// 站内信消费者
public class Customer_Station {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itxiaotong");
connectionFactory.setPassword("itxiaotong");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送站内信:"+message);
}
});
}
}
// 邮件消费者
public class Customer_Mail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itxiaotong");
connectionFactory.setPassword("itxiaotong");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MAIL3",true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送邮件:"+message);
}
});
}
}
// 短信消费者
public class Customer_Message {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itxiaotong");
connectionFactory.setPassword("itxiaotong");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE3", true,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送短信:"+message);
}
});
}
}