searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享

微服务架构-分布式消息中间件-082:RabbitMQ消息确认机制&公平队列&发布订阅实现原理

2022-06-30 07:55:49
25
0

082:RabbitMQ消息确认机制&公平队列&发布订阅实现原理

1 Rabbitmq上节课内容简单回顾

2 传统的队列存在那些缺陷

3 Rabbitmq消费者如何实现手动ack

4 Rabbitmq如何实现公平队列

5 如何保证消息中间件消息不丢失

6 如何开启Rabbitmq持久化功能

7 Rabbitmq发布订阅的实现原理

8 Rabbitmq实现发布订阅功能

1 Rabbitmq上节课内容简单回顾

课题内容
1.RabbitMQ如何保证消息不丢失?
2.RabbitMQ消息确认机制原理
3.RabbitMQ工作队列的原理
4.RabbitMQ发布订阅实现的原理

2 传统的队列存在那些缺陷

如果多个消费者消费能力不一样,均摊消费不公平。

在这里插入图片描述

如何实现公平消费:采用手动ack机制

3 Rabbitmq消费者如何实现手动ack

主动拉取:消费者和MQ服务器端第一次建立连接的时候;
主动推送:消费者已经和MQ服务端保持长连接了,只要生产者投递消息,MQ服务端会立即将消息转发给消费者。

RabbitMQ如何保证消息不丢失 使用消息确认机制+持久化技术

A.消费者确认收到消息机制

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

注:第二个参数值为false,代表关闭RabbitMQ的自动应答机制,改为手动应答。

在处理完消息时,返回应答状态,true表示为自动应答模式。

channel.basicAck(envelope.getDeliveryTag(), false);

4 Rabbitmq如何实现公平队列

公平队列(工作队列)实现原理:

Mq服务器端每次只会给消费者发送一条消息,如果消费者没有返回ack,就不会继续发送消息。如果消费者能够非常快速的告诉给MQ ack,说明该消费者处理时间更快。

在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息,必须手动ack确认之后才会继续发送。

channel.basicQos(1);

public class Consumer {

    private static final String QUEUE_NAME = "mayikt";

    // 业务操作等待时间

    private static int time = 2000;

 

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1.创建连接

        Connection connection = RabitMQConnection.getConnection();

        // 2.创建通道

        Channel channel = connection.createChannel();

        // MQ每次只会给消费者发送一条消息,必须返回ack之后才会继续发送消息给消费者

        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            @Override

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");

                System.out.println("消费消息msg:" + msg);

                try {

                    Thread.sleep(time);

                } catch (Exception e) {

 

                }

                // 手动发送消息告诉给mq服务器端  从队列删除该消息

                channel.basicAck(envelope.getDeliveryTag(), false);

            }

        };

        // 3.创建监听的消息 true表示自动签收,false表示必须手动ack

        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }

 

}

运行结果:

在这里插入图片描述

 

5 如何保证消息中间件消息不丢失

如何保证消息不丢失?

     1.生产者 确保我们的生产者将消息投递到MQ成功;

消息确认机制(生产者将消息投递到MQ,MQ告诉生产者投递消息结果)

// 开启生产确认消息投递机制

channel.confirmSelect();

channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

if (channel.waitForConfirms()) {

System.out.println(“生产者发送消息成功:” + msg);

}

如果开启了消息持久化的机制,必须消息持久化成功才会应答给生产者

     2.消费者 确保消费者消费消息成功 采用手动ack确认

     3.MQ服务器端 需要将数据持久化到硬盘

其他情况下:硬盘坏了、持久化的过程断电了
如何解决:最好通过表记录每次生产者投递的消息,如果长期没有被消费,手动的补偿消费。

如果在生产者投递消息失败的情况,在哪些场景?
1.MQ挂了
2.MQ拒绝接受消息(队列满了)
生产环境中投递消息失败就日志表记录下、采用手动补偿即可。

6 如何开启Rabbitmq持久化功能

B.Rabbitmq如何开启持久化的功能
1.默认的情况下mq服务器端(控制台)创建队列和交换机都是持久化的;
2.如果是代码创建的话,代码中设置 durable为true

控制台配置参数:

durable是否持久化 durable为持久化、Transient不持久化(rabbitmq重启队列消失)

autoDelete是否自动删除 当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除

7 Rabbitmq发布订阅的实现原理

RabitMQ发布订阅
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列对应多个不同的消费者。

在这里插入图片描述

核心思想:
一个生产者投递消息,可以被多个不同的队列实现消费;
实现原理:
多个不同的队列绑定相同交换机,生产者只需要将消息投递到交换机之后,再由交换机将消息转发到所有绑定的队列实现消费。

8 Rabbitmq实现发布订阅功能

在这里插入图片描述

控制台创建Virtual host下交换机fanout_exchange、队列consumerFanout_email、队列consumerFanout_sms

