我需要开发一个worker来消费GCP Pub/Sub消息。
下面是我使用的代码:
main.ts
import { INestApplication, INestApplicationContext } from '@nestjs/common';
import { CancellationTokenService } from '@/services/cancellationToken';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '@/app.module';
import { PubSubService } from '@/services/pubSub';
async function main() {
const app = await NestFactory.createApplicationContext(AppModule);
const cancellationToken = app.get(CancellationTokenService);
configureApplicationStop(cancellationToken);
await run(app, cancellationToken);
while (!cancellationToken.isCancelled) {
console.log('Microservice is running...');
await new Promise((resolve) => setTimeout(resolve, 1000));
}
await app.close();
}
async function run(
app: INestApplicationContext,
cancellationToken: CancellationTokenService,
) {
try {
const pubSub = app.get(PubSubService);
await pubSub.consumeMessages();
} catch (error) {
console.error(error);
cancellationToken.cancel();
}
}
function configureApplicationStop(cancellationToken: CancellationTokenService) {
process.on('SIGINT', () => {
cancellationToken.cancel();
console.log('Stopping application...');
});
}
main();
app.module.ts
import { Module } from '@nestjs/common';
import { PubSubService } from '@/services/pubSub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Module({
imports: [],
providers: [CancellationTokenService, PubSubService],
})
export class AppModule {}
pubsub.service.ts
import { Injectable } from '@nestjs/common';
import { PubSub, Subscription } from '@google-cloud/pubsub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Injectable()
export class PubSubService {
pubsub: PubSub;
subscription: Subscription;
constructor(private readonly cancellationToken: CancellationTokenService) {
this.pubsub = new PubSub();
this.initialize();
console.log('PubsubService');
}
async initialize(): Promise<void> {
this.subscription = this.initializeSubscription();
}
initializeSubscription(): Subscription {
return this.pubsub.subscription('my-subsription');
}
async consumeMessages(): Promise<void> {
if (this.cancellationToken.isCancelled) {
console.log('cancellationToken.isCancelled');
this.subscription.close();
}
console.log('cancellationToken.isCancelled');
if (!this.subscription.isOpen) this.subscription.open();
this.subscription.on('message', (message) => {
if (this.cancellationToken.isCancelled)
console.log('cancellationToken.isCancelled consuming messages');
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
message.nack();
//message.ack();
});
}
}
cancellationToken.service.ts
import { Injectable, Scope } from '@nestjs/common';
@Injectable({ scope: Scope.DEFAULT })
export class CancellationTokenService {
private _isCancelled: boolean;
constructor() {
this._isCancelled = false;
}
get isCancelled(): boolean {
return this._isCancelled;
}
cancel(): void {
this._isCancelled = true;
}
reset(): void {
this._isCancelled = false;
}
}
我需要在触发SIGTERM时停止使用消息。
我不知道为什么它不工作,因为显然应用程序正在记录Stopping application...
我目前是nest js的新手,所以我有点迷失了,如果这是正确的方法。
1条答案
按热度按时间mbskvtky1#
也许这不是最好的方法,但现在这解决了这个问题。
我已经在我的app.module.ts中创建了cancellationTokenService的示例和自定义提供程序