typescript 使用RxJS将值添加到流中:最大调用栈大小

cld4siwp  于 2023-03-04  发布在  TypeScript
关注(0)|答案(2)|浏览(156)

我有WebSocket服务器、Apollo客户机/服务器和RxJS来检测我的环境中的变化,还有一个订阅该websocket的服务:
//会话服务

private sessionsSubject = new BehaviorSubject<Array<Session> | null>(null);
  public sessions$ = this.sessionsSubject.asObservable();

  constructor(private apollo: Apollo) {this.subscribeToSessions()}

  public subscribeToSessions(): void {
    this.apollo.subscribe<SessionCreated>({
      query: sessionOperations.CREATE_SESSION_SUBSCRIPTION
    }).subscribe(async ({data}) => {
      this.sessions$.subscribe({
        next: (sessions) => this.setSessions([...sessions??[], data!.sessionCreated]),
        complete: () => console.log('complete')
      })
    })
  }

  public setSessions(sessions: Array<Session>): void {
    this.sessionsSubject.next(sessions)
  }
}

如果添加了一个新的会话,则会调用我订阅的sessionOperations.CREATE_SESSION_SUBSCRIPTION查询。在订阅中,我想用新创建的会话更新会话。
但是 * 我认为 * 这会导致一个无限循环。因为这也会更新sessions$可观察值,它会再次运行会话的添加。

qybjjes1

qybjjes11#

你处在一个无限循环中,因为在订阅中,你调用了订阅的可观察对象上的next,并导致Maximum call stack size exceeded
this.setSessions([...sessions??[], data!.sessionCreated])应该替换对行为主体的整个预订,使得仅将外部预订推入行为主体而不触发无限循环。

const { BehaviorSubject } = rxjs;

const sessionsSubject = new BehaviorSubject(null);
const sessions$ = sessionsSubject.asObservable();

sessions$.subscribe({
  next: (sessions) => { setSessions([...(sessions||[]), 'Some Session object']); },
  // This causes the observable to emit which is what triggers the infinite loop
  complete: () => console.log('complete')
  // This is a behavior subject it will only complete if you do it manually
});

function setSessions(sessions) {
  sessionsSubject.next(sessions);
}

setSessions(['Some Session object']);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
34gzjxbg

34gzjxbg2#

这里的答案〉How to avoid Maximum call stack size exceeded in Observable?与我相关:

public subscribeToSessions(): void {
  this.apollo.subscribe<SessionCreated>({
    query: sessionOperations.CREATE_SESSION_SUBSCRIPTION
  }).subscribe(async ({data}) => {
    this.sessions$
    .pipe(
      take(1),
    ).subscribe({
      next: (sessions) => data ? this.setSessions([...sessions??[], data.sessionCreated]) : null,
    })
  })
}

添加take(1)解决了我的问题。
take操作符允许我们指定在取消订阅之前我们想从Observable接收多少个值,这意味着当我们接收到指定数量的值时,take会自动取消订阅!

相关问题