NodeJS 如何使用AsyncLocalStorage进行观察?

neekobn8  于 2022-12-18  发布在  Node.js
关注(0)|答案(3)|浏览(120)

我想在NestJs拦截器中使用AsyncLocalStorage:

export interface CallHandler<T = any> {
    handle(): Observable<T>;
}
export interface NestInterceptor<T = any, R = any> {
    intercept(context: ExecutionContext, next: CallHandler<T>): Observable<R> | Promise<Observable<R>>;
}

拦截器函数获取一个nextCallHandler,它返回一个Observable
在这种情况下,我不能使用run(run回调将在callHandler.handle()可观察对象完成之前立即退出):

intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const asyncLocalStorage = new AsyncLocalStorage();
    const myStore = {  some: 'data'};
    return asyncLocalStorage.run(myStore, () => callHandler.handle());
  }

请参见中断的复制-示例
我想出的解决办法是这样的:

const localStorage = new AsyncLocalStorage();

export class MyInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const resource = new AsyncResource('AsyncLocalStorage', { requireManualDestroy: true });
    const myStore = { some: 'data' };

    localStorage.enterWith(myStore);
    return callHandler.handle().pipe(
      finalize(() => resource.emitDestroy())
    );
  }
}


参见working replit example
这看起来很好用,但我不确定这是否真的正确--而且看起来很混乱,容易出错。
1.这是正确的吗?
1.有没有更好/更干净的方法来处理这个问题?

0tdrvxhp

0tdrvxhp1#

下面是我想到的解决方案。我对问题的理解是,您需要run函数来接收一个回调函数,该回调函数将完全封装处理程序的执行,然而,intercept函数预计返回一个尚未触发的可观察对象。这意味着,如果您将可观察对象本身封装在run回调函数中,它还没有被触发。
我的解决方案是返回一个新的observable,当它被触发时,将负责触发(即订阅)调用处理程序本身,因此,我们在run调用中创建的promise可以完全封装handle函数及其异步回调。
以下是独立函数中的一般功能,以便您可以一起查看:

intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
    return new Observable((subscribe) => {
        asyncStorage.run({}, () => new Promise(resolve => {
            next.handle().subscribe(
                result => {
                    subscribe.next(result);
                    subscribe.complete();
                    resolve();
                },
                error => {
                    subscribe.error(err);
                    resolve();
                }
            );
        }));
    });
}

接下来,我把这个概念和集成到我的拦截器下面。

export class RequestContextInterceptor implements NestInterceptor {
    constructor(
        private readonly requestContext: RequestContext,
        private readonly localStorage: AsyncLocalStorage<RequestContextData>
    ) {}

    intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
        const contextData = this.requestContext.buildContextData(context);
        return new Observable((subscribe) => {
            void this.localStorage.run(contextData, () => this.runHandler(next, subscribe));
        });
    }

    private runHandler(next: CallHandler<any>, subscribe: Subscriber<any>): Promise<void> {
        return new Promise<void>((resolve) => {
            next.handle().subscribe(
                (result) => {
                    subscribe.next(result);
                    subscribe.complete();
                    resolve();
                },
                (err) => {
                    subscribe.error(err);
                    resolve();
                }
            );
        });
    }
}

值得注意的是,在run调用过程中创建的Promise没有拒绝路径,这是故意的,错误会传递给 Package 承诺的可观察对象,这意味着外部可观察对象仍然会成功或出错,这取决于内部可观察对象的行为,然而, Package 内部可观察对象的承诺总是会解决问题。

moiiocjp

moiiocjp2#

以下是cls-hooks的解决方案:

return new Observable(observer => {
  namespace.runAndReturn(async () => {
    namespace.set("some", "data")   
    next.handle()
        .subscribe(
          res => observer.next(res), 
          error => observer.error(error),
          () => observer.complete()
        )
  })
})
k7fdbhmy

k7fdbhmy3#

以下是我们目前解决这个问题的方法:

  • 我们创建一个可观察的,它将简单地把所有的辐射传递给callHandler
  • 重要的是,我们localStorage.run方法中订阅
const localStorage = new AsyncLocalStorage();

export class MyInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const myStore = { some: 'data' };

    return new Observable((subscriber) => {
      const subscription = localStorage.run(myStore, () => {
        /**
         * - run the handler function in the run callback, so that myStore is set
         * - subscribe to the handler and pass all emissions of the callHandler to our subscriber
         */
        return callHandler.handle().subscribe(subscriber);
      });
      /**
       * return an unsubscribe method
       */
      return () => subscription.unsubscribe();
    });
  }
}

相关问题