消息队列
消息(Message)是指在应用间传送的数据,带有某种信息的信号。消息机制的三大要点:消息队列、消息循环(分发)、消息处理。
消息队列(Message Queue,MQ)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到MQ中,消息消费者只管从MQ中取消息。发布者和消费者没有同步接口调用逻辑,即所谓解耦。
作用 & 优势:
业务解耦、异步(将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度)、削峰填谷(平滑消费消息)、流量控制、广播消息、最终一致性。
缺点:
- 降低系统可用性,增加系统组件;
- 增加系统复杂,比如:集群部署时的一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输;
消息队列的架构图
消息队列从逻辑角度而言,可以分为四大模块:生产者模块、存储模块、消费者模块和配置管理模块,每个模块均可以集群方式部署。
生产者模块:主要负责接收上游系统发送过来的数据。解决如何处理大量的连接请求,并且快速的处理发送过来的数据,将数据以适当的形式进行组装,然后发送给存储模块。
存储模块:主要负责存储生产者模块发送过来的数据,然后将存储的数据传递给消费者模块。
该模块需要解决如何序列化数据并存储,以及如何读取序列化的数据,该模块的性能对消息队列整体的性能影响较大。一方面,如果序列化及存储性能很高,那么接收数据的性能也会很高。另一方面,如果读取序列化数据的性能很高,那么发送的性能也会很高,所以,该模块决定消息队列的服务能力。
消费者模块:主要负责消费存储模块的数据。该模块主要解决的是消费逻辑,比如:是广播消费还是集群消费等,也可以简单理解是广播还是单播。
Config Server:主要作为注册中心,存储组件的元数据,以及维护发送和消费关系等。
如果进一步将各个模块分解的话,可以发现每一个模块都包含若干子模块。现分解如下:
生产者模块:主要有两个部分构成。一个是接收上游的数据;另一个是协议处理,负责将接收到的数据根据约定的协议进行解析,然后传递给存储模块。
存储模块:主要包括,数据接收处理、数据标识生成、数据写入和数据读取。该部分主要解决数据如何序列化及如何存储。
消费者模块:主要解决消费的逻辑处理,如:广播模式、集群模式和分组管理等。
消息中间件
消息中间件,Message Oriented Middleware,MOM,已知开源的有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ等,基本上大同小异,但是各自有着不同的应用场景和特点;Kafka注重的是消息的吞吐量,不保证消息存储的可靠性以及一致性,因此多用于日志系统数据的上报; RabbitMQ能保证消息可靠存储投递。
RabbitMQ
RabbitMQ,Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息。特点:
- 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发送应答、发布确认。
- 灵活路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由/转发消息的。对于典型的路由功能,RabbitMQ 已经提供一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定/组合在一起,也通过插件机制实现自己的 Exchange 。
- 消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
- 高可用(Highly Available Queues) 支持跨机器集群,支持队列安全镜像备份,即队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
- 多种协议(Multi-protocol) 支持多种消息队列协议,比如 STOMP、MQTT 等。
- 多语言客户端(Many Clients)
- 管理界面(Management UI) 提供一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
- 扩展性:支持负载均衡,动态增减服务器简单方便。
- 跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供消息跟踪机制。
- 权限管理:灵活的用户角色权限管理,Virtual Host 是权限控制的最小粒度。
- 插件机制(Plugin System) 提供许多插件,来从多方面进行扩展,也可以编写自己的插件。
安装
Windows安装
需要先安装erlang开发环境,下载exe程序otp_win64_19.2.exe,一直next到死。
添加环境变量:ERLANG_HOME=D:\Program Files\erlx.x.x
(erlang安装路径);
更新PATH:%ERLANG_HOME%\bin;
验证ErLang安装情况,输入命令:erl
然后下载Rabbit-server-3.6.12.exe,一路Next即可完成安装,自带安装成service的脚本。
目录介绍:
- ebin:一些调用的erlang程序
- etc:配置文件
- include:依赖文件
- sbin:一些bat脚本,用来运行、控制、管理rabbitmq
- rabbitmq-server.bat 以应用方式启动rabbitmq;
- rabbitmq-service.bat 以服务方式启动rabbitmq;
- rabbitmqctl.bat,rabbitmq管理工具;
- rabbitmq-plugins.bat 扩展插件管理工具
- plugins:插件
执行命令:rabbitmq-plugins enable rabbitmq_management
安装管理插件。
安装完成之后,登录管理页面http://127.0.0.1:15672/,默认账号和密码:guest/guest;
C:/Users/<user>/AppData/Roaming/RabbitMQ/log/rabbit@mywin-PC.log
查看 RabbitMQ 的日志信息。
端口:
- 4369 (epmd)
- 5672, 5671 (AMQP 0-9-1 and 1.0 without and with TLS)
- 25672,这个端口用于Erlang分布节点间和CLI工具沟通,是在动态范围内分配的(默认情况下限制在一个单独的端口,计算方法:AMQP端口+20000)。
- 15672 (rabbitmq web管理工具插件)
- 61613, 61614 (STOMP 插件)
- 1883, 8883 (if MQTT 插件)
Linux安装
Mac 安装
brew install rabbitmq
Max 系统的 brew 自动其所依赖的 Erlang。
常用命令
rabbitmq-server
运行rabbitmq-server -detached
命令来重启服务并后台运行。
rabbitmqctl
rabbitmqctl
,提供几乎 RabbitMQ 管理所需的一站式解决方案:
rabbitmqctl status
:查询 RabbitMQ 服务器的状态信息,命令输出服务器信息:如 RabbitMQ 和 Erlang 的版本、OS 名称、内存等;
rabbitmqctl reset
:重置 RabbitMQ 节点;
rabbitmqctl start_app
:启动 RabbitMQ 应用程序;
rabbitmqctl list_queues
:查看已声明的队列;
rabbitmqctl list_exchanges name type durable auto_delete
:查看交换器,附加参数,比如列出交换器的名称、类型、是否持久化、是否自动删除;
rabbitmqctl list_bindings
:查看绑定;
背景知识
在Erlang 中有两个概念:节点和应用程序。节点就是 Erlang 虚拟机的每个实例,而多个 Erlang 应用程序可以运行在同一个节点之上。节点之间可以进行本地通信(不管是不是运行在同一台服务器之上)。比如一个运行在节点A上的应用程序可以调用节点B上应用程序的方法,就好像调用本地函数一样。如果应用程序由于某些原因奔溃,Erlang 节点会自动尝试重启应用程序。
rabbitmqctl -n rabbit@server.example.com stop
:关闭整个 RabbitMQ 节点可以用参数 stop。默认情况下,不指定 -n
选项时,它会和本地节点通信并指示其干净的关闭;指定 -n(node)
选项后,可以指定关闭远程节点,node 默认名称是 rabbit@server;
rabbitmqctl stopapp
:只想关闭应用程序,同时保持 Erlang 节点运行,该命令将清除所有的队列。
概念
ConnectionFactory连接管理器:用户与MQ建立的连接管理器;
Channel信道:消息推送使用的通道;
Exchange交换机:用于接收、分配消息;
Queue队列:用于存储生产者的消息;
RoutingKey路由键:用于把生产者的数据分配(路由)到交换机上;
BingDingKey绑定建:用于把交换器的消息绑定到队列中。
三种广播模式
fanout:凡事绑定到此交换机和队列都可以接受消息;
direct:通过路由键和交换机决定唯一的队列可以接受消息;
topic:所有符合路由键所绑定的队列都可以接受消息。
TTL
TTL是MQ中一个消息或者队列的属性,表明一条消息或者队列中所有消息或者队列的最大存活时间,单位是毫秒。如果一条消息设置了TTL属性,或者进入了设置TTL的队列,如果这条消息在TTL内的时间未被消费则该条消息则变成死信,如果配置了消息的TTL和队列的TTL则较小的那个值会被使用。
设置TTL的2种方式:
- 创建队列时设置队列的
x-message-ttl
属性:
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
- 针对每条消息设置TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
channel.basicPublish(exchangeName, routingKey, mandatory, builder.build(), "msg body".getBytes());
区别:
如果设置队列的TTL,如果消息过期则被队列丢弃;而第二种即使消息过期也不会马上被丢弃, 因为消息是否过期是在即将投递到消费者之前被判定的。此外,如果不设置TTL则表示消息永远不会过期,消息过期则变成死信。
延迟队列
用来存放需要在指定时间被处理的元素的队列,延迟队列中的元素都是带时间属性(TTL)的
发布模式
- 单对单:单发送、单接收;
- 单对多:一个发送端,多个接收端,如分布式的任务派发;
- 发布订阅模式:
- 按路由规则发送接收:
- 主题:Exchange Type 为 topic,发送消息时需要指定交换机及 Routing Key,消费者的消息队列绑定到该交换机并匹配到 Routing Key 实现消息的订阅,订阅后则可接收消息。只有消费者将队列绑定到该交换机且指定的 Routing Key 符合匹配规则,才能收到消息。Routing Key 可以设置成通配符,如:
*或 #
(*表示匹配 Routing Key 中的某个单词,# 表示任意的 Routing Key 的消息都能被收到)。如果 Routing Key 由多个单词组成,则单词之间用 .
来分隔。 - RPC(即远程存储调用)
命名规范:
交换机名的命名建议:Ex{AppID}.{自定义 ExchangeName},队列名的命名建议:MQ{AppID}.{自定义 QueueName} 。
客户端
Java 客户端
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
spring 集成 RabbitMQ
在常规的 spring 项目中添加配置文件 applicationContext-rabbitmq.xml:
<beans>
<!-- 连接工厂,提供channel,单点模式下只需配置rabbitmq server的host和port -->
<rabbit:connection-factory
id="connectionFactory"
addresses="${rabbitmq.addresses}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
publisher-confirms="true"
connection-timeout="1000"
cache-mode="CHANNEL"/>
<!-- 管理组件 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 存放消费失败消息的queue -->
<rabbit:queue name="alert.queue.dead"/>
<!-- 同步消息发送组件,使用jsonMessageConverter -->
<bean id="rabbitTemplate" class="com.demo.RabbitTemplate">
<constructor-arg ref="connectionFactory"/>
<property name="messageConverter" ref="jsonConverter"/>
<property name="retryTemplate" ref="retryTemplate"/>
<property name="recoveryCallback" ref="recoveryCallback"/>
</bean>
<!-- 异步消息发送组件,使用jsonMessageConverter -->
<bean id="asyncRabbitTemplate" class="com.demo.RabbitTemplate" lazy-init="true">
<constructor-arg ref="connectionFactory"/>
<property name="messageConverter" ref="jsonConverter"/>
<property name="retryTemplate" ref="retryTemplate"/>
<property name="recoveryCallback" ref="recoveryCallback"/>
<property name="sync" value="false"/>
<property name="taskExecutor" ref="rabbitProduceExecutor"/>
</bean>
<!-- 消息所有重发失败后的回调方法 -->
<bean id="recoveryCallback"
class="com.demo.RabbitRecoveryCallback"/>
<!-- 消费重试Advice -->
<bean id="retryAdvice" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
<property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
<property name="retryOperations" ref="retryTemplate"/>
</bean>
<bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>
<!-- 使用json进行消息序列化和反序列化 -->
<bean id="jsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!-- 重试策略 -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="50"/>
<property name="multiplier" value="10.0"/>
<property name="maxInterval" value="5000"/>
</bean>
</property>
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="4"/>
</bean>
</property>
</bean>
<!--为了获得尽可能好的性能,此处参数配置需要经过反复测试得到-->
<!-- 异步消息发送线程池 -->
<bean id="rabbitProduceExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
lazy-init="true">
<property name="corePoolSize" value="3"/>
<property name="maxPoolSize" value="50"/>
<property name="queueCapacity" value="1000"/>
<property name="keepAliveSeconds" value="300"/>
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/>
</property>
</bean>
</beans>
spring boot 集成 RabbitMQ
可靠性
参考RabbitMQ消息可靠性