<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
@Configuration
public class RabbitMQConfiguration {
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
String rabbitmqHost = "127.0.0.1";
String rabbitmqPort = "5672";
String rabbitmqUsername = "guest";
String rabbitmqPassword = "guest";
String rabbitmqVirtualHost = "/";
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
connectionFactory.setVirtualHost(rabbitmqVirtualHost);
// connectionFactory.setPublisherReturns(true);//开启return模式
// connectionFactory.setPublisherConfirms(true);//开启confirm模式
return connectionFactory;
}
@Bean(name = "rabbitTemplate")
//必须是prototype类型
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(rabbitConnectionFactory());
}
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置线程数
factory.setConcurrentConsumers(1);
//最大线程数
factory.setMaxConcurrentConsumers(1);
// //设置为手动确认MANUAL(手动),AUTO(自动);
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置prefetch
factory.setPrefetchCount(1);
configurer.configure(factory, connectionFactory);
return factory;
}
}
生产者发送消息给交换机,由交换机转发消息给队列,交换机可以转发给所有绑定它的队列,也可以转发给符合路由规则的队列,交换机本身不会存储消息,如果没有绑定任何队列,消息就会丢失
生产者生产10条消息,发送给FanoutExchange,空字符串""表示不需要指定路由键
消费者A监听队列A,消费者B、B2监听队列B
@RestController
public class FanoutExchangeController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/fanout")
public void fanout() {
for (int i = 1; i <= 10; i++) {
String message = "Hello World ..." + i;
rabbitTemplate.convertAndSend("fanout_exchange", "", message);
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
}
}
private int count1 = 1;
private int count2 = 1;
private int count3 = 1;
@RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
value = @Queue(value = "fanout_a", declare = "true"),
exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT, durable = "true")
)
})
public void receiveA(String message) throws InterruptedException {
System.out.println(" [ 消费者@A号 ] Received ==> '" + message + "'");
Thread.sleep(200);
System.out.println(" [ 消费者@A号 ] Dealt with:" + count1++);
}
@RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
value = @Queue(value = "fanout_b", declare = "true"),
exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT, durable = "true")
)
})
public void receiveB(String message) throws InterruptedException {
System.out.println(" [ 消费者@B号 ] Received ==> '" + message + "'");
Thread.sleep(200);
System.out.println(" [ 消费者@B号 ] Dealt with:" + count2++);
}
@RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
value = @Queue(value = "fanout_b", declare = "true"),
exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT, durable = "true")
)
})
public void receiveB2(String message) throws InterruptedException {
System.out.println(" [ 消费者@B2号 ] Received ==> '" + message + "'");
Thread.sleep(500);
System.out.println(" [ 消费者@B2号 ] Dealt with:" + count3++);
}
}
运行结果
生产者生产了10条消息发送给交换机,fanout交换机将消息分别转发给绑定它的队列A、B;
队列A、B拿到内容一样的10条消息,队列A只有一个消费者A,所以由消费者A处理所有消息
队列B有两个消费者B、B2,分配模式是按能力分配,所以消费者B处理的多
@RestController
public class DirectController {
@Autowired
private RabbitTemplate rabbitTemplate;
private int count1 = 1;
private int count2 = 1;
private int count3 = 1;
@GetMapping("/direct")
public void direct() {
for (int i = 1; i <= 30; i++) {
String message = "Hello World ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
if (i % 3 == 0) {
rabbitTemplate.convertAndSend("direct_exchange", "direct_a", message);
} else if (i % 3 == 1) {
rabbitTemplate.convertAndSend("direct_exchange", "direct_b", message);
} else {
rabbitTemplate.convertAndSend("direct_exchange", "direct_b2", message);
}
}
}
/**
* name:交换机名称
* durable:设置是否持久,设置为true则将Exchange存盘,即使服务器重启数据也不会丢失
* autoDelete:设置是否自动删除,当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange,简单来说也就是如果该Exchange没有和任何队列Queue绑定则删除
* internal:设置是否是RabbitMQ内部使用,默认false。如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
* @param message
* @throws InterruptedException
*/
@RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
value = @Queue(value = "direct_a", declare = "true"),
exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
key = "direct_a"
)
})
public void receiveA(String message) throws InterruptedException {
System.out.println(" [ 消费者@A号 ] Received ==> '" + message + "'");
Thread.sleep(200);
System.out.println(" [ 消费者@A号 ] Dealt with:" + count1++);
}
@RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
value = @Queue(value = "direct_b", declare = "true"),
exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
key = "direct_b"
)
})
public void receiveB(String message) throws InterruptedException {
System.out.println(" [ 消费者@B号 ] Received ==> '" + message + "'");
Thread.sleep(200);
System.out.println(" [ 消费者@B号 ] Dealt with:" + count2++);
}
@RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
value = @Queue(value = "direct_b", declare = "true"),
exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
key = "direct_b2"
)
})
public void receiveB2(String message) throws InterruptedException {
System.out.println(" [ 消费者@B2号 ] Received ==> '" + message + "'");
Thread.sleep(500);
System.out.println(" [ 消费者@B2号 ] Dealt with:" + count3++);
}
}
运行结果
两个队列A、B,A有一个路由键,而B有两个路由键;
生产者生产30条消息发送给交换机,交换机根据路由键转发给队列,队列A分配10条消息,队列B分配20条消息;
同时队列D被两个消费者监听
@RestController
public class TopicController {
@Autowired
private RabbitTemplate rabbitTemplate;
private int count1 = 1;
private int count2 = 1;
private int count3 = 1;
@GetMapping("/topic")
public void topic() {
for (int i = 1; i <= 30; i++) {
String message = "Hello World ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
if (i % 3 == 0) {
// topic.yzm.key,可以匹配 topic.yzm.* 和 topic.#
rabbitTemplate.convertAndSend("topic_exchange", "topic.yzm.key", message);
} else if (i % 3 == 1) {
// topic.yzm.yzm,可以匹配 topic.yzm.* 、 topic.# 和 topic.*.yzm
rabbitTemplate.convertAndSend("topic_exchange", "topic.yzm.yzm", message);
} else {
// topic.key.yzm,可以匹配 topic.# 和 topic.*.yzm
rabbitTemplate.convertAndSend("topic_exchange", "topic.key.yzm", message);
}
}
}
@RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
value = @Queue(value = "topic_e", declare = "true"),
exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC, durable = "true"),
key = "topic.yzm.*"
)
})
public void receiveE(String message) throws InterruptedException {
System.out.println(" [ 消费者@E号 ] Received ==> '" + message + "'");
Thread.sleep(200);
System.out.println(" [ 消费者@E号 ] Dealt with:" + count1++);
}
@RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
value = @Queue(value = "topic_f", declare = "true"),
exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC, durable = "true"),
key = "topic.#"
)
})
public void receiveF(String message) throws InterruptedException {
System.out.println(" [ 消费者@F号 ] Received ==> '" + message + "'");
Thread.sleep(200);
System.out.println(" [ 消费者@F号 ] Dealt with:" + count2++);
}
@RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
value = @Queue(value = "topic_g", declare = "true"),
exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC, durable = "true"),
key = "topic.*.yzm"
)
})
public void receiveG(String message) throws InterruptedException {
System.out.println(" [ 消费者@G号 ] Received ==> '" + message + "'");
Thread.sleep(200);
System.out.println(" [ 消费者@G号 ] Dealt with:" + count3++);
}
}
分析结果:
E、G二十条,F三十条
配置类
@Configuration
public class RabbitMQConfiguration {
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
String rabbitmqHost = "127.0.0.1";
String rabbitmqPort = "5672";
String rabbitmqUsername = "guest";
String rabbitmqPassword = "guest";
String rabbitmqVirtualHost = "/";
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
connectionFactory.setVirtualHost(rabbitmqVirtualHost);
// connectionFactory.setPublisherReturns(true);//开启return模式
// connectionFactory.setPublisherConfirms(true);//开启confirm模式
return connectionFactory;
}
@Bean(name = "rabbitTemplate")
//必须是prototype类型
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(rabbitConnectionFactory());
}
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置线程数
factory.setConcurrentConsumers(1);
//最大线程数
factory.setMaxConcurrentConsumers(1);
// //设置为手动确认MANUAL(手动),AUTO(自动);
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置prefetch
factory.setPrefetchCount(1);
configurer.configure(factory, connectionFactory);
return factory;
}
public static final String QUEUE_X = "headers_x";
public static final String QUEUE_Y = "headers_y";
public static final String HEADER_EXCHANGE = "headers_exchange";
@Bean
public Queue queueX() {
return new Queue(QUEUE_X);
}
@Bean
public Queue queueY() {
return new Queue(QUEUE_Y);
}
@Bean
public HeadersExchange headersExchange() {
return ExchangeBuilder.headersExchange(HEADER_EXCHANGE).build();
}
@Bean
public Binding bindingX() {
Map<String, Object> map = new HashMap<>();
map.put("key1", "value1");
map.put("name", "yzm");
// whereAll:表示完全匹配
return BindingBuilder.bind(queueX()).to(headersExchange()).whereAll(map).match();
}
@Bean
public Binding bindingY() {
Map<String, Object> map = new HashMap<>();
map.put("key2", "value2");
map.put("name", "yzm");
// whereAny:表示只要有一对键值对能匹配就可以
return BindingBuilder.bind(queueY()).to(headersExchange()).whereAny(map).match();
}
}
消息发送和接受
@RestController
public class HeadersController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/headers")
public void headers() {
String s = "Hello World";
System.out.println(" [ 生产者 ] Sent ==> '" + s + "'");
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("key1", "value1");
messageProperties.setHeader("name", "yzm");
Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("headers_exchange", "", message);
}
@GetMapping("/headers2")
public void headers2() {
String s = "Hello World";
System.out.println(" [ 生产者 ] Sent ==> '" + s + "'");
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("key3", "value3");
messageProperties.setHeader("name", "yzm");
Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("headers_exchange", "", message);
}
@RabbitListener(queues = RabbitMQConfiguration.QUEUE_X)
public void receiveX(String message) {
System.out.println(" [ 消费者@X号 ] Received ==> '" + message + "'");
}
@RabbitListener(queues = RabbitMQConfiguration.QUEUE_Y)
public void receiveY(String message) {
System.out.println(" [ 消费者@Y号 ] Received ==> '" + message + "'");
}
}
测试结果
发送请求:http://localhost:8080/headers
生产消息时,头部键值对有:“key1”=“value1"和"name”=“yzm”,跟X队列能完全匹配上,跟Y队列能匹配上其中一个,所以两个消费者都能消费到消息
发送请求:http://localhost:8080/headers2
只有Y队列能匹配上一个键值对
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_43296313/article/details/125277130
内容来源于网络,如有侵权,请联系作者删除!