082:RabbitMQ消息确认机制&公平队列&发布订阅实现原理
1 Rabbitmq上节课内容简单回顾
2 传统的队列存在那些缺陷
3 Rabbitmq消费者如何实现手动ack
4 Rabbitmq如何实现公平队列
5 如何保证消息中间件消息不丢失
6 如何开启Rabbitmq持久化功能
7 Rabbitmq发布订阅的实现原理
8 Rabbitmq实现发布订阅功能
1 Rabbitmq上节课内容简单回顾
课题内容
1.RabbitMQ如何保证消息不丢失?
2.RabbitMQ消息确认机制原理
3.RabbitMQ工作队列的原理
4.RabbitMQ发布订阅实现的原理
2 传统的队列存在那些缺陷
如果多个消费者消费能力不一样,均摊消费不公平。
如何实现公平消费:采用手动ack机制
3 Rabbitmq消费者如何实现手动ack
主动拉取:消费者和MQ服务器端第一次建立连接的时候;
主动推送:消费者已经和MQ服务端保持长连接了,只要生产者投递消息,MQ服务端会立即将消息转发给消费者。
RabbitMQ如何保证消息不丢失 使用消息确认机制+持久化技术
A.消费者确认收到消息机制
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
注:第二个参数值为false,代表关闭RabbitMQ的自动应答机制,改为手动应答。
在处理完消息时,返回应答状态,true表示为自动应答模式。
channel.basicAck(envelope.getDeliveryTag(), false);
4 Rabbitmq如何实现公平队列
公平队列(工作队列)实现原理:
Mq服务器端每次只会给消费者发送一条消息,如果消费者没有返回ack,就不会继续发送消息。如果消费者能够非常快速的告诉给MQ ack,说明该消费者处理时间更快。
在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息,必须手动ack确认之后才会继续发送。
channel.basicQos(1);
public class Consumer {
private static final String QUEUE_NAME = "mayikt";
// 业务操作等待时间
private static int time = 2000;
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接
Connection connection = RabitMQConnection.getConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// MQ每次只会给消费者发送一条消息,必须返回ack之后才会继续发送消息给消费者
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费消息msg:" + msg);
try {
Thread.sleep(time);
} catch (Exception e) {
}
// 手动发送消息告诉给mq服务器端 从队列删除该消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 3.创建监听的消息 true表示自动签收,false表示必须手动ack
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
运行结果:
5 如何保证消息中间件消息不丢失
如何保证消息不丢失?
1.生产者 确保我们的生产者将消息投递到MQ成功;
消息确认机制(生产者将消息投递到MQ,MQ告诉生产者投递消息结果)
// 开启生产确认消息投递机制
channel.confirmSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
if (channel.waitForConfirms()) {
System.out.println(“生产者发送消息成功:” + msg);
}
如果开启了消息持久化的机制,必须消息持久化成功才会应答给生产者
2.消费者 确保消费者消费消息成功 采用手动ack确认
3.MQ服务器端 需要将数据持久化到硬盘
其他情况下:硬盘坏了、持久化的过程断电了
如何解决:最好通过表记录每次生产者投递的消息,如果长期没有被消费,手动的补偿消费。
如果在生产者投递消息失败的情况,在哪些场景?
1.MQ挂了
2.MQ拒绝接受消息(队列满了)
生产环境中投递消息失败就日志表记录下、采用手动补偿即可。
6 如何开启Rabbitmq持久化功能
B.Rabbitmq如何开启持久化的功能
1.默认的情况下mq服务器端(控制台)创建队列和交换机都是持久化的;
2.如果是代码创建的话,代码中设置 durable为true
控制台配置参数:
durable是否持久化 durable为持久化、Transient不持久化(rabbitmq重启队列消失)
autoDelete是否自动删除 当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
7 Rabbitmq发布订阅的实现原理
RabitMQ发布订阅
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列对应多个不同的消费者。
核心思想:
一个生产者投递消息,可以被多个不同的队列实现消费;
实现原理:
多个不同的队列绑定相同交换机,生产者只需要将消息投递到交换机之后,再由交换机将消息转发到所有绑定的队列实现消费。
8 Rabbitmq实现发布订阅功能
控制台创建Virtual host下交换机fanout_exchange、队列consumerFanout_email、队列consumerFanout_sms
public class ExProducer {
private static final String QUEUE_NAME = "mayikt";
/**
* 定义交换机的名称
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
System.out.println("生产者启动成功..");
// 1.创建连接
Connection connection = RabitMQConnection.getConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 不需要关心队列,只关注交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
String msg = "生产者群发消息";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
channel.close();
connection.close();
// 如果交换机没有绑定队列,消息可能丢失
}
}
public class SmsConsumer {
/**
* 定义短信队列
*/
private static final String QUEUE_NAME = "consumerFanout_sms";
/**
* 定义交换机的名称
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者...");
// 创建我们的连接
Connection connection = RabitMQConnection.getConnection();
// 创建我们通道
final Channel channel = connection.createChannel();
// 关联队列消费者关联队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者获取消息:" + msg);
}
};
// 开始监听消息 自动签收
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
public class EmailConsumer {
/**
* 定义短信队列
*/
private static final String QUEUE_NAME = "consumerFanout_email";
/**
* 定义交换机的名称
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者...");
// 创建我们的连接
Connection connection = RabitMQConnection.getConnection();
// 创建我们通道
final Channel channel = connection.createChannel();
// 关联队列消费者关联队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("邮件消费者获取消息:" + msg);
}
};
// 开始监听消息 自动签收
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
运行结果:
————————————————
版权声明:本文为CSDN博主「竞风之翼」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u012425860/article/details/113998176