searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

springboot对接rabbitmq(topic)

2024-03-28 09:29:34
3
0

topic主题交换器,通过路由key模糊匹配

				/---------------\									 /------------\
producer ----->	|				| ------topic.info.routing.key-----> | info queue | -------channel-------> consumer
				|				| 									 \------------/
				|				| 
				|				| 								      /-------------\
producer -----> |	 exchange	| ------topic.error.routing.key-----> | error queue | -------channel-------> consumer
				|				| 									  \-------------/
				|				| 
				|				|                                       /---------------\
producer -----> |				| ------topic.warning.routing.key-----> | warning queue | ------channel-----> consumer
				\---------------/										\---------------/

一、maven引用

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

二、配置文件

# 自定义配置应用于topic交换器
mq:
  config:
   #自定义交换器名称
     exchange: log.topic
     queue:
        #自定义error、info、all队列名称
        errorName: topic.error.log
        infoName: topic.info.log
        allName: topic.all.log
        #自定义error、info、all路由键的名称
        routingInfoKey: topic.info.routing.key
        routingErrorKey: topic.error.routing.key

三、生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Sender {
    /**
     * spring整合的操作类
     * Message 发送的消息对象
     * void send(Message var1) throws AmqpException;
     * <p>
     * var1 路由键 Message 发送的消息对象
     * void send(String var1, Message var2) throws AmqpException;
     * <p>
     * var1 指定交换器名称 var2 路由键 Message 发送的消息对象
     * void send(String var1, String var2, Message var3) throws AmqpException;
     *
     * convertAndSend() 方法不需要指定MessageProperties属性即可发布
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${mq.config.queue.routingInfoKey}")
    private String routingInfoKey;
    @Value("${mq.config.queue.routingErrorKey}")
    private String routingErrorKey;
    @Value("${mq.config.exchange}")
    private String exchange;

    public void send(String msg) {
        //需要指定交换器和路由键就可以转发
        rabbitTemplate.convertAndSend(exchange, routingInfoKey, "info+"+msg);
        rabbitTemplate.convertAndSend(exchange, routingErrorKey,"error+"+ msg);
        rabbitTemplate.convertAndSend(exchange, "topic.order.routing.key","order+"+ msg);
        rabbitTemplate.convertAndSend(exchange, "topic.warn.routing.key", "warn+"+msg);
    }

}

四、消费者

 

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 模糊匹配所有的数据队列,注意在配置路由key的时候是*代表阶段的配置,.不在匹配范围内
 * @RabbitListener 自定义监听事件
 * @QueueBinding 绑定交换器与队列的关系value 指定队列exchange指定交换器
 * value= @Queue 指定配置队列的信息 value队列名称 autoDelete是否是临时队列
 * exchange= @Exchange 指定交换器 value指定交换器名称 type交换器类型
 * key  指定路由键
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        value = "${mq.config.queue.allName}", autoDelete = "true"
                ),
                exchange = @Exchange(
                        value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.*.routing.*")
)
public class AllReceiver {

    /**
     * 设置监听方法
     *
     * @param msg
     * @RabbitHandler 声明监听方法是下面的 isDefault属性是默认false接受的完整对象,true接受body体
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:all {}", msg);
    }
}
package com.niu.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * error的消费端
 * @RabbitListener 自定义监听事件
 * @QueueBinding 绑定交换器与队列的关系value 指定队列exchange指定交换器
 * value= @Queue 指定配置队列的信息 value队列名称 autoDelete是否是临时队列
 * exchange= @Exchange 指定交换器 value指定交换器名称 type交换器类型
 * key  指定路由键
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.errorName}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "${mq.config.queue.routingErrorKey}")
)
public class ErrorReceiver {

    /**
     * 设置监听方法
     *  @RabbitHandler 声明监听方法是下面的 isDefault属性是默认false接受的完整对象,true接受body体
     *
     * @param msg
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:error {}", msg);
    }
}
package com.niu.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * info的消费端
 * @RabbitListener 自定义监听事件
 * @QueueBinding 绑定交换器与队列的关系value 指定队列exchange指定交换器
 * value= @Queue 指定配置队列的信息 value队列名称 autoDelete是否是临时队列
 * exchange= @Exchange 指定交换器 value指定交换器名称 type交换器类型
 * key  指定路由键
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        value = "${mq.config.queue.infoName}", autoDelete = "true"
                ),
                exchange = @Exchange(
                        value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "${mq.config.queue.routingInfoKey}")
)
public class InfoReceiver {

    /**
     * 设置监听方法
     *
     * @param msg
     * @RabbitHandler 声明监听方法是下面的 isDefault属性是默认false接受的完整对象,true接受body体
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:info {}", msg);
    }
}
0条评论
0 / 1000
cactusii
15文章数
0粉丝数
cactusii
15 文章 | 0 粉丝
原创

springboot对接rabbitmq(topic)

2024-03-28 09:29:34
3
0

topic主题交换器,通过路由key模糊匹配

				/---------------\									 /------------\
producer ----->	|				| ------topic.info.routing.key-----> | info queue | -------channel-------> consumer
				|				| 									 \------------/
				|				| 
				|				| 								      /-------------\
producer -----> |	 exchange	| ------topic.error.routing.key-----> | error queue | -------channel-------> consumer
				|				| 									  \-------------/
				|				| 
				|				|                                       /---------------\
producer -----> |				| ------topic.warning.routing.key-----> | warning queue | ------channel-----> consumer
				\---------------/										\---------------/

一、maven引用

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

二、配置文件

# 自定义配置应用于topic交换器
mq:
  config:
   #自定义交换器名称
     exchange: log.topic
     queue:
        #自定义error、info、all队列名称
        errorName: topic.error.log
        infoName: topic.info.log
        allName: topic.all.log
        #自定义error、info、all路由键的名称
        routingInfoKey: topic.info.routing.key
        routingErrorKey: topic.error.routing.key

三、生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Sender {
    /**
     * spring整合的操作类
     * Message 发送的消息对象
     * void send(Message var1) throws AmqpException;
     * <p>
     * var1 路由键 Message 发送的消息对象
     * void send(String var1, Message var2) throws AmqpException;
     * <p>
     * var1 指定交换器名称 var2 路由键 Message 发送的消息对象
     * void send(String var1, String var2, Message var3) throws AmqpException;
     *
     * convertAndSend() 方法不需要指定MessageProperties属性即可发布
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${mq.config.queue.routingInfoKey}")
    private String routingInfoKey;
    @Value("${mq.config.queue.routingErrorKey}")
    private String routingErrorKey;
    @Value("${mq.config.exchange}")
    private String exchange;

    public void send(String msg) {
        //需要指定交换器和路由键就可以转发
        rabbitTemplate.convertAndSend(exchange, routingInfoKey, "info+"+msg);
        rabbitTemplate.convertAndSend(exchange, routingErrorKey,"error+"+ msg);
        rabbitTemplate.convertAndSend(exchange, "topic.order.routing.key","order+"+ msg);
        rabbitTemplate.convertAndSend(exchange, "topic.warn.routing.key", "warn+"+msg);
    }

}

四、消费者

 

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 模糊匹配所有的数据队列,注意在配置路由key的时候是*代表阶段的配置,.不在匹配范围内
 * @RabbitListener 自定义监听事件
 * @QueueBinding 绑定交换器与队列的关系value 指定队列exchange指定交换器
 * value= @Queue 指定配置队列的信息 value队列名称 autoDelete是否是临时队列
 * exchange= @Exchange 指定交换器 value指定交换器名称 type交换器类型
 * key  指定路由键
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        value = "${mq.config.queue.allName}", autoDelete = "true"
                ),
                exchange = @Exchange(
                        value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.*.routing.*")
)
public class AllReceiver {

    /**
     * 设置监听方法
     *
     * @param msg
     * @RabbitHandler 声明监听方法是下面的 isDefault属性是默认false接受的完整对象,true接受body体
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:all {}", msg);
    }
}
package com.niu.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * error的消费端
 * @RabbitListener 自定义监听事件
 * @QueueBinding 绑定交换器与队列的关系value 指定队列exchange指定交换器
 * value= @Queue 指定配置队列的信息 value队列名称 autoDelete是否是临时队列
 * exchange= @Exchange 指定交换器 value指定交换器名称 type交换器类型
 * key  指定路由键
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.errorName}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "${mq.config.queue.routingErrorKey}")
)
public class ErrorReceiver {

    /**
     * 设置监听方法
     *  @RabbitHandler 声明监听方法是下面的 isDefault属性是默认false接受的完整对象,true接受body体
     *
     * @param msg
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:error {}", msg);
    }
}
package com.niu.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * info的消费端
 * @RabbitListener 自定义监听事件
 * @QueueBinding 绑定交换器与队列的关系value 指定队列exchange指定交换器
 * value= @Queue 指定配置队列的信息 value队列名称 autoDelete是否是临时队列
 * exchange= @Exchange 指定交换器 value指定交换器名称 type交换器类型
 * key  指定路由键
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        value = "${mq.config.queue.infoName}", autoDelete = "true"
                ),
                exchange = @Exchange(
                        value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "${mq.config.queue.routingInfoKey}")
)
public class InfoReceiver {

    /**
     * 设置监听方法
     *
     * @param msg
     * @RabbitHandler 声明监听方法是下面的 isDefault属性是默认false接受的完整对象,true接受body体
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:info {}", msg);
    }
}
文章来自个人专栏
灾备平台
15 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0