可参考这篇文章: https://www.jianshu.com/p/14ffe0f3db94
1.docker上启动rabbitmq实例, 例如: docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
2.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the "amqp" demo project (Spring Boot with Spring AMQP). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the Spring Boot starter parent, version 2.6.3; the dependencies below declare no versions of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this project. -->
<groupId>com.yl</groupId>
<artifactId>amqp</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>amqp</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java language level used for the build. -->
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Spring AMQP starter: supplies the org.springframework.amqp classes used by the configuration and listener classes. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring web starter. NOTE(review): no web classes appear in the code shown with this article; confirm it is needed. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.application.properties
# RabbitMQ connection settings
# Broker host
spring.rabbitmq.host=192.168.244.135
# Broker port
spring.rabbitmq.port=5672
# User name
# NOTE(review): a stock RabbitMQ install may restrict the "guest" account to loopback
# connections; confirm the broker at the non-local host above accepts it.
spring.rabbitmq.username=guest
# Password
spring.rabbitmq.password=guest
4.Direct模式配置文件
package com.yl.amqp.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queue, the direct exchange and the binding between them
 * for the Direct-mode example.
 */
@Configuration
public class DirectConfig {

    private static final String QUEUE_NAME = "yl-queue";
    private static final String EXCHANGE_NAME = "yl-direct";
    private static final String ROUTING_KEY = "direct";

    /**
     * Defines the queue.
     *
     * @return a queue named "yl-queue" created with its durable flag set to {@code true}
     */
    @Bean
    Queue directQueue() {
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }

