java如何处理rabbitmq消息的回调

muk1a3rh  于 2023-01-07  发布在  Java
关注(0)|答案(1)|浏览(124)

我想创建一个服务器来处理来自用户的套接字连接,在我的服务器中,我想有一个到RabbitMQ的连接,每个连接一个,但在他们的网页中提供的示例中,我只看到“while”循环来等待消息,在这种情况下,我将需要为每个连接创建一个线程,仅用于处理来自RabbitMQ的消息。
有没有办法在Java中使用Spring或任何框架来完成这一点,我只是为RabbitMQ创建回调,而不是使用while循环?
我使用的是node.js,在那里可以非常简单地完成此操作,我想了解一些针对Java的建议

kb5ga3dv

kb5ga3dv1#

您应该查看一下Channel.basicConsume和DefaultConsumer抽象类:https://www.rabbitmq.com/api-guide.html#consuming
Java并发将需要回调的线程来处理每个消息,但是您可以使用线程池来重用线程。

static final ExecutorService threadPool;

static {
    threadPool = Executors.newCachedThreadPool();
}

现在您需要创建一个消费者,它将通过创建一个Runnable示例来处理每次交付,该示例将被传递到线程池以执行。

channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        final byte msgBody = body; // a 'final' copy of the body that you can pass to the runnable
        final long msgTag = envelope.getDeliveryTag();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // handle the message here
                doStuff(msgBody);
                channel.basicAck(msgTag, false);
            }
        };
        threadPool.submit(runnable);
    }
});

这展示了如何在单个连接和通道上处理并发交付,而无需在单个线程中使用while循环,因为while循环会在每次交付时阻塞。为了您的理智,您可能希望将Runnable实现分解到它自己的类中,该类可以接受channelmsgBodymsgTag和任何其他数据作为参数,当调用run()方法时,这些参数将是可访问的。

相关问题