NodeJS NestJs -使用GCP Pubsub的工作人员

ymdaylpp  于 2023-04-11  发布在  Node.js
关注(0)|答案(1)|浏览(145)

我需要开发一个worker来消费GCP Pub/Sub消息。
下面是我使用的代码:

main.ts

import { INestApplication, INestApplicationContext } from '@nestjs/common';
import { CancellationTokenService } from '@/services/cancellationToken';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '@/app.module';
import { PubSubService } from '@/services/pubSub';
async function main() {
  const app = await NestFactory.createApplicationContext(AppModule);
  const cancellationToken = app.get(CancellationTokenService);
  configureApplicationStop(cancellationToken);
  await run(app, cancellationToken);
  while (!cancellationToken.isCancelled) {
    console.log('Microservice is running...');
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
  await app.close();
}


async function run(
  app: INestApplicationContext,
  cancellationToken: CancellationTokenService,
) {
  try {
    const pubSub = app.get(PubSubService);
    await pubSub.consumeMessages();
  } catch (error) {
    console.error(error);
    cancellationToken.cancel();
  }
}

function configureApplicationStop(cancellationToken: CancellationTokenService) {
  process.on('SIGINT', () => {
    cancellationToken.cancel();
    console.log('Stopping application...');
  });
}
main();

app.module.ts

import { Module } from '@nestjs/common';
import { PubSubService } from '@/services/pubSub';

import { CancellationTokenService } from '@/services/cancellationToken';
@Module({
  imports: [],
  providers: [CancellationTokenService, PubSubService],
})
export class AppModule {}

pubsub.service.ts

import { Injectable } from '@nestjs/common';
import { PubSub, Subscription } from '@google-cloud/pubsub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Injectable()
export class PubSubService {
  pubsub: PubSub;
  subscription: Subscription;
  constructor(private readonly cancellationToken: CancellationTokenService) {
    this.pubsub = new PubSub();
    this.initialize();
    console.log('PubsubService');
  }
  async initialize(): Promise<void> {
    this.subscription = this.initializeSubscription();
  }
  initializeSubscription(): Subscription {
    return this.pubsub.subscription('my-subsription');
  }
  async consumeMessages(): Promise<void> {
    if (this.cancellationToken.isCancelled) {
      console.log('cancellationToken.isCancelled');
      this.subscription.close();
    }
    console.log('cancellationToken.isCancelled');
    if (!this.subscription.isOpen) this.subscription.open();
    this.subscription.on('message', (message) => {
      if (this.cancellationToken.isCancelled)
        console.log('cancellationToken.isCancelled consuming messages');
      console.log(`Received message ${message.id}:`);
      console.log(`\tData: ${message.data}`);
      console.log(`\tAttributes: ${message.attributes}`);
      message.nack();
      //message.ack();
    });
  }
}

cancellationToken.service.ts

import { Injectable, Scope } from '@nestjs/common';
@Injectable({ scope: Scope.DEFAULT })
export class CancellationTokenService {
  private _isCancelled: boolean;
  constructor() {
    this._isCancelled = false;
  }
  get isCancelled(): boolean {
    return this._isCancelled;
  }
  cancel(): void {
    this._isCancelled = true;
  }
  reset(): void {
    this._isCancelled = false;
  }
}

我需要在触发SIGTERM时停止使用消息。
我不知道为什么它不工作,因为显然应用程序正在记录Stopping application...
我目前是nest js的新手,所以我有点迷失了,如果这是正确的方法。

mbskvtky

mbskvtky1#

也许这不是最好的方法,但现在这解决了这个问题。
我已经在我的app.module.ts中创建了cancellationTokenService的示例和自定义提供程序

import { CancellationTokenService } from '@/services/cancellationToken';
const cancellationTokenService = new CancellationTokenService();
//...
const cancellationTokenProvider: Provider = {
  provide: CancellationTokenService,
  useFactory: () => cancellationTokenService,
};
@Module({
  imports: [],
  providers: [cancellationTokenProvider, PubSubService],
})

相关问题