typescript 如何在RxJS订阅方法中等待

3hvapo4f  于 2023-01-18  发布在  TypeScript
关注(0)|答案(9)|浏览(144)

在一个RxJS主题的订阅回调中,我想在一个async函数上使用await。下面是一个代码示例, typescript 转发器抱怨说:
错误:(131,21)TS2304:找不到名称“wait”。

async ngOnInit() {
  this.subscriber = dateSubscription.subscribe((date: Date) => {
    let dbKey = await this._someService.saveToDatabase(someObject);
    // wait for db write to finish before evaluating the next code
    // ... some other code here
  });
}

通常我在尝试在非async函数中调用wait时会看到这种情况。我是否需要进行订阅回调async还是我做错了?函数saveToDatabaseasync,并返回解析到写入的数据库主键的承诺。

ss2ws0br

ss2ws0br1#

您可以直接将异步签名添加到订阅中的匿名函数调用中

this.subscriber = dateSubscription.subscribe(async (date: Date) => {
    let dbKey = await this._someService.saveToDatabase(someObject);
    // wait for db write to finish before evaluating the next code
    // ... some other code here
  });
kgqe7b3p

kgqe7b3p2#

您不需要使用await,也不需要将Promise转换为Observable
请参见Ben Lesh的此Tweet

下面是一个模拟saveToDatabase函数的例子:

const { Observable } = Rx;

const saveToDatabase = (date) =>
  new Promise(resolve =>
    setTimeout(() =>
      resolve(`${date} has been saved to the database`),
      1000));

const date$ = Observable.of(new Date()).delay(1000);

date$
  .do(x => console.log(`date received, trying to save it to database ...`))
  .switchMap(date => saveToDatabase(date))
  .do(console.log)
  .subscribe();

输出:

ddarikpa

ddarikpa3#

**更新:**发现easy for one time承诺只需在订阅者(blog)前面使用toPromise。因此,对于上述情况,它将是这样的:

const title = await this.translate.get('MYBOOK-PAGE.PAGE_TITLE').toPromise();

**老办法:**这是我解决这个问题的方法

const title = await new Promise<string>(resolve => 
  this.translate.get('MYBOOK-PAGE.PAGE_TITLE')
   .subscribe(translated => {
     resolve(translated)
   }));

我现在要做的是把可观察性变成承诺
注:这里唯一的问题是这是一次性显示,即如果你订阅一次,你将无法再次访问它。适合我在这里分享。

7eumitmz

7eumitmz4#

您不能直接await观察值,但可以awaitPromise

更新

.toPromise()方法在RxJS 8中被 * 弃用 *,因此您可以使用lastValueFrom()firstValueFrom()

import { lastValueFrom } from 'rxjs';
this.categories = await lastValueFrom(categories$);

感谢罗伯特·伦德尔的评论。

RxJS v8之前

您可以简单地对Observables订阅使用.toPromise()方法。

async ngOnInit() {
  const date = await dateSubscription.toPromise();      
  let dbKey = await this._someService.saveToDatabase(someObject);
}

当你awaitdateSubscription的承诺时,你会得到一个Date对象,然后你可以继续执行下一行,这使得你的代码阅读更有顺序性。
有些人认为Angular 不会等待ngOnInit完成,它没有选择。请看一下从这里给定的TypeScript生成的JavaScriptngOnInit将调用内部管理和执行底层状态机的等待器(生成器)。Angular对此没有任何控制。它只是希望调用该方法。

06odsfpq

06odsfpq5#

2022年的Rxjs:使用pipeconcatMap
最后,我在这里寻找在监听下一个发布项之前等待处理昂贵项的能力,我不得不将subscribe处理程序中的async代码修改为按照this other similar SO question使用pipeconcatMap
旧代码,在处理新项之前等待对项的昂贵调用(因此在我的例子中存在大量争用条件):

myObservable$.subscribe(async (item) => { await expensiveFoo(item); })

新代码,在处理下一个项目之前等待昂贵的函数。

let subscription = myObservable$.pipe(
    concatMap(async (item) => { await expensiveFoo(item); })
).subscribe(
    (item) => simpleFoo(item),// can also handle sync code here if you want 
    (error) => handleError(error), 
    () => handleComplete()
);

在处理完可观察对象之前,一定要保留对subscription的引用(它不是订阅者)。

4xy9mtcn

4xy9mtcn6#

我也有类似的等待响应的需求,而不是在Observable实现中使用await或setTimeout()方法,现在更干净的方法是在完成订阅之前使用RxJS内置的interval()方法。
在服务中尝试此操作:

import { interval } from 'rxjs';
...

// inside your method
const source = interval(100);
    source.subscribe(x => {
        subject.next(CONNECTIONS_DATA);
        subject.complete();
});
return subject;

RxJS文档:https://rxjs-dev.firebaseapp.com/guide/subject#reference-counting
希望有点帮助,如果有人试图这样做最近。

cngwdvgl

cngwdvgl7#

我刚刚编写了一个TaskQueue,以确保订阅中的异步函数可以逐个执行。
查看演示:Code Sandbox

const ob$ = new Subject()
const subOb$ = new Subject()

async function asyncFunc() {
  await new Promise((resolve) => setTimeout(resolve, Math.random(1) * 1000))
}

// Without [TaskQueue]
ob$.subscribe(x => {
  subOb$.next(x)
})
subOb$.current.pipe(switchMap(asyncFunc)).subscribe()  // <- My main goal is to make sure the [asyncFunc] is executed asyncronously.

// With [TaskQueue]
const taskQueue = new TaskQueue(subOb$)
ob$.subscribe(x => {
  taskQueue.next(x)
  // or
  taskQueue.nextAsync(x)
})
taskQueue.subscribe(asyncFunc)
vybvopom

vybvopom8#

这些答案似乎都没有充分回答OP最初的问题。下面是我使用async/await的简单解决方案,这些回调函数是使用IIFEs时可观察到的。

import { Observable } from 'rxjs';

const someAsyncFunction = async (message: string) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(message);
    }, 1000);
  });
}

const myObservable = new Observable((subscriber) => {
  (async () => {
    const message1 = await someAsyncFunction('hello world');
    subscriber.next(message1);
    subscriber.next(message1);
  })()
    .then(
      () => subscriber.complete(),
      (err) => subscriber.error(err)
    )
});

myObservable.subscribe({
  next: async (message1) => {
      console.log(message1);
      const message2 = await someAsyncFunction('goodbye world');
      console.log(message2);
      console.log('done');
  },
  error: (err) => console.log(err),
  complete: () => console.log('complete')
});

输出:

hello world
hello world
complete
goodbye world
done
goodbye world
done

注意complete回调函数在next回调函数完成执行之前被调用。当next函数包含异步活动并且无法避免时,rxjs observable函数就是这样工作的。如果你需要在next函数完成之后调用complete函数,你需要创建自己的Observable实现或等价的实现。

nxagd54h

nxagd54h9#

async ngOnInit() { }是不正确的签名,因为这是Angular定义OnInit接口的方式。它应该返回void

export interface OnInit { ngOnInit(): void; }

如果在dateSubscription之后有任何处理承诺,可以使用Observable.fromPromise,如

dateSubscription
 .flatMap(x=>
      Observable.defer(Observable.fromObservable(this._someService.saveToDatabase(someObject)))
   ).subscribe()

相关问题