我尝试用Bull框架将DTO发送到我的Redis队列中,并在处理器中处理这些DTO。有时作业会传递到处理器(100个中的1个),但大多数情况下会失败,错误为:job stalled more than allowable limit
,我不知道如何修复它。
我创建了queue-api
模块,作为队列的 Package 器,例如订单队列。然后,我将此模块导入到要将DTO发布到队列中的模块中,在我的示例中为order-module
。
queue-api模块文件
// queue-api.module.ts
@Module({
imports: [
BullModule.registerQueue(
{
name: 'order-queue',
defaultJobOptions: {
backoff: 10000,
attempts: Number.MAX_SAFE_INTEGER,
},
},
),
...
],
providers: [OrderQueue],
exports: [OrderQueue],
})
export class QueueApiModule {}
// order-queue.ts
@Injectable()
export class OrderQueue extends AbstractQueue {
constructor(
@InjectQueue('order-queue')
private readonly queue: Queue,
) {}
async sendSubmitMail(dto: SendSubmitMailDto): Promise<void> {
const job = await this.queue.add('send-submit-mail', dto)
console.log(`Job ${job.id} created.`)
}
}
订单模块文件
// order.module.ts
@Module({
imports: [
QueueApiModule,
...
],
providers: [
OrderProcessor,
...
]
})
export class OrderModule {}
// order-processor.ts
@Processor('order-queue')
export class OrderProcessor {
constructor(private readonly queue: OrderQueue) {}
@Process('send-submit-mail')
async onProcessSubmitMail(job: Job): Promise<void> {
console.log(`Processing of job ${job.id}`)
}
}
几乎从不调用该处理器处理程序。
你知道我的代码有什么问题吗?谢谢你的建议。
2条答案
按热度按时间jgwigjjp1#
我遇到了类似的问题,但还没有深入研究找到根本原因。但同时我使用了bull-repl(npm bull-repl)cli查看队列状态。当发生停止错误时,之后不会触发任何作业(队列似乎在失败的作业上卡住了)。如果您在bull-repl中运行统计,您将看到有一个作业处于活动状态。您可以手动删除它(使用bull-repl),然后您将运行下一个作业。我怀疑QueueScheduler没有运行,因此未处理暂停的作业。您也可以增加暂停超时参数(有2-3个,请查看[https://docs.bullmq.io/bull/important-notes]),看看是否有帮助。在我的情况下,当我在调试中暂停时,会发生锁定。
a0x5cqrl2#
晚了点,还是写在这里比较好
这是因为这条线
constructor(private readonly queue: OrderQueue) {}
更准确地说,这是因为DI机制,可能是服务是
Scope.REQUEST
(或其注入服务之一,这使得主机服务也是一个Scope.REQUEST服务,整个注入子树都是请求作用域)的原因@Process()
在单独的进程中运行处理程序,因此无法访问Injector。如果您查看尝试处理www.example.com所导致的错误job.data,您将看到类似以下的内容(在我的示例中,尝试注入EmailService):
stacktrace ["TypeError: this.request.get is not a function\n at new EmailService (/Users/stephan/projects/platform-api/src/messaging/email/service/email.service.ts:36:50)\n at Injector.instantiateClass (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:340:19)\n at callback (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:53:45)\n at processTicksAndRejections (node:internal/process/task_queues:95:5)\n at Injector.loadInstance (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:57:13)\n at Injector.loadProvider (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:84:9)\n at Injector.resolveComponentHost (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:202:13)\n at async Promise.all (index 0)\n at Injector.loadCtorMetadata (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:368:23)\n at Injector.resolveConstructorParams (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:99:26)","TypeError: this.request.get is not a function\n at new EmailService (/Users/stephan/projects/platform-api/src/messaging/email/service/email.service.ts:36:50)\n at Injector.instantiateClass (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:340:19)\n at callback (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:53:45)\n at processTicksAndRejections (node:internal/process/task_queues:95:5)\n at Injector.loadInstance (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:57:13)\n at Injector.loadProvider (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:84:9)\n at Injector.resolveComponentHost (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:202:13)\n at async Promise.all (index 0)\n at Injector.loadCtorMetadata (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:368:23)\n at Injector.resolveConstructorParams (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:99:26)"]