public class ExProducer {

    private static final String QUEUE_NAME = "mayikt";

    /**

     * 定义交换机的名称

     */

    private static final String EXCHANGE_NAME = "fanout_exchange";

 

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        System.out.println("生产者启动成功..");

        // 1.创建连接

        Connection connection = RabitMQConnection.getConnection();

        // 2.创建通道

        Channel channel = connection.createChannel();

        // 不需要关心队列,只关注交换机

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);

        String msg = "生产者群发消息";

        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());

        channel.close();

        connection.close();

        // 如果交换机没有绑定队列,消息可能丢失

    }

 

}

public class SmsConsumer {

    /**

     * 定义短信队列

     */

    private static final String QUEUE_NAME = "consumerFanout_sms";

    /**

     * 定义交换机的名称

     */

    private static final String EXCHANGE_NAME = "fanout_exchange";

 

    public static void main(String[] args) throws IOException, TimeoutException {

        System.out.println("短信消费者...");

        // 创建我们的连接

        Connection connection = RabitMQConnection.getConnection();

        // 创建我们通道

        final Channel channel = connection.createChannel();

        // 关联队列消费者关联队列

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            @Override

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");

                System.out.println("短信消费者获取消息:" + msg);

            }

        };

        // 开始监听消息 自动签收

        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

 

    }

}

public class EmailConsumer {

    /**

     * 定义短信队列

     */

    private static final String QUEUE_NAME = "consumerFanout_email";

    /**

     * 定义交换机的名称

     */

    private static final String EXCHANGE_NAME = "fanout_exchange";

 

    public static void main(String[] args) throws IOException, TimeoutException {

        System.out.println("邮件消费者...");

        // 创建我们的连接

        Connection connection = RabitMQConnection.getConnection();

        // 创建我们通道

        final Channel channel = connection.createChannel();

        // 关联队列消费者关联队列

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            @Override

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");

                System.out.println("邮件消费者获取消息:" + msg);

            }

        };

        // 开始监听消息 自动签收

        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

 

    }

}

运行结果:

在这里插入图片描述

————————————————

版权声明:本文为CSDN博主「竞风之翼」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

 

原文链接:https://blog.csdn.net/u012425860/article/details/113998176

 

0条评论
0 / 1000
天翼云文档找茬小助手
41文章数
7粉丝数
天翼云文档找茬小助手
41 文章 | 7 粉丝

微服务架构-分布式消息中间件-082:RabbitMQ消息确认机制&公平队列&发布订阅实现原理

2022-06-30 07:55:49
25
0

082:RabbitMQ消息确认机制&公平队列&发布订阅实现原理

1 Rabbitmq上节课内容简单回顾

2 传统的队列存在那些缺陷

3 Rabbitmq消费者如何实现手动ack

4 Rabbitmq如何实现公平队列

5 如何保证消息中间件消息不丢失

6 如何开启Rabbitmq持久化功能

7 Rabbitmq发布订阅的实现原理

8 Rabbitmq实现发布订阅功能

1 Rabbitmq上节课内容简单回顾

课题内容
1.RabbitMQ如何保证消息不丢失?
2.RabbitMQ消息确认机制原理
3.RabbitMQ工作队列的原理
4.RabbitMQ发布订阅实现的原理

2 传统的队列存在那些缺陷

如果多个消费者消费能力不一样,均摊消费不公平。

在这里插入图片描述

如何实现公平消费:采用手动ack机制

3 Rabbitmq消费者如何实现手动ack

主动拉取:消费者和MQ服务器端第一次建立连接的时候;
主动推送:消费者已经和MQ服务端保持长连接了,只要生产者投递消息,MQ服务端会立即将消息转发给消费者。

RabbitMQ如何保证消息不丢失 使用消息确认机制+持久化技术

A.消费者确认收到消息机制

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

注:第二个参数值为false,代表关闭RabbitMQ的自动应答机制,改为手动应答。

在处理完消息时,返回应答状态,true表示为自动应答模式。

channel.basicAck(envelope.getDeliveryTag(), false);

4 Rabbitmq如何实现公平队列

公平队列(工作队列)实现原理:

Mq服务器端每次只会给消费者发送一条消息,如果消费者没有返回ack,就不会继续发送消息。如果消费者能够非常快速的告诉给MQ ack,说明该消费者处理时间更快。

在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息,必须手动ack确认之后才会继续发送。

channel.basicQos(1);

public class Consumer {

    private static final String QUEUE_NAME = "mayikt";

    // 业务操作等待时间

    private static int time = 2000;

 

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1.创建连接

        Connection connection = RabitMQConnection.getConnection();

        // 2.创建通道

        Channel channel = connection.createChannel();

        // MQ每次只会给消费者发送一条消息,必须返回ack之后才会继续发送消息给消费者

        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            @Override

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");

