java RabbitMQ没有选择正确的消费者

nvbavucw  于 2023-10-14  发布在  Java
关注(0)|答案(2)|浏览(105)

我从http://www.rabbitmq.com/tutorials/tutorial-six-java.html中提取了这个例子,从RPCClient中添加了一个RPC调用,并在标准输出中添加了一些日志记录。因此,当执行第二次调用时,rabbitmq使用了错误的相关ID的消费者,这不是预期的行为。是窃听器还是我搞错了?
RPC服务器:

package com.foo.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

  private static final String RPC_QUEUE_NAME = "sap-consume";

  private static int fib(int n) {
    if (n ==0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
  }

  public static void main(String[] argv) {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);

    Connection connection = null;
    try {
      connection      = factory.newConnection();
      final Channel channel = connection.createChannel();

      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      System.out.println(" [x] Awaiting RPC requests");

      Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          AMQP.BasicProperties replyProps = new AMQP.BasicProperties
            .Builder()
            .correlationId(properties.getCorrelationId())
            .build();

          String response = "";

          try {
            String message = new String(body,"UTF-8");
            int n = Integer.parseInt(message);

            System.out.println(" [.] fib(" + message + ")");
            response += fib(n);
          }
          catch (RuntimeException e){
            System.out.println(" [.] " + e.toString());
          }
          finally {
            channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
            channel.basicAck(envelope.getDeliveryTag(), false);
        // RabbitMq consumer worker thread notifies the RPC server owner thread
            synchronized(this) {
              this.notify();
            }
          }
        }
      };

      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
      // Wait and be prepared to consume the message from RPC client.
      while (true) {
        synchronized(consumer) {
          try {
            consumer.wait();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    } catch (IOException | TimeoutException e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null)
        try {
          connection.close();
        } catch (IOException _ignore) {}
    }
  }
}

RPCCLient:

package com.bar.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

  private Connection connection;
  private Channel channel;
  private String requestQueueName = "sap-consume";
  private String replyQueueName;

  public RPCClient() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);

    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();
  }

  public String call(String message) throws IOException, InterruptedException {
    final String corrId = UUID.randomUUID().toString();

    AMQP.BasicProperties props = new AMQP.BasicProperties
      .Builder()
      .correlationId(corrId)
      .replyTo(replyQueueName)
      .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

    final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

    channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        if (properties.getCorrelationId().equals(corrId)) {
          System.out.println("Correlation Id" + properties.getCorrelationId() + " corresponds to expected one.");
          response.offer(new String(body, "UTF-8"));
        } else {
          System.out.println("Correlation Id" + properties.getCorrelationId() + " doesn't correspond to expected one " + corrId);
        }
      }
    });

    return response.take();
  }

  public void close() throws IOException {
    connection.close();
  }

  public static void main(String[] argv) {
    RPCClient rpc = null;
    String response = null;
    try {
      rpc = new RPCClient();

      System.out.println(" [x] Requesting fib(30)");
      response = rpc.call("30");
      System.out.println(" [.] Got '" + response + "'");
      System.out.println(" [x] Requesting fib(40)");
      response = rpc.call("40");
      System.out.println(" [.] Got '" + response + "'");
    } catch (IOException | TimeoutException | InterruptedException e) {
      e.printStackTrace();
    } finally {
      if (rpc != null) {
        try {
          rpc.close();
        } catch (IOException _ignore) {
        }
      }
    }
  }
}
rmbxnbpk

rmbxnbpk1#

是的,你在教程代码中发现了一个bug。我已经在这里打开了一个pull request来修复它,你也可以找到对发生的事情的解释:
https://github.com/rabbitmq/rabbitmq-tutorials/pull/174

k7fdbhmy

k7fdbhmy2#

这个例子很简单:它使用一个队列用于应答。通过发送第二个请求,您向应答注册了一个新的消费者,但第一个请求的消费者仍然在侦听,实际上 * 窃取了第二个请求的响应 *。这就是为什么客户端似乎使用相同的相关ID。
我们updated the client code为每个请求使用独占的自动删除队列。这个队列将被服务器自动删除,因为它的唯一消费者在收到响应后被取消订阅。这有点复杂,但更接近现实世界的场景。
注意,使用RabbitMQ处理回复队列的最佳方法是使用direct reply-to。这使用比真实的队列更轻的伪队列。我们在教程中没有提到直接回复,以使其尽可能简单,但这是生产中使用的首选功能。

相关问题