springboot:整合rabbitmq之消息回调

x33g5p2x  于2022-06-16 转载在 Spring  
字(5.3k)|赞(0)|评价(0)|浏览(503)

springboot:整合rabbitmq之消息回调

一、项目准备

配置类

  1. @Configuration
  2. public class RabbitMQConfiguration {
  3. @Bean
  4. public ConnectionFactory rabbitConnectionFactory() {
  5. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  6. String rabbitmqHost = "127.0.0.1";
  7. String rabbitmqPort = "5672";
  8. String rabbitmqUsername = "guest";
  9. String rabbitmqPassword = "guest";
  10. String rabbitmqVirtualHost = "/";
  11. connectionFactory.setHost(rabbitmqHost);
  12. connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
  13. connectionFactory.setUsername(rabbitmqUsername);
  14. connectionFactory.setPassword(rabbitmqPassword);
  15. connectionFactory.setVirtualHost(rabbitmqVirtualHost);
  16. connectionFactory.setPublisherReturns(true);//开启return模式
  17. connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
  18. return connectionFactory;
  19. }
  20. @Bean(name = "rabbitTemplate")
  21. //必须是prototype类型
  22. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  23. public RabbitTemplate rabbitTemplate() {
  24. return new RabbitTemplate(rabbitConnectionFactory());
  25. }
  26. @Bean("customContainerFactory")
  27. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  28. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  29. //设置线程数
  30. factory.setConcurrentConsumers(1);
  31. //最大线程数
  32. factory.setMaxConcurrentConsumers(1);
  33. // 设置为手动确认MANUAL(手动),AUTO(自动);
  34. // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  35. // 设置prefetch
  36. factory.setPrefetchCount(1);
  37. configurer.configure(factory, connectionFactory);
  38. return factory;
  39. }
  40. }

或者 yaml配置文件 二选一

  1. spring:
  2. rabbitmq:
  3. port: 5672
  4. host: 127.0.0.1
  5. username: guest
  6. password: guest
  7. publisher-confirm-type: correlated
  8. publisher-returns: true
  9. virtual-host: /

二、消息回调

  • ConfirmCallback:当消息到达交换机触发回调
  • ReturnsCallback:消息(带有路由键routingKey)到达交换机,与交换机的所有绑定键进行匹配,匹配不到触发回调

重写ConfirmCallback

  1. @Slf4j
  2. @Component
  3. public class RabbitConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
  4. /**
  5. * 消息到达交换机触发回调
  6. */
  7. @Override
  8. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  9. if (!ack) {
  10. log.error("消息发送异常! correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
  11. }else {
  12. log.info("消息发送成功");
  13. }
  14. }
  15. }

重写ReturnsCallback

  1. @Slf4j
  2. @Component
  3. public class RabbitReturnCallbackService implements RabbitTemplate.ReturnCallback{
  4. /**
  5. * 消息路由失败,回调
  6. * 消息(带有路由键routingKey)到达交换机,与交换机的所有绑定键进行匹配,匹配不到触发回调
  7. */
  8. @Override
  9. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  10. log.error("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
  11. }
  12. }

消息发送者和消费者

注意:
若使用 confirm-callback 或 return-callback,需要配置

publisher-confirm-type: correlated

publisher-returns: true

使用return-callback时必须设置mandatory为true

或者在配置中设置rabbitmq.template.mandatory=true

  1. @RestController
  2. public class CallbackController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Autowired
  6. private RabbitConfirmCallbackService rabbitConfirmCallbackService;
  7. @Autowired
  8. private RabbitReturnCallbackService rabbitReturnCallbackService;
  9. @GetMapping("/callback")
  10. public void callback() {
  11. // 全局唯一
  12. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  13. String message = "Hello world!";
  14. System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
  15. rabbitTemplate.setMandatory(true);
  16. rabbitTemplate.setConfirmCallback(rabbitConfirmCallbackService);
  17. rabbitTemplate.setReturnCallback(rabbitReturnCallbackService);
  18. rabbitTemplate.convertAndSend("callback.exchange", "callback.a.yzm", message, correlationData);
  19. }
  20. @GetMapping("/callback2")
  21. public void callback2() {
  22. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  23. String message = "Hello world!";
  24. System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
  25. rabbitTemplate.setMandatory(true);
  26. rabbitTemplate.setConfirmCallback(rabbitConfirmCallbackService);
  27. rabbitTemplate.setReturnCallback(rabbitReturnCallbackService);
  28. rabbitTemplate.convertAndSend("不存在的交换机", "callback.a.yzm", message, correlationData);
  29. }
  30. @GetMapping("/callback3")
  31. public void callback3() {
  32. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  33. String message = "Hello world!";
  34. System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
  35. rabbitTemplate.setMandatory(true);
  36. rabbitTemplate.setConfirmCallback(rabbitConfirmCallbackService);
  37. rabbitTemplate.setReturnCallback(rabbitReturnCallbackService);
  38. rabbitTemplate.convertAndSend("callback.exchange", "不存在的路由键", message, correlationData);
  39. }
  40. @RabbitListener(containerFactory = "customContainerFactory",bindings = @QueueBinding(
  41. value = @Queue(value = "callback_queue"),
  42. exchange = @Exchange(value = "callback.exchange"),
  43. key = {"callback.a.yzm", "callback.b.admin"}
  44. ))
  45. public void callbackA(Message message) {
  46. System.out.println(" [ 消费者@A号 ] Received ==> '" + new String(message.getBody()) + "'");
  47. }
  48. }

测试

访问http://localhost:8080/callback

消息正确到达交换机触发回调

访问http://localhost:8080/callback2

消息找不到交换机触发回调

访问http://localhost:8080/callback3

消息路由失败触发回调

三、注意

  • 若使用 confirm-callback 或 return-callback,需要配置
  1. publisher-confirm-type: correlated
  2. publisher-returns: true
  • 使用return-callback时必须设置mandatory为true或者在配置中设置rabbitmq.template.mandatory=true

相关文章