springboot:整合rabbitmq之延迟队列

x33g5p2x  于2022-06-16 转载在 Spring  
字(7.0k)|赞(0)|评价(0)|浏览(584)

springboot:整合rabbitmq之延迟队列

一、项目准备

依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置类

@Configuration
public class RabbitMQConfiguration {

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        String rabbitmqHost = "127.0.0.1";
        String rabbitmqPort = "5672";
        String rabbitmqUsername = "guest";
        String rabbitmqPassword = "guest";
        String rabbitmqVirtualHost = "/";
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        connectionFactory.setVirtualHost(rabbitmqVirtualHost);
//        connectionFactory.setPublisherReturns(true);//开启return模式
//        connectionFactory.setPublisherConfirms(true);//开启confirm模式
        return connectionFactory;
    }

    @Bean(name = "rabbitTemplate")
    //必须是prototype类型
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置线程数
        factory.setConcurrentConsumers(1);
        //最大线程数
        factory.setMaxConcurrentConsumers(1);

//        //设置为手动确认MANUAL(手动),AUTO(自动);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置prefetch
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

二、延迟队列

一般来说,发布消息之后,会被交换机接收并转发给对应的队列,队列分配给消费者处理,这个过程很快秒级处理;但有时候我们希望发布完消息后,在指定的时间之后再去处理消息,这个时候就需要使用到延时队列;
虽说是延时队列,但其实也只是对死信队列的一种扩展应用罢了。

三、案例(对queue所有消息进行设置)

创建生产者和消费者

首先还是得创建普通队列,添加参数绑定死信队列同时设置消息过期时间,生产者发布消息到普通队列,而普通队列没有任何消费者来消费,那么消息在普通队列中存活到设定过期时间就被转发到死信队列,由死信队列的消费者消费消息,以此实现延时功能

@Slf4j
@RestController
public class DL2Controller {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private int count = 1;

    @GetMapping("/dl2")
    public void dl() {
        String message = "Hello World222222222!";
        log.info(" [ 生产者 ] Sent ==> '" + message + "'");
        rabbitTemplate.convertAndSend("normal_exchange2", "normal_key2", message);
    }

    // 监听 normal_queue 正常队列
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "normal_queue2",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
                    ,arguments = {
                    @Argument(name = "x-dead-letter-exchange",value = "dlx_exchange2"), //指定一下死信交换机
                    @Argument(name = "x-dead-letter-routing-key",value = "dead_key2"),  //指定死信交换机的路由key
                    @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
                    //,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
            }
            ),
            exchange = @Exchange(value = "normal_exchange2",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "normal_key2"
    )
    })
    public void normal(Message message, Channel channel) throws IOException, InterruptedException {
        log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
        /*
         * deliveryTag:该消息的index
         * multiple: ture确认本条消息以及之前没有确认的消息(批量),false仅确认本条消息
         * requeue: true该条消息重新返回MQ queue,MQ broker将会重新发送该条消息
         */
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }

    // 监听死信队列
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
                    value = @Queue(value = "dlx_queue2"),
                    exchange = @Exchange(value = "dlx_exchange2"),//Exchang的默认类型就是direct,所以type可以不写
                    key = "dead_key2"
            )
    })
    public void dl(Message message, Channel channel) throws IOException {
        log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
        //打印完直接丢弃消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
}

测试结果

这里我们开启手动ack,然后在普通队列中拒绝ack并重新返回队列,当消息在队列时间超过3s,就会进入延迟队列

四、案例(对queue每一条消息消息进行设置)

创建生产者和消费者

package com.yolo.springbootrabbitmqproducer.controller;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Slf4j
@RestController
public class DL3Controller {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/dl3")
    public void dl() {
        String s = "Hello World! 3333333333";
        log.info(" [ 生产者 ] Sent ==> '" + s + "'");
        //设置过期时间
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("12000");
        Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend("normal_exchange3", "normal_key3", message);
    }

    // 监听 normal_queue 正常队列
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "normal_queue3",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
                    ,arguments = {
                    @Argument(name = "x-dead-letter-exchange",value = "dlx_exchange3"), //指定一下死信交换机
                    @Argument(name = "x-dead-letter-routing-key",value = "dead_key3"),  //指定死信交换机的路由key
                    //@Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
                    //,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
            }
            ),
            exchange = @Exchange(value = "normal_exchange3",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "normal_key3"
    )
    })
    public void normal(Message message, Channel channel) throws IOException, InterruptedException {
        /*
         * deliveryTag:该消息的index
         * multiple: ture确认本条消息以及之前没有确认的消息(批量),false仅确认本条消息
         * requeue: true该条消息重新返回MQ queue,MQ broker将会重新发送该条消息
         */
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }

    // 监听死信队列
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
                    value = @Queue(value = "dlx_queue3"),
                    exchange = @Exchange(value = "dlx_exchange3"),//Exchang的默认类型就是direct,所以type可以不写
                    key = "dead_key3"
            )
    })
    public void dl(Message message, Channel channel) throws IOException {
        log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
        //打印完直接丢弃消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
}

重启测试

相关文章