RabbitMQ高级特性_消费端限流
之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
消费端限流的写法如下:
1、生产者批量发送消息
@Test
public void testSendBatch() {
// 发送十条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
}
}
2、消费端配置限流机制
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itxiaotong
password: itxiaotong
virtual-host: /
listener:
simple:
# 限流机制必须开启手动签收
acknowledge-mode: manual
# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
prefetch: 5
3、消费者监听队列
@Component
public class QosConsumer{
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
// 1.获取消息
System.out.println(new String(message.getBody()));
// 2.模拟业务处理
Thread.sleep(3000);
// 3.签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}
实时效果反馈
1. 在RabbitMQ中,使用消费端限流必须开启
A 确认模式
B 退回模式
C 手动签收消息
D 什么都不需要开启
RabbitMQ高级特性_利用限流实现不公平分发
在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以 采用不公平分发,即谁处理的快,谁处理的消息多。
使用方法如下:
1、生产者批量发送消息
@Test
public void testSendBatch() {
// 发送十条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
}
}
2、消费端配置不公平分发
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itxiaotong
password: itxiaotong
virtual-host: /
listener:
simple:
# 限流机制必须开启手动签收
acknowledge-mode: manual
# 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
prefetch: 1
3、编写两个消费者
@Component
public class UnfairConsumer {
// 消费者1
@RabbitListener(queues = "my_queue")
public void listenMessage1(Message message, Channel channel) throws Exception
{
//1.获取消息
System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
//2. 处理业务逻辑
Thread.sleep(500); // 消费者1处理快
//3. 手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
// 消费者2
@RabbitListener(queues = "my_queue")
public void listenMessage2(Message message, Channel channel) throws Exception
{
//1.获取消息
System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
//2. 处理业务逻辑
Thread.sleep(3000);// 消费者2处理慢
//3. 手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
实时效果反馈
1. 在RabbitMQ中,实现不公平分发需要
A 将消费端限流为0
B 将消费端限流为1
C 将消费端限流为2
D 将消费端限流为3
RabbitMQ高级特性_消息存活时间
RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。
设置队列所有消息存活时间
1、在创建队列时设置其存活时间:
@Configuration
public class RabbitConfig2 {
private final String EXCHANGE_NAME="my_topic_exchange2";
private final String QUEUE_NAME="my_queue2";
// 1.创建交换机
@Bean("bootExchange2")
public Exchange getExchange2(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true).
build();
}
// 2.创建队列
@Bean("bootQueue2")
public Queue getMessageQueue2(){
return QueueBuilder
.durable(QUEUE_NAME)
.ttl(10000) //队列的每条消息存活10s
.build();
}
// 3.将队列绑定到交换机
@Bean
public Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,@Qualifier("bootQueue2") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
}
}
2、生产者批量生产消息,测试存活时间
@Test
public void testSendBatch2() throws InterruptedException {
// 发送十条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);
}
}
实时效果反馈
1. 在RabbitMQ中,当消息到达存活时间后还没有被消费,则
A 会被自动消费
B 会被自动清除
C 会被持久化
D 没有任何效果
2. 以下关于RabbitMQ的说法,正确的是
A 只能对队列的所有消息设置存活时间
B 只能对某条消息设置存活时间
C 可以对队列的所有消息设置存活时间,也可以对某条消息设置 存活时间
D 不能设置存活时间
设置单条消息存活时间
@Test
public void testSendMessage() {
//设置消息属性
MessageProperties messageProperties = new MessageProperties();
//设置存活时间
messageProperties.setExpiration("10000");
// 创建消息对象
Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);
// 发送消息
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}
注意:
1 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
2 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。
@Test public void testSendMessage2() { for (int i = 0; i < 10; i++) { if (i == 5) { // 1.创建消息属性 MessageProperties messageProperties = new MessageProperties(); // 2.设置存活时间 messageProperties.setExpiration("10000"); // 3.创建消息对象 Message message = new Message(("send message..." +i).getBytes(),messageProperties); // 4.发送消息 rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message); } else { rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "sendmessage..." + i); } } }
在以上案例中,i=5的消息才有过期时间,10s后消息并没有 马上被移除,但该消息已经不会被消费了,当它到达队列顶 端时会被移除。
实时效果反馈
1. 在RabbitMQ中,如果设置了单条消息的存活时间,也设置了队 列的存活时间
A 以单条消息的存活时间为准
B 以队列的存活时间为准
C 以时间短的为准
D 以时间长的为准
2. RabbitMQ消息过期后
A 消息会被立即移除
B 只有消息在队列顶端时,才会被立即移除。
C 只有消息不在队列顶端时,才会被立即移除。
D 以上说法都不对
RabbitMQ高级特性_优先级队列
假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。
优先级队列用法如下:
1、创建队列和交换机
@Configuration
public class RabbitConfig3 {
private final String EXCHANGE_NAME="priority_exchange";
private final String QUEUE_NAME="priority_queue";
// 1.创建交换机
@Bean(EXCHANGE_NAME)
public Exchange priorityExchange(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true).
build();
}
// 2.创建队列
@Bean(QUEUE_NAME)
public Queue priorityQueue(){
return QueueBuilder
.durable(QUEUE_NAME)
//设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
.maxPriority(10)
.build();
}
// 3.将队列绑定到交换机
@Bean
public Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
}
}
2、编写生产者
@Test
public void testPriority() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// i为5时消息的优先级较高
MessageProperties messageProperties = new MessageProperties();
messageProperties.setPriority(9);
Message message = new Message(("send message..." +i).getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
} else {
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);
}
}
}
3、编写消费者
@Component
public class PriorityConsumer {
@RabbitListener(queues = "priority_queue")
public void listenMessage(Message message, Channel channel) throws Exception
{
//获取消息
System.out.println(new String(message.getBody()));
//手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
实时效果反馈
1. 在RabbitMQ中,消息的优先级数值越大
A 越先被消费
B 越后被消费
C 随机被消费
D 以上说法都不对