rabbitmq 如何让等待完成的动作,再收到新的讯息?

bvuwiixz  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(3)|浏览(250)

我正在用nestjs创建微服务,transfer throw rabbitmq。如何让微服务依次从队列中接收消息,等待前一个消息完成。

  • main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: {
      urls: [`amqp://localhost:5672`],
      queue: 'rmq_queue',
      queueOptions: { durable: false },
      prefetchCount: 1,
    },
  });

  await app.listenAsync();
}

bootstrap();
  • app.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  @EventPattern('hello')
  async handleHello(): Promise<void> {
    Logger.log('-handle-');
    await (new Promise(resolve => setTimeout(resolve, 5000)));
    Logger.log('---hello---');
  }
}
  • client.js
const { ClientRMQ } = require('@nestjs/microservices');

(async () => {
  const client = new ClientRMQ({
    urls: ['amqp://localhost:5672'],
    queue: 'rmq_queue',
    queueOptions: { durable: false },
  });

  await client.connect();

  for (let i = 0; i < 3; i++) {
    client.emit('hello', 0).subscribe();
  }
})();

https://github.com/heySasha/nest-rmq
实际产量:

[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +9ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +12ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +4967ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +1ms

但我期望:

[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms
von4xj4u

von4xj4u1#

你想要的通常是通过消费者确认来完成的。你可以阅读关于它们的here。简而言之,你的消费者(在你的例子中是Nest.js微服务),它的预取计数设置为1,只有在它确认了前一个消息之后才会收到新的消息。如果你熟悉AWS SQS,这个操作类似于从队列中删除消息。
Nest.js使用amqplib与RabbitMQ进行通信。消费者确认策略是在创建通道时建立的--您可以看到有一个noAck选项。但是,创建通道时将noAck设置为true--您可以在此处检查它。这意味着当消息被传递给@EventHandler方法时,侦听器会自动确认消息。它提供了方便的UI和在传输中检查未确认消息的能力。
我在Nestiderjs源代码和文档中都没有找到任何有用的信息。但这可能会给你一个提示。

ulydmbyx

ulydmbyx2#

我写过自定义策略。

import { isString, isUndefined } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { CustomTransportStrategy, RmqOptions, Server } from '@nestjs/microservices';
import {
    CONNECT_EVENT, DISCONNECT_EVENT, DISCONNECTED_RMQ_MESSAGE, NO_MESSAGE_HANDLER,
    RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
    RQM_DEFAULT_PREFETCH_COUNT,
    RQM_DEFAULT_QUEUE, RQM_DEFAULT_QUEUE_OPTIONS,
    RQM_DEFAULT_URL,
} from '@nestjs/microservices/constants';

let rqmPackage: any = {};

export class ServerRMQ extends Server implements CustomTransportStrategy {
    private server: any = null;
    private channel: any = null;
    private readonly urls: string[];
    private readonly queue: string;
    private readonly prefetchCount: number;
    private readonly queueOptions: any;
    private readonly isGlobalPrefetchCount: boolean;

    constructor(private readonly options: RmqOptions['options']) {
        super();
        this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
        this.queue =
            this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
        this.prefetchCount =
            this.getOptionsProp(this.options, 'prefetchCount') ||
            RQM_DEFAULT_PREFETCH_COUNT;
        this.isGlobalPrefetchCount =
            this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
            RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
        this.queueOptions =
            this.getOptionsProp(this.options, 'queueOptions') ||
            RQM_DEFAULT_QUEUE_OPTIONS;

        this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
        rqmPackage = this.loadPackage(
            'amqp-connection-manager',
            ServerRMQ.name,
            () => require('amqp-connection-manager'),
        );
    }

    public async listen(callback: () => void): Promise<void> {
        await this.start(callback);
    }

    public close(): void {
        if (this.channel) {
            this.channel.close();
        }

        if (this.server) {
            this.server.close();
        }
    }

    public async start(callback?: () => void) {
        this.server = this.createClient();
        this.server.on(CONNECT_EVENT, (_: any) => {
            this.channel = this.server.createChannel({
                json: false,
                setup: (channel: any) => this.setupChannel(channel, callback),
            });
        });
        this.server.on(DISCONNECT_EVENT, (err: any) => {
            this.logger.error(DISCONNECTED_RMQ_MESSAGE);
        });
    }

    public createClient<T = any>(): T {
        const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
        return rqmPackage.connect(this.urls, socketOptions);
    }

    public async setupChannel(channel: any, callback: () => void) {
        await channel.assertQueue(this.queue, this.queueOptions);
        await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
        channel.consume(
            this.queue,
            (msg: any) => this.handleMessage(msg)
                .then(() => this.channel.ack(msg)) // Ack message after complete
                .catch(err => {
                    // error handling
                    this.logger.error(err);
                    return this.channel.ack(msg);
                }),
            { noAck: false },
        );
        callback();
    }

    public async handleMessage(message: any): Promise<void> {
        const { content, properties } = message;
        const packet = JSON.parse(content.toString());
        const pattern = isString(packet.pattern)
            ? packet.pattern
            : JSON.stringify(packet.pattern);

        if (isUndefined(packet.id)) {
            return this.handleEvent(pattern, packet);
        }

        const handler = this.getHandlerByPattern(pattern);

        if (!handler) {
            const status = 'error';

            return this.sendMessage(
                { status, err: NO_MESSAGE_HANDLER },
                properties.replyTo,
                properties.correlationId,
            );
        }

        const response$ = this.transformToObservable(
            await handler(packet.data),
        ) as Observable<any>;

        const publish = <T>(data: T) =>
            this.sendMessage(data, properties.replyTo, properties.correlationId);

        if (response$) {
            this.send(response$, publish);
        }

    }

    public sendMessage<T = any>(
        message: T,
        replyTo: any,
        correlationId: string,
    ): void {
        const buffer = Buffer.from(JSON.stringify(message));
        this.channel.sendToQueue(replyTo, buffer, { correlationId });
    }
}

与标准ServerRMQ不同的核心是setupChannel()部分,我们现在使用this.channel.ack(msg)this.handleMessage(msg)finally 部分中手动传递noAck: false和确认。

gz5pxeao

gz5pxeao3#

您应该将noAck: false添加到main.ts中。

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: {
      urls: [`amqp://localhost:5672`],
      queue: 'rmq_queue',
      queueOptions: { durable: false },
      noAck: false,
      prefetchCount: 1,
    },
  });

  await app.listenAsync();
}

bootstrap();

此外,您还必须将context添加到控制器,并进行ack确认。

import { Controller, Logger } from '@nestjs/common';
import {  Ctx, EventPattern, RmqContext } from '@nestjs/microservices';

@Controller()
export class AppController {
  @EventPattern('hello')
  async handleHello(@Ctx() context: RmqContext): Promise<void> {
    Logger.log('-handle-');
    await (new Promise(resolve => setTimeout(resolve, 5000)));
    Logger.log('---hello---');

    const channel = context.getChannelRef();
    const originalMsg = context.getMessage();
    channel.ack(originalMsg);
  }
}

相关问题