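I am currently having trouble implementing RabbitMQ messaging between two applications (web/worker). My RabbitMQ service is hosted on CloudAMQP (a Heroku add-on). However, any @RabbitListener I declare seems to try to connect to localhost instead of the cloud service.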
When I add the following component to my worker application:
@Service
public class TaskConsumer {
    @RabbitListener(queues = "worker.rpc.requests", containerFactory = "rabbitListenerContainerFactory")
    public String fetch(String p) {
        return p;
    }
}
I get the following error:
2021-07-05 14:38:23.006 INFO 18840 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2021-07-05 14:38:32.145 WARN 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2021-07-05 14:38:32.145 INFO 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@32c8d668: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
How can I bind the @RabbitListener so that it connects to the AMQP environment? Here is my configuration:
@Configuration
@EnableRabbit
public class RabbitConfig {
    protected final String workerQueueName = "worker.rpc.requests";
    protected final String routingKeyName = "rpc";
    protected final String directExcName = "worker.exchange";

    @Bean
    public ConnectionFactory connectionFactory() {
        final URI ampqUrl;
        try {
            ampqUrl = new URI(getEnvOrThrow("CLOUDAMQP_URL"));
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
        final CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUsername(ampqUrl.getUserInfo().split(":")[0]);
        factory.setPassword(ampqUrl.getUserInfo().split(":")[1]);
        factory.setHost(ampqUrl.getHost());
        factory.setPort(ampqUrl.getPort());
        factory.setVirtualHost(ampqUrl.getPath().substring(1));
        try {
            factory.getRabbitConnectionFactory().setUri(ampqUrl);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (KeyManagementException e) {
            e.printStackTrace();
        }
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        SimpleMessageListenerContainer container = factory
                .createListenerContainer();
        factory.setConcurrentConsumers(50);
        factory.setMaxConcurrentConsumers(100);
        container.setStartConsumerMinInterval(3000);
        container.setQueues(queue());
        factory.setMaxConcurrentConsumers(5);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(this.workerQueueName);
        template.setDefaultReceiveQueue(this.workerQueueName);
        return template;
    }

    @Bean
    public Queue queue() {
        return new Queue(this.workerQueueName);
    }

    @Bean
    public DirectExchange direct() {
        return new DirectExchange(this.directExcName);
    }

    @Bean
    public Binding binding(DirectExchange direct,
                           Queue autoDeleteQueue1) {
        return BindingBuilder.bind(autoDeleteQueue1)
                .to(direct)
                .with(this.routingKeyName);
    }

    /**
     * Required for executing administration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    private static String getEnvOrThrow(String name) {
        final String env = getenv(name);
        if (env == null) {
            throw new IllegalStateException("Environment variable [" + name + "] is not set.");
        }
        return env;
    }
}
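To rule out the URL handling itself, the parsing done in connectionFactory() can be pulled out of Spring and checked on its own. The following is only a minimal sketch under stated assumptions, not part of my actual configuration: AmqpUrlParts is a hypothetical helper name, the example URL is made up, and the fallback values (ports 5672/5671, guest/guest, virtual host "/") are assumptions that mirror the usual AMQP client defaults.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper: splits an AMQP URL into the values a connection factory needs. */
final class AmqpUrlParts {
    final String host;
    final int port;
    final String username;
    final String password;
    final String virtualHost;
    final boolean tls;

    private AmqpUrlParts(String host, int port, String username,
                         String password, String virtualHost, boolean tls) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
        this.tls = tls;
    }

    static AmqpUrlParts parse(String url) {
        final URI uri;
        try {
            uri = new URI(url);
        } catch (URISyntaxException e) {
            // Do not echo the URL here: it usually contains the password.
            throw new IllegalArgumentException("Malformed AMQP URL", e);
        }
        String scheme = uri.getScheme();
        boolean tls = "amqps".equalsIgnoreCase(scheme);
        if (!tls && !"amqp".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("Expected an amqp:// or amqps:// URL");
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("AMQP URL has no host");
        }
        // URI.getPort() returns -1 when the URL carries no explicit port.
        // Assumed defaults: 5671 for amqps, 5672 for amqp.
        int port = uri.getPort() != -1 ? uri.getPort() : (tls ? 5671 : 5672);

        // Assumed default credentials when the URL has no user info.
        String username = "guest";
        String password = "guest";
        String userInfo = uri.getUserInfo();
        if (userInfo != null) {
            // Limit of 2 keeps any ':' inside the password intact.
            String[] parts = userInfo.split(":", 2);
            username = parts[0];
            password = parts.length > 1 ? parts[1] : "";
        }

        // No path at all means the default virtual host; otherwise drop the leading '/'.
        String path = uri.getPath();
        String virtualHost = (path == null || path.isEmpty()) ? "/" : path.substring(1);

        return new AmqpUrlParts(uri.getHost(), port, username, password, virtualHost, tls);
    }
}
```

Calling AmqpUrlParts.parse(System.getenv("CLOUDAMQP_URL")) and logging only host, port and virtualHost shows what the bean would be configured with. If those values look right while the log still reports localhost:5672, my guess is that the listener container is not using this connection factory at all, but that is an assumption I have not confirmed.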