如何将rabbitlisteners绑定到cloudamqp?

axkjgtzd  于 2021-08-25  发布在  Java
关注(0)|答案(0)|浏览(244)

我目前在两个应用程序(web/worker)之间实现rabbitmq消息传递时遇到问题。我的rabbitmq服务托管在cloudamqp(heroku插件)上。然而,任何 @RabbitListener 我声明似乎试图连接到 localhost 而不是云服务。
将以下组件添加到我的worker应用程序时:

@Service
public class TaskConsumer {
    @RabbitListener(queues = "worker.rpc.requests", containerFactory = "rabbitListenerContainerFactory")
    public String fetch(String p) {
        return p;
    }
}

我遇到以下错误:

2021-07-05 14:38:23.006  INFO 18840 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2021-07-05 14:38:32.145  WARN 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2021-07-05 14:38:32.145  INFO 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@32c8d668: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

如何绑定rabbitlistener,使其连接到amqp环境?以下是我的配置:

@Configuration
@EnableRabbit
public class RabbitConfig {

    protected final String workerQueueName = "worker.rpc.requests";
    protected final String routingKeyName = "rpc";
    protected final String directExcName = "worker.exchange";

    @Bean
    public ConnectionFactory connectionFactory() {
        final URI ampqUrl;
        try {
            ampqUrl = new URI(getEnvOrThrow("CLOUDAMQP_URL"));
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }

        final CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUsername(ampqUrl.getUserInfo().split(":")[0]);
        factory.setPassword(ampqUrl.getUserInfo().split(":")[1]);
        factory.setHost(ampqUrl.getHost());
        factory.setPort(ampqUrl.getPort());
        factory.setVirtualHost(ampqUrl.getPath().substring(1));

        try {
            factory.getRabbitConnectionFactory().setUri(ampqUrl);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (KeyManagementException e) {
            e.printStackTrace();
        }

        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());

        SimpleMessageListenerContainer container = factory
                .createListenerContainer();
        factory.setConcurrentConsumers(50);
        factory.setMaxConcurrentConsumers(100);
        container.setStartConsumerMinInterval(3000);
        container.setQueues(queue());
        factory.setMaxConcurrentConsumers(5);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(this.workerQueueName);
        template.setDefaultReceiveQueue(this.workerQueueName);
        return template;
    }

    @Bean
    public Queue queue() {
        return new Queue(this.workerQueueName);
    }

    @Bean
    public DirectExchange direct() {
        return new DirectExchange(this.directExcName);
    }

    @Bean
    public Binding binding(DirectExchange direct,
                             Queue autoDeleteQueue1) {
        return BindingBuilder.bind(autoDeleteQueue1)
                .to(direct)
                .with(this.routingKeyName);
    }

    /**
     * Required for executing adminstration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    private static String getEnvOrThrow(String name) {
        final String env = getenv(name);
        if (env == null) {
            throw new IllegalStateException("Environment variable [" + name + "] is not set."); 
        }
        return env;
    }

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题