1. 创建springboot项目
2. 引入依赖
<!--引入RabbitMQ消息依赖包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3. 基础配置
3.1 application.properties
spring:
rabbitmq:
port:5672
host:192.168.91.128
username:lisi
password:lisi
3.2 启动类 Application
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
// Send send = (Send) context.getBean("send");
// ProducerWork send = (ProducerWork) context.getBean("producerWork");
// ProducerPS send = (ProducerPS) context.getBean("producerPS");
ProducerRout send =(ProducerRout) context.getBean("producerRout");
// ProducerTop send = (ProducerTop) context.getBean("producerTop");
send.send();
}
}
3.3 RabbitConfig 配置类
/*
* 配置类
* */
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("queue");
}
@Bean
public Queue queue2() {
return new Queue("springboot-queue");
}
/*
* 创建路由
* */
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
/*
* 创建三个队列,分别是 A,B,C
* */
@Bean
public Queue amessage() {
return new Queue("queueA");
}
@Bean
public Queue bmessage() {
return new Queue("queueB");
}
@Bean
public Queue cmessage() {
return new Queue("queueC");
}
/*
* 路由绑定队列
* */
@Bean
public Binding bindingExchangeA(Queue amessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(amessage).to(fanoutExchange);
}
@Bean
public Binding bindingExchangeB(Queue bmessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(bmessage).to(fanoutExchange);
}
@Bean
public Binding bindingExchangeC(Queue cmessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(cmessage).to(fanoutExchange);
}
@Bean
public Binding bindingExchangeA(Queue amessage, DirectExchange directExchange) {
return BindingBuilder.bind(amessage).to(directExchange).with("key");
}
@Bean
public Binding bindingExchangeB(Queue bmessage, DirectExchange directExchange) {
return BindingBuilder.bind(bmessage).to(directExchange).with("key123");
}
@Bean
public Binding bindingExchangeC(Queue cmessage, DirectExchange directExchange) {
return BindingBuilder.bind(cmessage).to(directExchange).with("key");
}
@Bean
public Binding bindingExchangeA(Queue amessage, TopicExchange topicExchange) {
return BindingBuilder.bind(amessage).to(topicExchange).with("key.*");
}
@Bean
public Binding bindingExchangeB(Queue bmessage, TopicExchange topicExchange) {
return BindingBuilder.bind(bmessage).to(topicExchange).with("*.key");
}
@Bean
public Binding bindingExchangeC(Queue cmessage, TopicExchange topicExchange) {
return BindingBuilder.bind(cmessage).to(topicExchange).with("key.#");
}
}
3.4 生产者类 Product
public class Product implements Serializable {
private Long id;
private String productName;
private Integer status;
private Double price;
private String productDesc;
private String caption;
public Product() {
}
public Product(Long id, String productName) {
this.id = id;
this.productName = productName;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
public String getProductDesc() {
return productDesc;
}
public void setProductDesc(String productDesc) {
this.productDesc = productDesc;
}
public String getCaption() {
return caption;
}
public void setCaption(String caption) {
this.caption = caption;
}
@Override
public String toString() {
return "Product{" +
"id=" + id +
"productName=" + productName +
'}';
}
}
4. 一对一发送(简单队列)
4.1 生产者
@Component
public class Send {
@Autowired
RabbitTemplate rabbitTemplate;
public void send() {
rabbitTemplate.convertAndSend("queue", "hello springboot and rabbit");
}
}
4.2 消费者
@Component
@RabbitListener(queues = "queue")
public class Recv {
@RabbitHandler
public void consume(String message){
System.out.println("message:"+message);
}
}
5. 一对多发送(工作队列)
5.1 生产者
@Component
public class ProducerWork {
@Autowired
AmqpTemplate amqpTemplate;
public void send() {
for (int i = 0; i < 20; i++) {
amqpTemplate.convertAndSend("springboot-queue", new Product(111l, "欧莱雅"));
}
}
}
5.2 消费者1
@Component
@RabbitListener(queues = "springboot-queue")
public class Comsuerw1 {
@RabbitHandler
public void consu(Product product) {
System.out.println("Comsuerw1 message:" + product);
}
}
5.3 消费者2
@Component
@RabbitListener(queues = "springboot-queue")
public class Comsuerw2 {
@RabbitHandler
public void consu(Product product) {
System.out.println("Comsuerw2 message:" + product);
}
}
6. Fanout Exchange(发布/订阅)
6.1 生产者
@Component
public class ProducerPS {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
//向fanoutExchange交换机发送消息,绑定的routing_key写任何字符都会被忽略(第二个参无效)
amqpTemplate.convertAndSend("fanoutExchange", "", "this is a message");
}
}
6.2 消费者1
@Component
@RabbitListener(queues = "queueA")
public class Comsuer1 {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("Comsuer1:" + msg);
}
}
6.3 消费者2
@Component
@RabbitListener(queues = "queueB")
public class Comsuer2 {
@RabbitHandler
public void process(String msg)throws Exception{
System.out.println("Comsuer2:" + msg);
}
}
6.4 消费者3
@Component
@RabbitListener(queues = "queueC")
public class Comsuer3 {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("Comsuer3:" + msg);
}
}
7. Direct Exchange(路由)
7.1 生产者
@Component
public class ProducerRout {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
//向fanoutExchange交换机发送消息,绑定的routing_key写任何字符都会被忽略(第二个参无效)
amqpTemplate.convertAndSend("directExchange","key","this is a message rout");
}
}
7.2 消费者1
@Component
@RabbitListener(queues = "queueA")
public class ComsuerRout {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("queueA:" + msg);
}
}
7.3 消费者2
@Component
@RabbitListener(queues = "queueB")
public class ComsuerRout2 {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("queueB:" + msg);
}
}
7.4 消费者3
@Component
@RabbitListener(queues = "queueC")
public class ComsuerRout3 {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("queueC:" + msg);
}
}
8. Topic Exchange(通匹符)
8.1 生产者
@Component
public class ProducerTop {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
//向fanoutExchange交换机发送消息,绑定的routing_key写任何字符都会被忽略(第二个参无效)
amqpTemplate.convertAndSend("topicExchange", "key.1.1.1", "this is a message top");
}
}
8.2 消费者1
@Component
@RabbitListener(queues = "queueA")
public class ComsuerTop {
@RabbitHandler
public void process(String msg)throws Exception{
System.out.println("ComsuerTop-queueA:" + msg);
}
}
8.3 消费者2
@Component
@RabbitListener(queues = "queueB")
public class ComsuerTop2 {
@RabbitHandler
public void process(String msg)throws Exception{
System.out.println("ComsuerTop-queueB:" + msg);
}
}
8.4 消费者3
@Component
@RabbitListener(queues = "queueC")
public class ComsuerTop3 {
@RabbitHandler
public void process(String msg)throws Exception{
System.out.println("ComsuerTop-queueC:" + msg);
}
}