typescript RxJS无限循环从数组与延迟基于值

dgjrabp2  于 2023-10-22  发布在  TypeScript
关注(0)|答案(2)|浏览(121)

我想无休止地循环一个数组,其中包含每个都有一个指定的持续时间是活跃的。一旦这个持续时间过去了,下一个条目应该被发出。当我限制从流中获取的数据的数量时,我能够让它工作,但是如果我想让它无休止地运行,它会在RangeError: Maximum call stack size exceeded中失败。
用例是有一组固定的页面,每个页面都有自己的可见时间。一旦显示了最后一页,它应该从第一页重新开始,依此类推。
这是我创建这个流的尝试:

import { from, of, zip } from "rxjs";
import { map, startWith, share, concatMap, delay, repeat, take } from "rxjs/operators";

interface Entry {
  name: string;
  duration: number;
}
const data: Array<Entry> = [
  {"name": "foo", "duration": 1000},
  {"name": "bar", "duration": 4000},
  {"name": "bla", "duration": 2000}
];

const source = from(data).pipe(
  repeat(),
  take(10),
  share()
);
const ticker = source.pipe(
  map(v => v.duration),
  startWith(0),
);

zip(source, ticker).pipe(
  concatMap(([val, dur]) => of(val).pipe(delay(dur))),
).subscribe(x => console.log(x, new Date()));

它的工作原理是因为source流中的take(10)。删除它会导致RangeError
预期的输出应该是这样的:

{ name: 'foo', duration: 1000 } 2023-10-03T20:54:37.284Z
{ name: 'bar', duration: 4000 } 2023-10-03T20:54:38.289Z
{ name: 'bla', duration: 2000 } 2023-10-03T20:54:42.294Z
{ name: 'foo', duration: 1000 } 2023-10-03T20:54:44.297Z
{ name: 'bar', duration: 4000 } 2023-10-03T20:54:45.297Z
{ name: 'bla', duration: 2000 } 2023-10-03T20:54:49.303Z
{ name: 'foo', duration: 1000 } 2023-10-03T20:54:51.307Z
{ name: 'bar', duration: 4000 } 2023-10-03T20:54:52.310Z
{ name: 'bla', duration: 2000 } 2023-10-03T20:54:56.316Z
{ name: 'foo', duration: 1000 } 2023-10-03T20:54:58.320Z
...continue until unsubscribed...

是否有更好的方法从重复的source发出,同时仍然保持基于最后发出的Entry的延迟?

5ssjco0h

5ssjco0h1#

正如@Andrei所指出的,堆栈问题来自repeat()在到达concatMap之前创建了无限数量的条目。
因为你已经在Entry中有了持续时间,你可以简化你的解决方案,像这样去掉代码:

const source = from(data).pipe(
  concatMap((entry) => of(entry).pipe(delay(entry.duration))),
  repeat(),
  share()
); 

source.subscribe(x => console.log(x, new Date()));

或者,如果你想发出条目,然后等待:

concatMap((entry) => concat(of(entry),timer(entry.duration).pipe(ignoreElements()))),
46qrfjad

46qrfjad2#

原因是repeat()基本上创建了无限数量的事件,这些事件被推送到concatMap回调之后的操作符。Rxjs流在概念上是基于推送的,并且不太可能知道你发出的事件是如何处理或延迟的。这就是为什么你基本上必须在重复的步骤上限制事件。
重复操作符的一个有用特性是repeat({delay: time})。仅在源的总持续时间之后重复源

const sumDur = data.reduce((sum, item) => sum + item.duration, 0);
const source = from(data).pipe(
  repeat({delay: sumDur}),
  take(10),
  share()
);

或者,您可以尝试仅在所有延迟之后的流末尾将解决方案修改为repeat

相关问题