springboot:整合rabbitmq之消息确认机制ack

x33g5p2x  于2022-06-16 转载在 Spring  
字(9.3k)|赞(0)|评价(0)|浏览(555)

springboot:整合rabbitmq之消息确认机制ack

一、依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

二、配置

package com.yolo.springbootrabbitmqproducer.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
public class RabbitMQConfiguration {

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        String rabbitmqHost = "127.0.0.1";
        String rabbitmqPort = "5672";
        String rabbitmqUsername = "guest";
        String rabbitmqPassword = "guest";
        String rabbitmqVirtualHost = "/";
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        connectionFactory.setVirtualHost(rabbitmqVirtualHost);
//        connectionFactory.setPublisherReturns(true);//开启return模式
//        connectionFactory.setPublisherConfirms(true);//开启confirm模式
        return connectionFactory;
    }

    @Bean(name = "rabbitTemplate")
    //必须是prototype类型
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置线程数
        factory.setConcurrentConsumers(1);
        //最大线程数
        factory.setMaxConcurrentConsumers(1);
        //设置为手动确认MANUAL(手动),AUTO(自动);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置prefetch
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

三、消息确认机制(ack)

RabbitMQ默认的消息确认机制是:自动确认的

队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。

发送和监听消息

@RestController
public class AckSenderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
        for (int i = 1; i <= 10; i++) {
            String msg = message + " ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
            rabbitTemplate.convertAndSend("helloWorldExchange","ack", msg);
        }
    }

    private int count1=1;
    private int count2=1;
    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "ack"
    )
    })
    public void receive(Message message) throws InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
    }

    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "ack"
    )
    })
    public void receive2(Message message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
    }
}

测试结果

http://localhost:8080/ack

消费者1号、2号分别拿到一条消息进行消费,但没有确认,处于阻塞状态,所以队列不会移除这两条消息,同时设置了prefetch=1,在消费者未确认之前不会重新推送消息给消费者

停止程序,发现2条未确认的消息会回到Ready里面等待重新消费

再次重启,再次消费2条消息,但仍未确认

访问/ack,再次发布消息,消息堆积

四、设置手动ack

修改消费者手动确认

@RestController
public class AckSenderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private int count1=1;
    private int count2=1;
    private int count3 = 1;

    @GetMapping("/ack")
    public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
        for (int i = 1; i <= 10; i++) {
            String msg = message + " ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
            rabbitTemplate.convertAndSend("helloWorldExchange","ack", msg);
        }
    }

    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "ack"
    )
    })
    public void receive(Message message, Channel channel) throws InterruptedException, IOException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
        // 确认消息
        // 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)
        // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "ack"
    )
    })
    public void receive2(Message message,Channel channel,@Headers Map<String, Object> map) throws InterruptedException, IOException {
        Thread.sleep(600);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);

        // 确认消息
        channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
    }

    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "ack"
    )
    })
    public void receive3(Message message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws InterruptedException, IOException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);

        // 确认消息
        channel.basicAck(deliveryTag, false);
    }
}

手动确认测试结果

手动确认通过调用方法实现
basicAck(long deliveryTag, boolean multiple)
deliveryTag:交付标签,相当于消息ID 64位的长整数(从1开始递增)
multiple:false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签

这里发现程序刚刚启动就全部消费完了

继续发布,还是消费完成

修改消费者手动拒绝

@RestController
public class AckSenderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private int count1=1;
    private int count2=1;
    private int count3 = 1;

    @GetMapping("/ack")
    public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
        for (int i = 1; i <= 10; i++) {
            String msg = message + " ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
            rabbitTemplate.convertAndSend("helloWorldExchange","ack", msg);
        }
    }

    @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
            value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
            ),
            exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
            key = "ack"
    )
    })
    public void receive4(
            Message message, Channel channel) throws IOException, InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@4号 ] 消息被我拒绝了:" + count1++);

        // 拒绝消息方式一
        // 第一个参数,交付标签
        // 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签
        // 第三个参数,false表示直接丢弃消息,true表示重新排队
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

        // 拒绝消息方式二
        // 第一个参数,交付标签
        // 第二个参数,false表示直接丢弃消息,true表示重新排队
        // 跟basicNack的区别就是始终只拒绝提供的交付标签
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
}

手动拒绝测试结果

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
这里是拒绝后,重新进入队列,所以消费的总是第一条消息并且循环不停
停止程序后,队列仍然是10条消息

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
改成false,拒绝后直接丢弃
重启后:

五、总结

  • 未确认:什么也不用写,消息不会移除,重复消费,积攒越来越多
  • 确认:channel.basicAck();确认后,消息从队列中移除
  • 拒绝:channel.basicNack()或channel.basicReject();拒绝后,消息先从队列中移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)

相关文章