javascript RXJS,在继续工作流程之前,是否有方法在操作员内部等待订阅结束?

7gyucuyw  于 2023-01-11  发布在  Java
关注(0)|答案(1)|浏览(101)

所以,我是RXJS的新手,在来这里问这个问题之前,我已经检查了很多堆栈溢出和文档,但是我发现很难让我的逻辑工作。
我有一个Observable,它将获取一个文档集合并返回它们,我使用管道操作符进行一些更改,比如使用map操作符更改对象,到目前为止,一切都很好。
问题就在这里。之后,我需要为每个文档运行一个“http请求”,以获取关于它们的特定数据(“标签”)。http请求当然也是作为Observable发出的,需要订阅才能获取数据。然而,订阅需要一些时间,并且之后得到的对象没有所需的数据。

let myFunction.pipe(
      // mapping to add missing data needed for the front-end
      map((results) => ({
        ...results,
        documents: results._embedded.documents.map((document) => ({
          ...document,
          tags: []
        })),
      })),
// mapping to loop through each document, and use the observable to get the tags with the document id
      map((results) => {

        let documents = results.documents.map((document: Document) => {
          // get Tags for each document
          let tagsToReturn = []
          this.getDocumentTags(document.id)
            .pipe(
       // map function to return only the ids for each document, and not the complete tag object
              map((tagsArray) => {
                const modifiedTagsArray = tagsArray.map((tagObject: any) => {
                  if (tagObject !== undefined) {
                    return tagObject.id
                  }
                })
                return modifiedTagsArray
              })
            )
              // the subscription to "actually" get the tags
            .subscribe((tagsArray: number[]) => {
              // Here the tags are found, but the latter code is executed first
              // document.tags = tagsArray
              tagsToReturn = tagsArray
            })

          // console.log(JSON.stringify(document))
          // Here the tags are not found yet
          console.log(JSON.stringify(tagsToReturn))

          return { ...document, tags: tagsToReturn }
        })

       // I then, normally return the new documents with the tags for each document, but it is empty because the subscribe didn't return yet.
        return {
          _links: results._links,
          page: results.page,
          documents: documents,
        }
      }),
      map((results) => {
        results.documents.forEach((doc) => {
          return this.addObservablesToDocument(doc)
        })
        return results
      })
    )

我已经尝试了一些解决方案与switchmap,forkjoin,concat...等,但它没有工作,或者我没有找到正确的方式来使用它们。这就是为什么我问是否有一种方法来停止或其他方式来处理这个问题。
我尝试使用不同的运算符,如:mergemap,concat,switchmap到swich到新的请求,但是之后,我不能有全局对象。
I mostly tried to replicate/readapt this in some ways

j0pj023g

j0pj023g1#

通过结合使用mergemapforkjoin,我能够复制您正在寻找的内容。
我不知道该如何解释,因为我也不是Rxjs方面的Maven,但我使用了以下代码:this stackoverflow answer that i adapted
我的理解是,当在管道流中使用mergeMap时,您要确保返回到那里的所有内容都将通过调用“subscribe()"执行,然后mergeMap返回一个forkJoin,这是每个文档标记可观察到的
希望这能帮上忙

.pipe(
      // mapping to add missing data needed for the front-end
      map((results) => ({
        ...results,
        documents: results._embedded.documents.map((document) => ({
          ...document,
          tags: []
        })),
      })),

  /********  Added Code *********/
      mergeMap((result: ResultsNew<Document>) => {
        let allTags = result._embedded.documents.map((document) =>
          this.getDocumentTags(document.id).pipe(
            map((tagsArray) => tagsArray.map((tagObject: any) => tagObject.id))
          )
        )
        return forkJoin(...allTags).pipe(
          map((idDataArray) => {
            result._embedded.documents.forEach((eachDocument, index) => {
              eachDocument.tags = idDataArray[index]
            })
            
            return {
              page: result.page,
              _links: result._links,
              documents: result._embedded.documents,
            }
          })
        )
      }),
        /********  Added Code *********/

      map((results) => {
        results.documents.forEach((doc) => {
          return this.addObservablesToDocument(doc)
        })
        return results
      })
    )

相关问题