<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package com.yolo.springbootrabbitmqproducer.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class RabbitMQConfiguration {
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
String rabbitmqHost = "127.0.0.1";
String rabbitmqPort = "5672";
String rabbitmqUsername = "guest";
String rabbitmqPassword = "guest";
String rabbitmqVirtualHost = "/";
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
connectionFactory.setVirtualHost(rabbitmqVirtualHost);
// connectionFactory.setPublisherReturns(true);//开启return模式
// connectionFactory.setPublisherConfirms(true);//开启confirm模式
return connectionFactory;
}
@Bean(name = "rabbitTemplate")
//必须是prototype类型
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(rabbitConnectionFactory());
}
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置线程数
factory.setConcurrentConsumers(1);
//最大线程数
factory.setMaxConcurrentConsumers(1);
//设置为手动确认MANUAL(手动),AUTO(自动);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置prefetch
factory.setPrefetchCount(1);
configurer.configure(factory, connectionFactory);
return factory;
}
}
RabbitMQ默认的消息确认机制是:自动确认的
队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。
@RestController
public class AckSenderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
for (int i = 1; i <= 10; i++) {
String msg = message + " ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
rabbitTemplate.convertAndSend("helloWorldExchange","ack", msg);
}
}
private int count1=1;
private int count2=1;
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "ack"
)
})
public void receive(Message message) throws InterruptedException {
Thread.sleep(200);
System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
}
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "ack"
)
})
public void receive2(Message message) throws InterruptedException {
Thread.sleep(1000);
System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
}
}
http://localhost:8080/ack
消费者1号、2号分别拿到一条消息进行消费,但没有确认,处于阻塞状态,所以队列不会移除这两条消息,同时设置了prefetch=1,在消费者未确认之前不会重新推送消息给消费者
停止程序,发现2条未确认的消息会回到Ready里面等待重新消费
再次重启,再次消费2条消息,但仍未确认
访问/ack,再次发布消息,消息堆积
@RestController
public class AckSenderController {
@Autowired
private RabbitTemplate rabbitTemplate;
private int count1=1;
private int count2=1;
private int count3 = 1;
@GetMapping("/ack")
public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
for (int i = 1; i <= 10; i++) {
String msg = message + " ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
rabbitTemplate.convertAndSend("helloWorldExchange","ack", msg);
}
}
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "ack"
)
})
public void receive(Message message, Channel channel) throws InterruptedException, IOException {
Thread.sleep(200);
System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
// 确认消息
// 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)
// 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "ack"
)
})
public void receive2(Message message,Channel channel,@Headers Map<String, Object> map) throws InterruptedException, IOException {
Thread.sleep(600);
System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
// 确认消息
channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
}
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "ack"
)
})
public void receive3(Message message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws InterruptedException, IOException {
Thread.sleep(1000);
System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);
// 确认消息
channel.basicAck(deliveryTag, false);
}
}
手动确认通过调用方法实现
basicAck(long deliveryTag, boolean multiple)
deliveryTag:交付标签,相当于消息ID 64位的长整数(从1开始递增)
multiple:false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
这里发现程序刚刚启动就全部消费完了
继续发布,还是消费完成
@RestController
public class AckSenderController {
@Autowired
private RabbitTemplate rabbitTemplate;
private int count1=1;
private int count2=1;
private int count3 = 1;
@GetMapping("/ack")
public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
for (int i = 1; i <= 10; i++) {
String msg = message + " ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
rabbitTemplate.convertAndSend("helloWorldExchange","ack", msg);
}
}
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "ack"
)
})
public void receive4(
Message message, Channel channel) throws IOException, InterruptedException {
Thread.sleep(200);
System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@4号 ] 消息被我拒绝了:" + count1++);
// 拒绝消息方式一
// 第一个参数,交付标签
// 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签
// 第三个参数,false表示直接丢弃消息,true表示重新排队
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
// 拒绝消息方式二
// 第一个参数,交付标签
// 第二个参数,false表示直接丢弃消息,true表示重新排队
// 跟basicNack的区别就是始终只拒绝提供的交付标签
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
这里是拒绝后,重新进入队列,所以消费的总是第一条消息并且循环不停
停止程序后,队列仍然是10条消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
改成false,拒绝后直接丢弃
重启后:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_43296313/article/details/125274521
内容来源于网络,如有侵权,请联系作者删除!