如何将rabbitlisteners绑定到cloudamqp?

axkjgtzd  于 2021-08-25  发布在  Java
关注(0)|答案(0)|浏览(273)

我目前在两个应用程序(web/worker)之间实现rabbitmq消息传递时遇到问题。我的rabbitmq服务托管在cloudamqp(heroku插件)上。然而,任何 @RabbitListener 我声明似乎试图连接到 localhost 而不是云服务。
将以下组件添加到我的worker应用程序时:

  1. @Service
  2. public class TaskConsumer {
  3. @RabbitListener(queues = "worker.rpc.requests", containerFactory = "rabbitListenerContainerFactory")
  4. public String fetch(String p) {
  5. return p;
  6. }
  7. }

我遇到以下错误:

  1. 2021-07-05 14:38:23.006 INFO 18840 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
  2. 2021-07-05 14:38:32.145 WARN 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
  3. 2021-07-05 14:38:32.145 INFO 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@32c8d668: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

如何绑定rabbitlistener,使其连接到amqp环境?以下是我的配置:

  1. @Configuration
  2. @EnableRabbit
  3. public class RabbitConfig {
  4. protected final String workerQueueName = "worker.rpc.requests";
  5. protected final String routingKeyName = "rpc";
  6. protected final String directExcName = "worker.exchange";
  7. @Bean
  8. public ConnectionFactory connectionFactory() {
  9. final URI ampqUrl;
  10. try {
  11. ampqUrl = new URI(getEnvOrThrow("CLOUDAMQP_URL"));
  12. } catch (URISyntaxException e) {
  13. throw new RuntimeException(e);
  14. }
  15. final CachingConnectionFactory factory = new CachingConnectionFactory();
  16. factory.setUsername(ampqUrl.getUserInfo().split(":")[0]);
  17. factory.setPassword(ampqUrl.getUserInfo().split(":")[1]);
  18. factory.setHost(ampqUrl.getHost());
  19. factory.setPort(ampqUrl.getPort());
  20. factory.setVirtualHost(ampqUrl.getPath().substring(1));
  21. try {
  22. factory.getRabbitConnectionFactory().setUri(ampqUrl);
  23. } catch (URISyntaxException e) {
  24. e.printStackTrace();
  25. } catch (NoSuchAlgorithmException e) {
  26. e.printStackTrace();
  27. } catch (KeyManagementException e) {
  28. e.printStackTrace();
  29. }
  30. return factory;
  31. }
  32. @Bean
  33. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
  34. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  35. factory.setConnectionFactory(connectionFactory());
  36. SimpleMessageListenerContainer container = factory
  37. .createListenerContainer();
  38. factory.setConcurrentConsumers(50);
  39. factory.setMaxConcurrentConsumers(100);
  40. container.setStartConsumerMinInterval(3000);
  41. container.setQueues(queue());
  42. factory.setMaxConcurrentConsumers(5);
  43. return factory;
  44. }
  45. @Bean
  46. public RabbitTemplate rabbitTemplate() {
  47. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  48. template.setRoutingKey(this.workerQueueName);
  49. template.setDefaultReceiveQueue(this.workerQueueName);
  50. return template;
  51. }
  52. @Bean
  53. public Queue queue() {
  54. return new Queue(this.workerQueueName);
  55. }
  56. @Bean
  57. public DirectExchange direct() {
  58. return new DirectExchange(this.directExcName);
  59. }
  60. @Bean
  61. public Binding binding(DirectExchange direct,
  62. Queue autoDeleteQueue1) {
  63. return BindingBuilder.bind(autoDeleteQueue1)
  64. .to(direct)
  65. .with(this.routingKeyName);
  66. }
  67. /**
  68. * Required for executing adminstration functions against an AMQP Broker
  69. */
  70. @Bean
  71. public AmqpAdmin amqpAdmin() {
  72. return new RabbitAdmin(connectionFactory());
  73. }
  74. private static String getEnvOrThrow(String name) {
  75. final String env = getenv(name);
  76. if (env == null) {
  77. throw new IllegalStateException("Environment variable [" + name + "] is not set.");
  78. }
  79. return env;
  80. }
  81. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题