<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package com.yolo.springbootrabbitmqproducer.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class RabbitMQConfiguration {
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
String rabbitmqHost = "127.0.0.1";
String rabbitmqPort = "5672";
String rabbitmqUsername = "guest";
String rabbitmqPassword = "guest";
String rabbitmqVirtualHost = "/";
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
connectionFactory.setVirtualHost(rabbitmqVirtualHost);
// connectionFactory.setPublisherReturns(true);//开启return模式
// connectionFactory.setPublisherConfirms(true);//开启confirm模式
return connectionFactory;
}
@Bean(name = "rabbitTemplate")
//必须是prototype类型
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(rabbitConnectionFactory());
}
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(1); //设置线程数
factory.setMaxConcurrentConsumers(1); //最大线程数
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认
configurer.configure(factory, connectionFactory);
return factory;
}
}
@RestController
public class ProducerTestOneController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
for (int i = 1; i <= 10; i++) {
String msg = message + " ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
rabbitTemplate.convertAndSend("helloWorldExchange","helloWorld", msg);
}
}
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "helloWorld"
)
})
public void receive(String message) {
System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
}
}
Ready:表示待消费数量;队列中拥有可以被消费者消费的消息数量。
Unacked:表示待确认数量;队列分配消息给消费者时,给该条消息一个待确认状态,当消费者确认消息之后,队列才会移除该条消息。
Total:表示待消费数和待确认数的总和
访问:http://localhost:8080/send
这里采用的是自动ack机制
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "helloWorld"
)
})
public void receive2(Message message) {
System.out.println(" [ 消费者@2 ] Received ==> '" + new String(message.getBody()) + "'");
}
重新启动,访问:localhost:8080/send
可以看到消息被平均消费了
队列的消息分配方式默认是平均分配,即第一条消息分配给一个消息者,第二条消息就分配给另一个消息者,以此类推…
上面示例有2个消费者监听,由于只是简单的打印语句,所以看不出有什么问题。
我进行修改一下,通过设置线程休眠时间来表示消费者处理消费的任务时间
@RestController
public class ProducerTestOneController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
for (int i = 1; i <= 10; i++) {
String msg = message + " ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
rabbitTemplate.convertAndSend("helloWorldExchange","helloWorld", msg);
}
}
private int count1=1;
private int count2=1;
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "helloWorld"
)
})
public void receive(Message message) throws InterruptedException {
Thread.sleep(200);
System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
}
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "helloWorld"
)
})
public void receive2(Message message) throws InterruptedException {
Thread.sleep(1000);
System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
}
}
现在就能很明显的看出,消费者1号很快地处理完消息后就处于空闲状态;而消费者2号却一直很忙碌。当消息数量成千上万的时候,由消费者2号处理的消息会堆积很多,达不到时效性。
针对这种问题,rabbitmq提供了一种解决方案。
设置prefetch参数=1,实现原理是:队列只会分配一条消息给对应的监听消费者,收到消费者的确认回复之后才会重新分配另一条消息。
这里需要每一个接受者指定containerFactory
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(1); //设置线程数
factory.setMaxConcurrentConsumers(1); //最大线程数
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认
// 设置prefetch
factory.setPrefetchCount(1);
configurer.configure(factory, connectionFactory);
return factory;
}
1号处理了8条消息,2号2条,工作效率提高了不少
这里接受者不需要指定containerFactory
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
prefetch: 1
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_43296313/article/details/125273267
内容来源于网络,如有侵权,请联系作者删除!