炒鸡辣鸡说RabbitMQ
不求甚解
收取消息
@Service
@Slf4j
public class RabbitMqMessage {
private static final String EXCHANGE = "html";
private static final String CHECK_QUEUE = "check-queue";
@Autowired
private DoCheckTask doCheckTask;
@Bean
private Declarables declarables() {
Queue queue = new Queue(CHECK_QUEUE);
FanoutExchange exchange = new FanoutExchange(EXCHANGE);
return new Declarables(queue, exchange, BindingBuilder.bind(queue).to(exchange));
}
@RabbitListener(queues = CHECK_QUEUE)
public void receiverHtml(String urlContent) {
UrlContent urlContent1 = Json.toObject(urlContent, UrlContent.class);
doCheckTask.urlContentPriorityBlockingQueue.add(urlContent1);
}
}
发送消息
@Service
@Slf4j
@Configuration
public class RabbitMqMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 页面内容广播交换机
*/
private static final String HTML_EXCHANGE = "html";
@Bean
public Declarables declarables() {
FanoutExchange exchange = new FanoutExchange(HTML_EXCHANGE);
return new Declarables(exchange);
}
public void sendHtmlToMessageQueue(String urlContent) {
rabbitTemplate.convertAndSend(HTML_EXCHANGE, "", urlContent);
}
}
简单来说,我们只需要定义好消息队列的交换器和队列即可。交换器需要首先在你的管理页面中定义好。必不可少的是,我们在配置文件中定义好连接设置
spring.rabbitmq.host=
spring.rabbitmq.port=
spring.rabbitmq.username=
spring.rabbitmq.password=
当然,既然我们使用了消息队列的注解,必不可少的,需要在依赖中安装相关的包,这里使用maven来管理包,所以相关的依赖为
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
是不是很简单?如果你比较忙,只要能用就行,那么抄上面的代码稍微再修改一下,就能满足你的使用条件了。
还要干嘛?
再来看看这段代码,首先使用@Service注解,让springboot在启动的时候,spring容器会加载这个类,注册为一个服务,然后@Bean来绑定我们的队列和交换机。你可以暂时将消息队列想象成带有存储功能的三层路由器。三层路由器具有交换机的功能,通过计算机网络的知识我们知道,交换机的功能是限定在局域网中,那么使用同一个exchange都是同一个局域网的(虚拟局域网也是局域网),所以我们在定义哪些队列绑定哪些交换器的时候,也需要考虑业务消息的相关性,不能一股脑的全部放在同一个交换器下。
@Bean
private Declarables declarables() {
Queue queue = new Queue(CHECK_QUEUE); // 定义队列对象:从哪个队列中获取信息
FanoutExchange exchange = new FanoutExchange(EXCHANGE); // 定义交换器对象
return new Declarables(queue, exchange, BindingBuilder.bind(queue).to(exchange)); // 将两者绑定起来
}
这段代码,在项目启动的时候,@Bean会告诉spring容器去执行这段代码,并生成这样的Declarables,也就是绑定成功了。然后我们只需要定义监听这个队列的函数即可。
@RabbitListener(queues = CHECK_QUEUE)
public void receiverHtml(String urlContent) {
UrlContent urlContent1 = Json.toObject(urlContent, UrlContent.class);
doCheckTask.urlContentPriorityBlockingQueue.add(urlContent1);
}
这段代码就是绑定了监听队列CHECK_QUEUE,当监听到该队列中有数据时,receiverHtml方法就会被调用,然后数据被放到urlContent中。
发送消息部分也一样
@Bean
public Declarables declarables() {
FanoutExchange exchange = new FanoutExchange(HTML_EXCHANGE);
return new Declarables(exchange);
}
这里也是注册了一个交换器,为啥要在注册一次?如果你是在一个项目中,那么不需要再次声明该exchange。
public void sendHtmlToMessageQueue(String urlContent) {
rabbitTemplate.convertAndSend(HTML_EXCHANGE, "", urlContent);
}
发送消息到交换器,直接使用rabbitTemplate实现即可。
求甚解
了解交换器和队列
队列
AnonymousQueue:代表一个匿名的,非持久的,排他的,自动删除队列
Queue:默认的队列
交换器
DirectExchange:它会把消息路由到那些 BindingKey RoutingKey 完全匹配的队列中。
FanoutExchange:广播交换器:它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
TopicExchange:主题交换器
HeadersExchange:不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中 headers 属性进行匹配。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
CustomExchange:模糊匹配
AbstractExchange:抽象交换器,作为基础存在
如果队列特别多怎么办
集中控制,在官网的demo中,有一种方案是将全部队列和交换器注册在一起
一共四个文件,一个注册bean的文件叫RabbitMqConfig.java用于注册所有的Bean。然后一个ConstNames用于存放所有的名字。一个sender和一个receiver即可。
就像这样:
@Configuration
public class RabbitMqConfig {
@Bean
public Receiver receiver() {
return new Receiver();
}
@Bean
public DirectExchange exchange(){
return new DirectExchange(ConstNames.EXCHANGE);
}
@Bean
public Queue queue(){
return new Queue(ConstNames.QUEUE);
}
@Bean
public Binding bindingEvent(DirectExchange exchange,Queue queue){
return BindingBuilder.bind(queue).to(exchange).withQueueName();
}
@Bean
public xxxx xxxx(){
// 这里再定义一个Sender即可
}
}
// Receiver.java
public class Receiver {
@RabbitListener(queues = ConstNames.QUEUE)
public void receiveEvent(String in) {
System.out.println(in);
}
}
// 管理名字
public class ConstNames{
public static final String EXCHANGE = "exchange";
public static final String QUEUE = "queue";
}
这样就避免写大量重复的代码了,同时上面的内容也可以写个东西自动生成代码。