java RabbitMQ DefaultConsumer导致过多的消费者标记

kzipqqlq  于 2023-10-14  发布在  Java
关注(0)|答案(3)|浏览(133)

我有一个RabbitMQ客户端应用程序,它监听特定的队列。客户端创建一个DefaultConsumer的示例,并实现DefaultDelivery方法。这里是代码

protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
//            channel.basicQos(pollCount);
            Message message = new Message();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String response = new String(body, "UTF-8");
                    if (response != null) {
                        message.setId(NUID.nextGlobal());
                        message.setPayload(response);
                        message.setDeliveryTag(deliveryTag);
                        messages.add(message);
                        logger.info("Message received: ", message.getPayload());
                    }
                }
            };
            logger.debug("**********Channel status: " + channel.isOpen());
            channel.basicConsume(queueName, false, consumer);
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);

        }
    }

方法receiveMessages()每隔500 ms通过线程频繁调用一次,并将消息排入不同的List中进行消费。由于receiveMessages()上的民意调查,我观察到消费者标签在通过rabbit控制台查看时不断被创建和增长,如图所示。看到这些越来越多的消费者标签是正常的吗?

k7fdbhmy

k7fdbhmy1#

我终于找到了一个有效的解决方案。正如卢克·巴肯强调的那样,不需要投票。我现在只调用receiveMesssages()一次。此后,当消息发布到队列中时,我的消费者将接收回调。

protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
 public void receiveMessages() {
    try {
        Message message = new Message();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            String response = new String(delivery.getBody(), "UTF-8");
            if (response != null) {
                message.setId(NUID.nextGlobal());
                message.setPayload(response);
                message.setDeliveryTag(deliveryTag);
                messages.add(message);
                logger.info("Message received: ", message.getPayload());
            };
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
    } catch (Exception e) {
        logger.error("Exception while getting messages from Rabbit ", e);
    }
}

rabbit控制台现在只显示绑定队列下的1个消费标签条目。

siotufzp

siotufzp2#

看到这些越来越多的消费者标签是正常的吗?
你的代码有一个错误。你需要使用一个长时间运行的消费者,或者在使用完消费者后取消它。
我看不出有任何“poll”receiveMessages的必要--只要让它自己运行,它就会像您期望的那样将消息添加到同步队列中。

km0tfn4u

km0tfn4u3#

public NotificationConsumerService(ConnectionFactory connectionFactory, String host, Logger logger) {
    this.connectionFactory = connectionFactory;
    this.host = host;
    this.logger = logger;
}

public void consumeSliceChangeNotification() {
    connectionFactory.setHost(this.host);
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            JSONObject obj = new JSONObject(message);
            String namespace = obj.getString("namespace");
            logger.info("The latest change notification on the " + namespace +" is available");
        };

        channel.basicConsume(QUEUE_NAME, true,deliverCallback, consumerTag -> { } );
    } 
    catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }

}

相关问题