    /**
     * Defines the direct exchange.
     *
     * @return an exchange named "yl-direct" created with durable = {@code true}
     *         and autoDelete = {@code false}
     */
    @Bean
    DirectExchange directExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Binds the queue to the exchange with routing key "direct".
     * The method name (sic) is left unchanged so existing references keep working.
     *
     * @return the binding between {@link #directQueue()} and {@link #directExchange()}
     */
    @Bean
    Binding directBiding() {
        final Queue queue = directQueue();
        final DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
5.监听消息队列
package com.yl.amqp.receicer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the "yl-queue" queue and prints every received message to standard output.
 */
@Component
public class DirectReceiver {

    private static final String QUEUE_NAME = "yl-queue";

    /**
     * Receives a message from the listened queue and prints it.
     *
     * @param msg the received message text
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void handler(String msg) {
        final String received = msg;
        System.out.println(received);
    }
}
6.测试
1.Fanout模式配置
package com.yl.amqp.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares two queues, a fanout exchange and one binding per queue
 * for the Fanout-mode example.
 */
@Configuration
public class FanoutConfig {

    private static final String FIRST_QUEUE = "queue-one";
    private static final String SECOND_QUEUE = "queue-two";
    private static final String EXCHANGE_NAME = "yl-fanout";

    /** @return a queue named "queue-one" created with its durable flag set to {@code true} */
    @Bean
    Queue queueOne() {
        final boolean durable = true;
        return new Queue(FIRST_QUEUE, durable);
    }

    /** @return a queue named "queue-two" created with its durable flag set to {@code true} */
    @Bean
    Queue queueTwo() {
        final boolean durable = true;
        return new Queue(SECOND_QUEUE, durable);
    }

    /**
     * Defines the fanout exchange.
     *
     * @return an exchange named "yl-fanout" created with durable = {@code true}
     *         and autoDelete = {@code false}
     */
    @Bean
    FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /** Binds {@link #queueOne()} to the fanout exchange. */
    @Bean
    Binding bindingOne() {
        final Queue queue = queueOne();
        final FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /** Binds {@link #queueTwo()} to the fanout exchange. */
    @Bean
    Binding bindingTwo() {
        final Queue queue = queueTwo();
        final FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}
2.监听消息
package com.yl.amqp.receicer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on "queue-one" and "queue-two" and prints each received message,
 * prefixed with a per-listener tag, to standard output.
 */
@Component
public class FanoutReceiver {

    private static final String FIRST_QUEUE = "queue-one";
    private static final String SECOND_QUEUE = "queue-two";

    /**
     * Handles messages arriving on "queue-one".
     *
     * @param msg the received message text
     */
    @RabbitListener(queues = FIRST_QUEUE)
    public void handler1(String msg) {
        print("handle1()", msg);
    }

    /**
     * Handles messages arriving on "queue-two".
     *
     * @param msg the received message text
     */
    @RabbitListener(queues = SECOND_QUEUE)
    public void handle2(String msg) {
        print("handle2()", msg);
    }

    /** Writes the tag immediately followed by the message as one line on standard output. */
    private static void print(String tag, String msg) {
        System.out.println(tag + msg);
    }
}
3.测试
1.Topic模式配置
package com.yl.amqp.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares three queues, a topic exchange and one pattern binding per queue
 * for the Topic-mode example.
 */
@Configuration
public class TopicConfig {

    private static final String HUAWEI_QUEUE = "huawei";
    private static final String APPLE_QUEUE = "apple";
    private static final String PHONE_QUEUE = "phone";
    private static final String EXCHANGE_NAME = "yl-topic";

    private static final String HUAWEI_PATTERN = "huawei.#";
    private static final String APPLE_PATTERN = "apple.#";
    private static final String PHONE_PATTERN = "#.phone.#";

    /** @return a queue named "huawei" created with its durable flag set to {@code true} */
    @Bean
    Queue huawei() {
        final boolean durable = true;
        return new Queue(HUAWEI_QUEUE, durable);
    }

    /** @return a queue named "apple" created with its durable flag set to {@code true} */
    @Bean
    Queue apple() {
        final boolean durable = true;
        return new Queue(APPLE_QUEUE, durable);
    }

    /** @return a queue named "phone" created with its durable flag set to {@code true} */
    @Bean
    Queue phone() {
        final boolean durable = true;
        return new Queue(PHONE_QUEUE, durable);
    }

    /**
     * Defines the topic exchange.
     *
     * @return an exchange named "yl-topic" created with durable = {@code true}
     *         and autoDelete = {@code false}
     */
    @Bean
    TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /** Binds the "huawei" queue to the topic exchange with pattern "huawei.#". */
    @Bean
    Binding huaweiBinding() {
        final Queue queue = huawei();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(HUAWEI_PATTERN);
    }

    /** Binds the "apple" queue to the topic exchange with pattern "apple.#". */
    @Bean
    Binding appleBinding() {
        final Queue queue = apple();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(APPLE_PATTERN);
    }

    /** Binds the "phone" queue to the topic exchange with pattern "#.phone.#". */
    @Bean
    Binding phoneBinding() {
        final Queue queue = phone();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(PHONE_PATTERN);
    }
}
2.监听消息
package com.yl.amqp.receicer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the "huawei", "apple" and "phone" queues and prints each received
 * message, prefixed with a per-listener tag, to standard output.
 */
@Component
public class TopicReceiver {

    private static final String HUAWEI_QUEUE = "huawei";
    private static final String APPLE_QUEUE = "apple";
    private static final String PHONE_QUEUE = "phone";

    /**
     * Handles messages arriving on the "huawei" queue.
     *
     * @param msg the received message text
     */
    @RabbitListener(queues = HUAWEI_QUEUE)
    public void handle1(String msg) {
        print("handle1()", msg);
    }

    /**
     * Handles messages arriving on the "apple" queue.
     *
     * @param msg the received message text
     */
    @RabbitListener(queues = APPLE_QUEUE)
    public void handle2(String msg) {
        print("handle2()", msg);
    }

    /**
     * Handles messages arriving on the "phone" queue.
     *
     * @param msg the received message text
     */
    @RabbitListener(queues = PHONE_QUEUE)
    public void handle3(String msg) {
        print("handle3()", msg);
    }

    /** Writes the tag immediately followed by the message as one line on standard output. */
    private static void print(String tag, String msg) {
        System.out.println(tag + msg);
    }
}
3.测试1
4.测试2
1.Headers模式的配置
package com.yl.amqp.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Declares two queues, a headers exchange and the header-based bindings
 * for the Headers-mode example.
 */
@Configuration
public class HeaderConfig {

    private static final String AGE_QUEUE = "queue-age";
    private static final String NAME_QUEUE = "queue-name";
    private static final String EXCHANGE_NAME = "yl-headers";

    /** @return a queue named "queue-age" created with its durable flag set to {@code true} */
    @Bean
    Queue ageQueue() {
        final boolean durable = true;
        return new Queue(AGE_QUEUE, durable);
    }

    /** @return a queue named "queue-name" created with its durable flag set to {@code true} */
    @Bean
    Queue nameQueue() {
        final boolean durable = true;
        return new Queue(NAME_QUEUE, durable);
    }

    /**
     * Defines the headers exchange.
     *
     * @return an exchange named "yl-headers" created with durable = {@code true}
     *         and autoDelete = {@code false}
     */
    @Bean
    HeadersExchange headersExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new HeadersExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Binds "queue-age" to the headers exchange using a "where any" match
     * over a header map containing the single entry age = 18.
     */
    @Bean
    Binding ageBinding() {
        final Map<String, Object> expectedHeaders = new HashMap<>();
        expectedHeaders.put("age", 18);
        final Queue queue = ageQueue();
        final HeadersExchange exchange = headersExchange();
        return BindingBuilder.bind(queue).to(exchange).whereAny(expectedHeaders).match();
    }

    /**
     * Binds "queue-name" to the headers exchange on the condition that
     * a header called "name" exists.
     */
    @Bean
    Binding nameBinding() {
        final Queue queue = nameQueue();
        final HeadersExchange exchange = headersExchange();
        return BindingBuilder.bind(queue).to(exchange).where("name").exists();
    }
}
2.监听消息
package com.yl.amqp.receicer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on "queue-age" and "queue-name" and prints each received message,
 * prefixed with a per-listener tag, to standard output.
 */
@Component
public class HeadersReceiver {

    private static final String AGE_QUEUE = "queue-age";
    private static final String NAME_QUEUE = "queue-name";

    /**
     * Handles messages arriving on "queue-age".
     *
     * @param msg the received message text
     */
    @RabbitListener(queues = AGE_QUEUE)
    public void handle1(String msg) {
        print("handle1(),queue-age:", msg);
    }

    /**
     * Handles messages arriving on "queue-name".
     *
     * @param msg the received message text
     */
    @RabbitListener(queues = NAME_QUEUE)
    public void handle2(String msg) {
        print("handle2(),queue-name:", msg);
    }

    /** Writes the tag immediately followed by the message as one line on standard output. */
    private static void print(String tag, String msg) {
        System.out.println(tag + msg);
    }
}
3.测试1
4.测试2
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_41359273/article/details/122870599
内容来源于网络,如有侵权,请联系作者删除!