fromStream: function (stream, finishEventName, dataEventName) {
stream.pause();
finishEventName || (finishEventName = 'end');
dataEventName || (dataEventName = 'data');
return Observable.create(function (observer) {
// This is the "next" event
const data$ = Observable.fromEvent(stream, dataEventName);
// Map this into an error event
const error$ = Observable.fromEvent(stream, 'error')
.flatMap(err => Observable.throw(err));
// Shut down the stream
const complete$ = Observable.fromEvent(stream, finishEventName);
// Put it all together and subscribe
const sub = data$
.merge(error$)
.takeUntil(complete$)
.subscribe(observer);
// Start the underlying node stream
stream.resume();
// Return a handle to destroy the stream
return sub;
})
// Avoid recreating the stream on duplicate subscriptions
.share();
},
6条答案
按热度按时间xwmevbvl1#
对于任何想了解这一点的人,我按照Mark的建议,为rxjs 5修改了rx-node
fromStream
实现。请注意,它本质上破坏了流的所有反压功能。Observables'是一种推送技术。所有输入块都将被读取并尽快推送到观察器。根据您的情况,这可能不是最佳解决方案。
aurhwmvo2#
由于节点v11.14.0流支持
for await
https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator这意味着您可以将stream传递给
from()
操作符。底层rxjs(v7.x.x)将调用
fromAsyncIterable()
,fromAsyncIterable()
将返回可观察e4yzc0pl3#
以下内容适用于v4和v5(* 免责声明 * 未经测试):
esyap4oy4#
上面的答案是可行的,尽管不支持反压。如果你尝试用createReadStream读取一个大文件,它们会在内存中读取整个文件。
下面是我的支持背压的实现:rxjs-stream
guz6ccqo5#
RxJs-Node实现是基于RxJs 4的,但可以移植到RxJs 5,而不需要做很多工作https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
hsgswve46#
最近来这里的人(RxJS 7 & Node 18+)应该使用下面的代码。
RxJS已经被更新来处理类似流的对象。当你把一个ReadStream传递给RxJS时,它会测试它是否是
ReadableStreamLike
,然后把它变成一个AsyncGenerator
。