我有一个RabbitMQ客户端应用程序,它监听特定的队列。客户端创建一个DefaultConsumer的示例,并实现DefaultDelivery方法。这里是代码
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
public void receiveMessages() {
try {
// channel.basicQos(pollCount);
Message message = new Message();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String response = new String(body, "UTF-8");
if (response != null) {
message.setId(NUID.nextGlobal());
message.setPayload(response);
message.setDeliveryTag(deliveryTag);
messages.add(message);
logger.info("Message received: ", message.getPayload());
}
}
};
logger.debug("**********Channel status: " + channel.isOpen());
channel.basicConsume(queueName, false, consumer);
} catch (Exception e) {
logger.error("Exception while getting messages from Rabbit ", e);
}
}
方法receiveMessages()每隔500 ms通过线程频繁调用一次,并将消息排入不同的List中进行消费。由于receiveMessages()上的民意调查,我观察到消费者标签在通过rabbit控制台查看时不断被创建和增长,如图所示。看到这些越来越多的消费者标签是正常的吗?
3条答案
按热度按时间k7fdbhmy1#
我终于找到了一个有效的解决方案。正如卢克·巴肯强调的那样,不需要投票。我现在只调用
receiveMesssages()
一次。此后,当消息发布到队列中时,我的消费者将接收回调。rabbit控制台现在只显示绑定队列下的1个消费标签条目。
siotufzp2#
看到这些越来越多的消费者标签是正常的吗?
你的代码有一个错误。你需要使用一个长时间运行的消费者,或者在使用完消费者后取消它。
我看不出有任何“poll”
receiveMessages
的必要--只要让它自己运行,它就会像您期望的那样将消息添加到同步队列中。km0tfn4u3#