我正在用nestjs创建微服务,transfer throw rabbitmq。如何让微服务依次从队列中接收消息,等待前一个消息完成。
- main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'rmq_queue',
queueOptions: { durable: false },
prefetchCount: 1,
},
});
await app.listenAsync();
}
bootstrap();
- app.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('hello')
async handleHello(): Promise<void> {
Logger.log('-handle-');
await (new Promise(resolve => setTimeout(resolve, 5000)));
Logger.log('---hello---');
}
}
- client.js
const { ClientRMQ } = require('@nestjs/microservices');
(async () => {
const client = new ClientRMQ({
urls: ['amqp://localhost:5672'],
queue: 'rmq_queue',
queueOptions: { durable: false },
});
await client.connect();
for (let i = 0; i < 3; i++) {
client.emit('hello', 0).subscribe();
}
})();
https://github.com/heySasha/nest-rmq
实际产量:
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +9ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +12ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +4967ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +1ms
但我期望:
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
3条答案
按热度按时间von4xj4u1#
你想要的通常是通过消费者确认来完成的。你可以阅读关于它们的here。简而言之,你的消费者(在你的例子中是Nest.js微服务),它的预取计数设置为1,只有在它确认了前一个消息之后才会收到新的消息。如果你熟悉AWS SQS,这个操作类似于从队列中删除消息。
Nest.js使用amqplib与RabbitMQ进行通信。消费者确认策略是在创建通道时建立的--您可以看到有一个
noAck
选项。但是,创建通道时将noAck
设置为true
--您可以在此处检查它。这意味着当消息被传递给@EventHandler
方法时,侦听器会自动确认消息。它提供了方便的UI和在传输中检查未确认消息的能力。我在Nestiderjs源代码和文档中都没有找到任何有用的信息。但这可能会给你一个提示。
ulydmbyx2#
我写过自定义策略。
与标准
ServerRMQ
不同的核心是setupChannel()
部分,我们现在使用this.channel.ack(msg)
在this.handleMessage(msg)
的 finally 部分中手动传递noAck: false
和确认。gz5pxeao3#
您应该将
noAck: false
添加到main.ts中。此外,您还必须将
context
添加到控制器,并进行ack
确认。