springboot:整合rabbitmq之消息回调

x33g5p2x  于2022-06-16 转载在 Spring  
字(5.3k)|赞(0)|评价(0)|浏览(426)

springboot:整合rabbitmq之消息回调

一、项目准备

配置类

@Configuration
public class RabbitMQConfiguration {

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        String rabbitmqHost = "127.0.0.1";
        String rabbitmqPort = "5672";
        String rabbitmqUsername = "guest";
        String rabbitmqPassword = "guest";
        String rabbitmqVirtualHost = "/";
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        connectionFactory.setVirtualHost(rabbitmqVirtualHost);
        connectionFactory.setPublisherReturns(true);//开启return模式
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        return connectionFactory;
    }

    @Bean(name = "rabbitTemplate")
    //必须是prototype类型
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置线程数
        factory.setConcurrentConsumers(1);
        //最大线程数
        factory.setMaxConcurrentConsumers(1);

//        设置为手动确认MANUAL(手动),AUTO(自动);
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置prefetch
        factory.setPrefetchCount(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

或者 yaml配置文件 二选一

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
    virtual-host: /

二、消息回调

  • ConfirmCallback:当消息到达交换机触发回调
  • ReturnsCallback:消息(带有路由键routingKey)到达交换机,与交换机的所有绑定键进行匹配,匹配不到触发回调

重写ConfirmCallback

@Slf4j
@Component
public class RabbitConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    /**
     * 消息到达交换机触发回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("消息发送异常! correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }else {
            log.info("消息发送成功");
        }
    }
}

重写ReturnsCallback

@Slf4j
@Component
public class RabbitReturnCallbackService implements RabbitTemplate.ReturnCallback{

    /**
     * 消息路由失败,回调
     * 消息(带有路由键routingKey)到达交换机,与交换机的所有绑定键进行匹配,匹配不到触发回调
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

消息发送者和消费者

注意:
若使用 confirm-callback 或 return-callback,需要配置

publisher-confirm-type: correlated

publisher-returns: true

使用return-callback时必须设置mandatory为true

或者在配置中设置rabbitmq.template.mandatory=true

@RestController
public class CallbackController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitConfirmCallbackService rabbitConfirmCallbackService;

    @Autowired
    private RabbitReturnCallbackService rabbitReturnCallbackService;

    @GetMapping("/callback")
    public void callback() {
        // 全局唯一
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        String message = "Hello world!";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");

        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(rabbitConfirmCallbackService);
        rabbitTemplate.setReturnCallback(rabbitReturnCallbackService);
        rabbitTemplate.convertAndSend("callback.exchange", "callback.a.yzm", message, correlationData);
    }

    @GetMapping("/callback2")
    public void callback2() {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        String message = "Hello world!";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(rabbitConfirmCallbackService);
        rabbitTemplate.setReturnCallback(rabbitReturnCallbackService);
        rabbitTemplate.convertAndSend("不存在的交换机", "callback.a.yzm", message, correlationData);
    }

    @GetMapping("/callback3")
    public void callback3() {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        String message = "Hello world!";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(rabbitConfirmCallbackService);
        rabbitTemplate.setReturnCallback(rabbitReturnCallbackService);
        rabbitTemplate.convertAndSend("callback.exchange", "不存在的路由键", message, correlationData);
    }

    @RabbitListener(containerFactory = "customContainerFactory",bindings = @QueueBinding(
            value = @Queue(value = "callback_queue"),
            exchange = @Exchange(value = "callback.exchange"),
            key = {"callback.a.yzm", "callback.b.admin"}
    ))
    public void callbackA(Message message) {
        System.out.println(" [ 消费者@A号 ] Received ==> '" + new String(message.getBody()) + "'");
    }

}

测试

访问http://localhost:8080/callback

消息正确到达交换机触发回调

访问http://localhost:8080/callback2

消息找不到交换机触发回调

访问http://localhost:8080/callback3

消息路由失败触发回调

三、注意

  • 若使用 confirm-callback 或 return-callback,需要配置
publisher-confirm-type: correlated

publisher-returns: true
  • 使用return-callback时必须设置mandatory为true或者在配置中设置rabbitmq.template.mandatory=true

相关文章