javascript 如何将节点可读流转换为RX可观察流

kulphzqa  于 2022-12-28  发布在  Java
关注(0)|答案(6)|浏览(152)

如果我有一个Node js流,例如来自process.stdinfs.createReadStream,我如何使用RxJs5将其转换为RxJs可观察流?
我看到RxJs-Node有一个fromReadableStream方法,但看起来它已经有近一年没有更新了。

xwmevbvl

xwmevbvl1#

对于任何想了解这一点的人,我按照Mark的建议,为rxjs 5修改了rx-node fromStream实现。

import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);
    };
  }).share();
}

请注意,它本质上破坏了流的所有反压功能。Observables'是一种推送技术。所有输入块都将被读取并尽快推送到观察器。根据您的情况,这可能不是最佳解决方案。

aurhwmvo

aurhwmvo2#

由于节点v11.14.0流支持for await https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator
这意味着您可以将stream传递给from()操作符。
底层rxjs(v7.x.x)将调用fromAsyncIterable()fromAsyncIterable()将返回可观察

e4yzc0pl

e4yzc0pl3#

以下内容适用于v4和v5(* 免责声明 * 未经测试):

fromStream: function (stream, finishEventName, dataEventName) {
    stream.pause();

    finishEventName || (finishEventName = 'end');
    dataEventName || (dataEventName = 'data');

    return Observable.create(function (observer) {

      // This is the "next" event
      const data$ = Observable.fromEvent(stream, dataEventName);

      // Map this into an error event
      const error$ = Observable.fromEvent(stream, 'error')
        .flatMap(err => Observable.throw(err));

      // Shut down the stream
      const complete$ = Observable.fromEvent(stream, finishEventName);

      // Put it all together and subscribe
      const sub = data$
        .merge(error$)
        .takeUntil(complete$)
        .subscribe(observer);

      // Start the underlying node stream
      stream.resume();

      // Return a handle to destroy the stream
      return sub;
    })

    // Avoid recreating the stream on duplicate subscriptions
    .share();
  },
esyap4oy

esyap4oy4#

上面的答案是可行的,尽管不支持反压。如果你尝试用createReadStream读取一个大文件,它们会在内存中读取整个文件。
下面是我的支持背压的实现:rxjs-stream

guz6ccqo

guz6ccqo5#

RxJs-Node实现是基于RxJs 4的,但可以移植到RxJs 5,而不需要做很多工作https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52

hsgswve4

hsgswve46#

最近来这里的人(RxJS 7 & Node 18+)应该使用下面的代码。
RxJS已经被更新来处理类似流的对象。当你把一个ReadStream传递给RxJS时,它会测试它是否是ReadableStreamLike,然后把它变成一个AsyncGenerator

import { from } from 'rxjs';

const file = fs.createReadStream(fileName);

const file$ = from(file).subscribe({
  next:  (dat) => { ... },
  error: (err) => { ... },
  complete: () => { ... }
});

相关问题