channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final byte msgBody = body; // a 'final' copy of the body that you can pass to the runnable
final long msgTag = envelope.getDeliveryTag();
Runnable runnable = new Runnable() {
@Override
public void run() {
// handle the message here
doStuff(msgBody);
channel.basicAck(msgTag, false);
}
};
threadPool.submit(runnable);
}
});
1条答案
按热度按时间kb5ga3dv1#
您应该查看一下Channel.basicConsume和DefaultConsumer抽象类:https://www.rabbitmq.com/api-guide.html#consuming
Java并发将需要回调的线程来处理每个消息,但是您可以使用线程池来重用线程。
现在您需要创建一个消费者,它将通过创建一个Runnable示例来处理每次交付,该示例将被传递到线程池以执行。
这展示了如何在单个连接和通道上处理并发交付,而无需在单个线程中使用while循环,因为while循环会在每次交付时阻塞。为了您的理智,您可能希望将
Runnable
实现分解到它自己的类中,该类可以接受channel
、msgBody
msgTag
和任何其他数据作为参数,当调用run()
方法时,这些参数将是可访问的。