                System.out.println("消费消息msg:" + msg);

                try {

                    Thread.sleep(time);

                } catch (Exception e) {

 

                }

                // 手动发送消息告诉给mq服务器端  从队列删除该消息

                channel.basicAck(envelope.getDeliveryTag(), false);

            }

        };

        // 3.创建监听的消息 true表示自动签收,false表示必须手动ack

        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }

 

}

运行结果:

在这里插入图片描述

 

5 如何保证消息中间件消息不丢失

如何保证消息不丢失?

     1.生产者 确保我们的生产者将消息投递到MQ成功;

消息确认机制(生产者将消息投递到MQ,MQ告诉生产者投递消息结果)

// 开启生产确认消息投递机制

channel.confirmSelect();

channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

if (channel.waitForConfirms()) {

System.out.println(“生产者发送消息成功:” + msg);

}

如果开启了消息持久化的机制,必须消息持久化成功才会应答给生产者

     2.消费者 确保消费者消费消息成功 采用手动ack确认

     3.MQ服务器端 需要将数据持久化到硬盘

其他情况下:硬盘坏了、持久化的过程断电了
如何解决:最好通过表记录每次生产者投递的消息,如果长期没有被消费,手动的补偿消费。

如果在生产者投递消息失败的情况,在哪些场景?
1.MQ挂了
2.MQ拒绝接受消息(队列满了)
生产环境中投递消息失败就日志表记录下、采用手动补偿即可。

6 如何开启Rabbitmq持久化功能

B.Rabbitmq如何开启持久化的功能
1.默认的情况下mq服务器端(控制台)创建队列和交换机都是持久化的;
2.如果是代码创建的话,代码中设置 durable为true

控制台配置参数:

durable是否持久化 durable为持久化、Transient不持久化(rabbitmq重启队列消失)

autoDelete是否自动删除 当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除

7 Rabbitmq发布订阅的实现原理

RabitMQ发布订阅
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列对应多个不同的消费者。

在这里插入图片描述

核心思想:
一个生产者投递消息,可以被多个不同的队列实现消费;
实现原理:
多个不同的队列绑定相同交换机,生产者只需要将消息投递到交换机之后,再由交换机将消息转发到所有绑定的队列实现消费。

8 Rabbitmq实现发布订阅功能

在这里插入图片描述

控制台创建Virtual host下交换机fanout_exchange、队列consumerFanout_email、队列consumerFanout_sms

public class ExProducer {

    private static final String QUEUE_NAME = "mayikt";

    /**

     * 定义交换机的名称

     */

    private static final String EXCHANGE_NAME = "fanout_exchange";

 

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        System.out.println("生产者启动成功..");

        // 1.创建连接

        Connection connection = RabitMQConnection.getConnection();

        // 2.创建通道

        Channel channel = connection.createChannel();

        // 不需要关心队列,只关注交换机

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);

        String msg = "生产者群发消息";

        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());

        channel.close();

        connection.close();

        // 如果交换机没有绑定队列,消息可能丢失

    }

 

}

public class SmsConsumer {

    /**

     * 定义短信队列

     */

    private static final String QUEUE_NAME = "consumerFanout_sms";

    /**

     * 定义交换机的名称

     */

    private static final String EXCHANGE_NAME = "fanout_exchange";

 

    public static void main(String[] args) throws IOException, TimeoutException {

        System.out.println("短信消费者...");

        // 创建我们的连接

        Connection connection = RabitMQConnection.getConnection();

        // 创建我们通道

        final Channel channel = connection.createChannel();

        // 关联队列消费者关联队列

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            @Override

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");

                System.out.println("短信消费者获取消息:" + msg);

            }

        };

        // 开始监听消息 自动签收

        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

 

    }

}

public class EmailConsumer {

    /**

     * 定义短信队列

     */

    private static final String QUEUE_NAME = "consumerFanout_email";

    /**

     * 定义交换机的名称

     */

    private static final String EXCHANGE_NAME = "fanout_exchange";

 

    public static void main(String[] args) throws IOException, TimeoutException {

        System.out.println("邮件消费者...");

        // 创建我们的连接

        Connection connection = RabitMQConnection.getConnection();

        // 创建我们通道

        final Channel channel = connection.createChannel();

        // 关联队列消费者关联队列

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            @Override

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");

                System.out.println("邮件消费者获取消息:" + msg);

            }

        };

        // 开始监听消息 自动签收

        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

 

    }

}

运行结果:

在这里插入图片描述

————————————————

版权声明:本文为CSDN博主「竞风之翼」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

 

原文链接:https://blog.csdn.net/u012425860/article/details/113998176

 

文章来自个人专栏
云知识的搬运工
224 文章 | 7 订阅
0条评论
0 / 1000
请输入你的评论
0
0