通过AMQP 1.0协议连接到RabbitMQ持久队列失败-Protonj 2

1cosmwyk  于 2024-01-09  发布在  RabbitMQ
关注(0)|答案(1)|浏览(212)

找到解决方案:-请参阅本问题结尾处的解决方案。
我试图连接我的AMQP 1.0消费者(使用Apache ProtonJ 2库)。但我的连接失败,错误如下

  1. org.apache.qpid.protonj2.client.exceptions.ClientSessionRemotelyClosedException: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue_durable' in vhost '/': received 'false' but current is 'true' [condition = amqp:precondition-failed]

字符串
下面是我的示例代码。

  1. public void connectAmqp() throws Throwable {
  2. final String serverHost = "localhost";
  3. final int serverPort = 5672;
  4. final String address = "test_queue_durable";
  5. final Client client = Client.create();
  6. final ConnectionOptions options = new ConnectionOptions().user("admin").password("admin");
  7. try{
  8. Connection connection = client.connect(serverHost, serverPort, options);
  9. Receiver receiver = connection.openReceiver(address);
  10. for (int i = 0; i < 100; ++i) {
  11. Delivery delivery = receiver.receive();
  12. System.out.println(delivery.message().body().getClass());
  13. System.out.println("*-*-*-* " + new String((byte[])delivery.message().body()));
  14. }
  15. }catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. }


注意事项:
1.在RabbitMQ中预先声明队列
1.队列被配置为持久的,以防止消息丢失
1.如果删除队列的持久属性,则使用者连接成功(但这不是预期的)
1.连接需要AMQP 1.0协议
1.用于连接的客户端库是Apache Qpid Proton J2。

已编辑的决议

  1. public void connectAmqp() throws Throwable {
  2. final String serverHost = "localhost";
  3. final int serverPort = 5672;
  4. final String address = "test_queue_durable";
  5. final Client client = Client.create();
  6. try{
  7. ConnectionOptions options = new ConnectionOptions().user("user").password("pa$$w0rd");
  8. Connection connection = client.connect(serverHost, serverPort, options);
  9. /**
  10. * Consumer Connecting to Durable Queue Connections
  11. * Set durable property on source link of receiveroptions
  12. */
  13. ReceiverOptions ro = new ReceiverOptions();
  14. ro.sourceOptions().durabilityMode(DurabilityMode.CONFIGURATION);
  15. Receiver receiver = connection.openReceiver(address, ro);
  16. for (int i = 0; i < 100; ++i) {
  17. Delivery delivery = receiver.receive();
  18. System.out.println(delivery.message().body().getClass());
  19. System.out.println("*-*-*-* " + new String((byte[])delivery.message().body()));
  20. }
  21. }catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }

4szc88ey

4szc88ey1#

您可能需要配置Receiver Source值以匹配您在代理中创建的队列,以便它允许接收器附加。
你需要做这样的事情(配置满足RabbitMQ附加先决条件):

  1. ReceiverOptions receiverOptions = new ReceiverOptions();
  2. receiverOptions.sourceOptions().durabilityMode(DurabilityMode.CONFIGURATION);
  3. Receiver receiver = session.openReceiver(address, receiverOptions);

字符串

相关问题