RabbitMQ07_SpringBoot与RabbitMQ的整合

x33g5p2x  于2021-12-19 转载在 其他  
字(6.8k)|赞(0)|评价(0)|浏览(737)

RabbitMQ07_SpringBoot与RabbitMQ的整合

  • 创建 Springboot 项目,选择 Spring WebSpring for RabbitMQ
  • Pom文件:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  • application.yml 配置文件:
spring:
  rabbitmq:
    host: 118.31.106.51
    port: 5672
    username: ems
    password: 123456
    virtual-host: /ems
  • 发送消息的测试类(注入 RabbitTemplate):
package com.blu.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.blu.RabbitmqSpringbootApplication;

@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@Test
	public void testHello() {
		rabbitTemplate.convertAndSend("hello","hello rabbitMQ!");
	}
	
	/** * work模型--公平消费 */
	@Test
	public void testWork() {
		for(int i=1;i<=20;i++) {
			rabbitTemplate.convertAndSend("work","work work!"+i);
		}
		
	}
	
	/** * fanout模型 参数1:交换机名称 参数2:routing key(无意义,填空字符串) */
	@Test
	public void testFanout() {
		rabbitTemplate.convertAndSend("logs","","fanout模型生产的消息");
	}
	
	/** * 订阅直连(Direct)模型,参数1:交换机名称,参数2:routing key */
	@Test
	public void testRoute() {
		rabbitTemplate.convertAndSend("directs","info","发送了info的key路由信息");
	}
	
	/** * 订阅(Topic)模型 */
	@Test
	public void testTopic() {
		rabbitTemplate.convertAndSend("topics","user.save","topic模型消息");
	}

}
  • HelloCustomer(Hello模型的消息消费者):

注意:

1. 消费者类需要加上 @Component 注解
2. 在类上添加 @RabbitListener 注解,然后用 @RabbitHandler 指定回调方法
3. 或者直接在回调方法上添加 @RabbitListener 注解
4. 回调方法需要定义一个String 参数用于接收消息
5. 用 queuesToDeclare 参数绑定队列,用 @Queue 注解定义队列
6. @Queue 注解的 value 指定队列名,durable(持久化)默认为 “true”,autoDelete 默认为 “true”,是否独占默认为 "false"

package com.blu.hello;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** * @Queue 注解默认定义的是持久化、非独占、非自动删除的队列 * @author BLU */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "true"))
public class HelloCustomer {
	
	@RabbitHandler
	public void helloReceive(String message) {
		System.out.println("message = "+ message);
	}
}
  • WorkConsumer(Work模型的消息消费者):
package com.blu.work;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkConsumer {

	@RabbitListener(queuesToDeclare = @Queue("work"))
	public void workReceive1(String message) {
		System.out.println("message1 = "+ message);
	}
	
	@RabbitListener(queuesToDeclare = @Queue("work"))
	public void workReceive2(String message) {
		System.out.println("message2 = "+ message);
	}
}

消息接收详情:

message2 = work work!2
message1 = work work!1
message1 = work work!3
message2 = work work!4
message1 = work work!5
message2 = work work!6
message1 = work work!7
message2 = work work!8
message1 = work work!9
message2 = work work!10
message2 = work work!12
message1 = work work!11
message2 = work work!14
message1 = work work!13
message2 = work work!16
message1 = work work!15
message2 = work work!18
message1 = work work!17
message1 = work work!19
message2 = work work!20
  • FanoutConsumer(fanout (广播)模型的消息消费者):
package com.blu.fanout;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutConsumer {

	@RabbitListener(bindings = { @QueueBinding(
			// 创建临时队列
			value = @Queue,
			// 绑定交换机,名称:logs,类型:fanout
			exchange = @Exchange(value = "logs", type = "fanout")) })
	public void fanoutReceive1(String message) {
		System.out.println("message1 = " + message);
	}

	@RabbitListener(bindings = { @QueueBinding(
			value = @Queue,
			exchange = @Exchange(value = "logs", type = "fanout")) })
	public void fanoutReceive2(String message) {
		System.out.println("message2 = " + message);
	}

}

消息接收详情:

message1 = fanout模型生产的消息
message2 = fanout模型生产的消息
  • RouteConsumer(订阅直连(Direct)模型的消息消费者):
package com.blu.route;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RouteConsumer {
	
	@RabbitListener(bindings = {
			@QueueBinding(
					value = @Queue,
					exchange = @Exchange(value="directs",type = "direct"),
					key = {"info","error","debug","warn"})
	})
	public void RouteReceive1(String message) {
		System.out.println("message1 = "+message);
	}
	
	@RabbitListener(bindings = {
			@QueueBinding(
					value = @Queue,
					exchange = @Exchange(value="directs",type = "direct"),
					key = {"error"})
	})
	public void RouteReceive2(String message) {
		System.out.println("message2 = "+message);
	}

}

消息接收详情:

message1 = 发送了info的key路由信息
  • TopicConsumer(订阅(Topic)模型的消息消费者):
package com.blu.topic;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {
	
	@RabbitListener(bindings = {
			@QueueBinding(value = @Queue,exchange = @Exchange(value = "topics",type = "topic"),key= {"user.*"})
	})
	public void TopicReceive1(String message) {
		System.out.println("message1 = "+message);
	}
	
	@RabbitListener(bindings = {
			@QueueBinding(value = @Queue,exchange = @Exchange(value = "topics",type = "topic"),key= {"order.#","product.*"})
	})
	public void TopicReceive2(String message) {
		System.out.println("message2 = "+message);
	}

}

消息接收详情:

message1 = topic模型消息
上一篇:Topic模型

相关文章