如何在NodeJS中创建一对多转换流?

nlejzf6q  于 12个月前  发布在  Node.js
关注(0)|答案(2)|浏览(141)

我尝试使用NodeJS流来替换ETL过程。我尝试编写的Transform流接受数据集,并基于配置数据,将输出一个或多个记录。换句话说,如果它读取阅读100000条记录,转换可以结束写入100000-400000条记录的任何位置。_transform方法只允许其回调被调用一次,所以我试图弄清楚如何在一个输入对象中输出多个对象。
我看了看双工,但我看到的每一个例子都是用它作为一个双向流,而我肯定希望我的流是单向的(或者我可能只是不明白他们是如何工作的)。

gkn4icbw

gkn4icbw1#

回调只能被调用一次,但是.push方法是发出数据的方法,并且可以根据需要在_transform方法中调用多次。示例:

class MyTransform extends Transform {
  _transform(chunk, enc, next) {
    const arrayFromChunk = chunk.split(',');

    arrayFromChunk.forEach(piece => {
      // this.push is what will emit readable data, can be called as often
      // as needed.
      this.push(piece); 
    });

    next(); // next can only be called once.
  }
}

字符串
文档在这里:https://nodejs.org/docs/latest-v18.x/api/stream.html#stream_implementing_a_transform_stream

9udxz4iz

9udxz4iz2#

NodeJS流非常适合ETL工作,但是虽然非常强大,但它们也非常复杂,当你从头开始时很容易迷失方向-正如你已经经历过的那样。最后我创建了gulp-etl,它在后台使用流。如果您希望一个传入记录生成多个记录,它看起来像这样:

const handleLines = require('gulp-etl-handlelines').handlelines;

const linehandler = (lineObj, context) => {
    let recsToReturn = [];
 
    // return null to remove this line
    if (!lineObj.record || lineObj.record["TestValue"] == 'illegalValue') 
          {return null}
 
    // return incoming record
    recsToReturn.push(lineObj);

    // logic to create new record
    if (lineObj.record.needsDuplication) {
        // clone newRec from lineObj
        let newRec = {...lineObj, record:...lineObj.record};
        // change new record as needed
        newRec.record.UniqueField = "newValue";
        recsToReturn.push(newRec);
    }
 
    // return the record(s)
    return recsToReturn;
}

exports.default = function() {
    return src('data/100kRecs.ndjson', { buffer:false /* use streaming mode */})
    // pipe the files through our handlelines plugin
    .pipe(handlelines({}, { transformCallback: linehandler }))
    .pipe(dest('output/'));
}

字符串
我们在幕后使用了Transform流,但这一切都被抽象掉了;您可以获得好处,而不必进入流实现的杂草-除非您想编写自己的插件。另外,您可以使用许多现有的插件。

相关问题