IndexedDB 如何创建一个返回Observable的方法,该方法发出需要依次执行的2个Promises的结果?

von4xj4u  于 2022-12-09  发布在  IndexedDB
关注(0)|答案(1)|浏览(213)

我问了一个问题
Is Observable from chained promises equivalent of observables created with from and chained with concatMap?
在完全错误的前提下。似乎我的两个解决方案都与我的意图无关。
我创建了一个返回Observable的方法,并调用了2个返回Promise的方法。我尝试了2种方法:

public setItemInfos(itemInfos: IItemInfo[]): Observable<number> {
    return from(this.db.selectionItemInfos.clear().then(() => {
      return this.db.selectionItemInfos.bulkAdd(itemInfos);
    }));
  }

  public setItemInfos(itemInfos: IItemInfo[]): Observable<number> {
    const clear$ = from(this.db.selectionItemInfos.clear());
    const bulkAdd$ = from(this.db.selectionItemInfos.bulkAdd(itemInfos));

    return clear$.pipe(concatMap(() => bulkAdd$))
  }

用途是:

myService.setItemInfos(itemInfos).subsribe(count => {
  console.log(`Cleared the table 1st and then added ${count} new items`);
});

我从两个版本中想到:

  1. bulkAdd开始时,表清除操作执行完毕
    1.当bulkAdd完成时,我从subscribe中获取计数
    究竟应该如何做?或者可以做到吗?
rggaifut

rggaifut1#

这是(从我在这里可以告诉),我会怎么做。
一般来说,defer(或任何高阶运算符)是从promise创建observable的一种更好的方法。Defer允许你将promise的急切求值语义转换为observable的懒惰求值语义。
然后所有通常的可观测算子等将如预期的那样起作用。

public setItemInfos(itemInfos: IItemInfo[]): Observable<number> {
  const clear$ = defer(() => this.db.selectionItemInfos.clear());
  const bulkAdd$ = defer(() => this.db.selectionItemInfos.bulkAdd(itemInfos));

  return concat(clear$, bulkAdd$);
}

更新1:

所以我想我可能知道你想要什么。这不是一个真正的惯用的RxJS,因为它是一个声明性的,命令式的代码的交错混合。即使这样,这应该工作吗?我还没有完全测试它,但一些修补,我认为这应该做你想要的。
肯定有更好的方法来完成同样的事情,但是如果没有看到你所追求的更大的图景,就很难说了。

interface Tagged<T> {
  payload: T,
  tag: number
}

class abitraryClass{

  private setItemInfoSub: Subject<Tagged<IItemInfo[]>>;
  private processItemInfo: Observable<Tagged<number>>;
  private itemInfoTag = 0;

  constructor(){
    this.setItemInfoSub = new Subject<Tagged<IItemInfo[]>>();
    this.processItemInfo = this.setItemInfoSub.pipe(
      concatMap(({tag, payload: itemInfos}) => this.db.selectionItemInfos.clear().pipe(
        ignoreElements(),
        concatWith(defer(() => this.db.selectionItemInfos.bulkAdd(itemInfos))),
        map(response => ({
          payload: response,
          tag
        }))
      )),
      shareReplay(1)
    );
    // Make the processing pipeline live at all times.
    this.processItemInfo.subscribe();
  }

  public setItemInfos(itemInfos: IItemInfo[]): Observable<number> {
    const myTag = this.itemInfoTag++;

    this.setItemInfoSub.next({
      payload: itemInfos,
      tag: myTag
    });

    return this.processItemInfo.pipe(
      filter(({tag}) => tag == myTag),
      map(({payload}) => payload)
    );
  }
}

相关问题