使用Node.js消费RabbitMQ消息时,我可以等待进程完成吗?

doinxwow  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(3)|浏览(195)

我是Node.js和ES6的新手,这让我有点困惑。我试图让一个进程运行,从RabbitMQ队列中消耗消息。它需要能够在抓取下一条消息之前处理消息(大约需要30-60秒)。目前,我的代码,它抓住所有的消息,然后尝试分叉进程。当队列中有3-5条消息时,这很好,但对于20、50或100条消息,这会导致服务器耗尽内存。
我试过创建.consume()回调函数functional.exe,并将await添加到消息处理函数中。我尝试在processMessage周围的.consume()回调中 Package await new Promise。我尝试将await添加到调用channel.consume的行中。没有什么能改变行为。

#!/usr/bin/env node

const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
    const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
    const cluster = await amqp.connect(conn_str);
    const channel = await cluster.createChannel();
    await channel.assertQueue(queue,  { durable: durable, autoDelete: true });
    if (prefetch) {
        channel.prefetch(prefetch);
    }
    console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)

    try {
        channel.consume(queue, message => {
            if (message !== null) {
                console.log(' [x] Received', message.content.toString());
                processMessage(message.content.toString());
                channel.ack(message);
                return null;
            } else {
                console.log(error, 'Queue is empty!')
                channel.reject(message);
            }
        }, {noAck: isNoAck});
    } catch (error) {
        console.log(error, 'Failed to consume messages from Queue!')
        cluster.close(); 
    }
}

exports.consumeFromQueue = consumeFromQueue;

作为旁注,如果我创建一个字符串数组并循环遍历字符串,当我向processMessage行添加await时,它会在处理下一个字符串之前等待执行process(30-60秒)。

(async () => {
    for (let i=0; i<urls.length; i++) {
        await processMessage(urls[i]);
    }
})();

因此,我基本上需要这样的功能,但在RabbitMQ中监听队列。

ni65a41a

ni65a41a1#

如果你想限制消费者在任何给定时间处理的消息数量,请使用channel.prefetch():
给定的计数是通过通道发送的等待确认的最大消息数;一旦存在未完成的计数消息,服务器将不会在该信道上发送更多消息,直到一个或多个已被确认。
也就是说,如果您希望在处理下一条消息之前一次只处理一条消息,则设置channel.prefetch(1)

zf2sa74q

zf2sa74q2#

移动到下一个,设置通道.prefetch(1)

channel.prefetch(1)
wvmv3b1j

wvmv3b1j3#

如果有人需要一个完整的答案:
你需要混合使用channel.prefetch(1)和{ noAck:false }来消费一对一的消息:
简单示例:

const connection = await amqp.connect('amqp://localhost')    
const channel = await connection.createChannel()

await channel.assertQueue(queue, { durable: false })

// Set the number of messages to consume:
channel.prefetch(1)

await channel.consume(
      'QUEUE_NAME',
      async message => {
         if (message) {
            // YOUR ASYNC/AWAIT CODE

            // And then, ack the message manually:
            channel.ack(message)
         }
      },

      { noAck: false } // Set noAck to false to manually acknowledge messages
)

这是一次使用一条消息的方式。

相关问题