<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
@Configuration
public class RabbitMQConfiguration {
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
String rabbitmqHost = "127.0.0.1";
String rabbitmqPort = "5672";
String rabbitmqUsername = "guest";
String rabbitmqPassword = "guest";
String rabbitmqVirtualHost = "/";
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
connectionFactory.setVirtualHost(rabbitmqVirtualHost);
// connectionFactory.setPublisherReturns(true);//开启return模式
// connectionFactory.setPublisherConfirms(true);//开启confirm模式
return connectionFactory;
}
@Bean(name = "rabbitTemplate")
//必须是prototype类型
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(rabbitConnectionFactory());
}
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置线程数
factory.setConcurrentConsumers(1);
//最大线程数
factory.setMaxConcurrentConsumers(1);
// //设置为手动确认MANUAL(手动),AUTO(自动);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置prefetch
factory.setPrefetchCount(1);
configurer.configure(factory, connectionFactory);
return factory;
}
}
一般来说,发布消息之后,会被交换机接收并转发给对应的队列,队列分配给消费者处理,这个过程很快秒级处理;但有时候我们希望发布完消息后,在指定的时间之后再去处理消息,这个时候就需要使用到延时队列;
虽说是延时队列,但其实也只是对死信队列的一种扩展应用罢了。
首先还是得创建普通队列,添加参数绑定死信队列同时设置消息过期时间,生产者发布消息到普通队列,而普通队列没有任何消费者来消费,那么消息在普通队列中存活到设定过期时间就被转发到死信队列,由死信队列的消费者消费消息,以此实现延时功能
@Slf4j
@RestController
public class DL2Controller {
@Autowired
private RabbitTemplate rabbitTemplate;
private int count = 1;
@GetMapping("/dl2")
public void dl() {
String message = "Hello World222222222!";
log.info(" [ 生产者 ] Sent ==> '" + message + "'");
rabbitTemplate.convertAndSend("normal_exchange2", "normal_key2", message);
}
// 监听 normal_queue 正常队列
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "normal_queue2",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
,arguments = {
@Argument(name = "x-dead-letter-exchange",value = "dlx_exchange2"), //指定一下死信交换机
@Argument(name = "x-dead-letter-routing-key",value = "dead_key2"), //指定死信交换机的路由key
@Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
//,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
}
),
exchange = @Exchange(value = "normal_exchange2",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "normal_key2"
)
})
public void normal(Message message, Channel channel) throws IOException, InterruptedException {
log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
/*
* deliveryTag:该消息的index
* multiple: ture确认本条消息以及之前没有确认的消息(批量),false仅确认本条消息
* requeue: true该条消息重新返回MQ queue,MQ broker将会重新发送该条消息
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
// 监听死信队列
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "dlx_queue2"),
exchange = @Exchange(value = "dlx_exchange2"),//Exchang的默认类型就是direct,所以type可以不写
key = "dead_key2"
)
})
public void dl(Message message, Channel channel) throws IOException {
log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
//打印完直接丢弃消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
这里我们开启手动ack,然后在普通队列中拒绝ack并重新返回队列,当消息在队列时间超过3s,就会进入延迟队列
package com.yolo.springbootrabbitmqproducer.controller;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
@RestController
public class DL3Controller {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/dl3")
public void dl() {
String s = "Hello World! 3333333333";
log.info(" [ 生产者 ] Sent ==> '" + s + "'");
//设置过期时间
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("12000");
Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("normal_exchange3", "normal_key3", message);
}
// 监听 normal_queue 正常队列
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "normal_queue3",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
,arguments = {
@Argument(name = "x-dead-letter-exchange",value = "dlx_exchange3"), //指定一下死信交换机
@Argument(name = "x-dead-letter-routing-key",value = "dead_key3"), //指定死信交换机的路由key
//@Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
//,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
}
),
exchange = @Exchange(value = "normal_exchange3",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "normal_key3"
)
})
public void normal(Message message, Channel channel) throws IOException, InterruptedException {
/*
* deliveryTag:该消息的index
* multiple: ture确认本条消息以及之前没有确认的消息(批量),false仅确认本条消息
* requeue: true该条消息重新返回MQ queue,MQ broker将会重新发送该条消息
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
// 监听死信队列
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "dlx_queue3"),
exchange = @Exchange(value = "dlx_exchange3"),//Exchang的默认类型就是direct,所以type可以不写
key = "dead_key3"
)
})
public void dl(Message message, Channel channel) throws IOException {
log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
//打印完直接丢弃消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_43296313/article/details/125282132
内容来源于网络,如有侵权,请联系作者删除!