javascript NodeJS -流管道静默崩溃

f87krz0w  于 2023-09-29  发布在  Java
关注(0)|答案(1)|浏览(83)

我注意到在 NodeJS 中使用管道(通过“stream”模块)时,程序总是静默崩溃,而我不知道原因。以下是最小复现代码:

const { pipeline, Readable, Transform } = require('stream');
const util = require('util');

// Promise-returning version of stream.pipeline, so it can be used with await.
const pipelineAsync = util.promisify(pipeline);

/**
 * Readable stream that emits a fixed sequence of 100 text rows and then ends.
 *
 * In object mode each row is pushed as a string; otherwise the row is
 * JSON-stringified and pushed as a Buffer.
 */
class Stream extends Readable {

  /**
   * @param {object} [options] - Forwarded unchanged to the Readable constructor.
   */
  constructor(options) {
    super(options);
    const total = 100;
    // Build every row up front; _read() drains this queue one row per call.
    this._rowsToSend = Array.from({ length: total }, (_, idx) => `row n°${idx + 1}/${total}`);
    this.on("error", (err) => {
      console.log("STREAM ERROR", err);
    });
  }

  /**
   * Pushes the next queued row, or ends the stream once the queue is empty.
   *
   * @param {number} size - Requested size; not used by this implementation.
   */
  _read(size) {
    const next = this._rowsToSend.shift();
    if (!next) {
      // Queue exhausted: signal end-of-stream.
      console.log(`END OF STREAM`);
      this.push(null);
      return;
    }

    console.log(`SENDING: ${next}`);
    if (this.readableObjectMode) {
      this.push(next);
      return;
    }
    const encoding = this.readableEncoding || undefined;
    this.push(Buffer.from(JSON.stringify(next), encoding));
  }

  /**
   * Drops any rows still queued and hands the error (if any) back to the caller.
   *
   * @param {Error|null} error - Error passed in by the stream machinery, if any.
   * @param {Function} callback - Invoked with the same error.
   */
  _destroy(error, callback) {
    console.log("STREAM DESTROYED");
    this._rowsToSend = [];
    callback(error);
  }

}

/**
 * Pass-through Transform that logs every chunk it receives and forwards it
 * unchanged. It also logs when it is flushed and when it emits an error.
 */
class LoggerTransform extends Transform {

  /**
   * @param {object} [options] - Forwarded unchanged to the Transform constructor.
   */
  constructor(options) {
    super(options);
    const reportError = (err) => {
      console.log("TRANSFORM ERROR: ", err);
    };
    this.on('error', reportError);
  }

  /**
   * Logs that the transform is being flushed, then signals completion.
   *
   * @param {Function} done - Completion callback.
   */
  _flush(done) {
    console.log("TRANSFORM FLUSHED");
    done();
  }

  /**
   * Logs the incoming chunk and passes it along as-is.
   *
   * @param {*} chunk - Incoming chunk.
   * @param {string} encoding - Chunk encoding; not used here.
   * @param {Function} callback - Called with no error and the same chunk.
   */
  _transform(chunk, encoding, callback) {
    console.log(`RECEIVED: ${chunk}`);
    // First argument is the error slot; null means the chunk was handled fine.
    callback(null, chunk);
  }

}

// Demo entry point: feed the row source into the logging transform and report
// whether the awaited pipeline resolved ("SUCCESS") or rejected ("FAILURE").
// Note: the last stage passed to the pipeline is a Transform; no Writable
// sink is attached after it.
(async () => {
  try {
    const source = new Stream({ objectMode: true });
    const logger = new LoggerTransform({ objectMode: true });
    await pipelineAsync(source, logger);
    console.log("SUCCESS");
  }
  catch (failure) {
    console.log("FAILURE", failure);
  }
})();

我在 Windows(10/11)和 Linux(Debian 11/Ubuntu 22.04)上使用 Node 10+(已在 10、14 和 18 上尝试)都能重现这个问题。

dxxyhpgq

dxxyhpgq1#

管道末尾必须有一个 Writable 流,但这里缺少它。
一个简单的解决方案是使用 fs 创建一个可写流:

const fs = require('fs');
/* ... */
// The pipeline needs a Writable as its final stage to consume the data.
// The null device is not located at "/dev/null" on Windows, so choose the
// path per platform to keep the snippet working on both Windows and Linux.
const nullDevice = process.platform === 'win32' ? '\\\\.\\nul' : '/dev/null';
await pipelineAsync(
  new Stream({ objectMode: true }),
  new LoggerTransform({ objectMode: true }),
  fs.createWriteStream(nullDevice)
);

相关问题