我注意到NodeJS的管道(通过“stream”包)的使用一直在默默地崩溃,而不知道为什么。以下是最小复制:
const { pipeline, Readable, Transform } = require('stream');
const util = require('util');
const pipelineAsync = util.promisify(pipeline);
class Stream extends Readable {
constructor(options) {
super(options);
this._rowsToSend = [];
const nbRows = 100;
for (let i = 0; i < nbRows; ++i) {
this._rowsToSend.push(`row n°${i + 1}/${nbRows}`);
}
this.on("error", (error) => {
console.log("STREAM ERROR", error);
});
}
_read(size) {
const row = this._rowsToSend.shift();
if (row) {
console.log(`SENDING: ${row}`);
this.push(this.readableObjectMode ? row : Buffer.from(JSON.stringify(row), this.readableEncoding || undefined));
}
else {
console.log(`END OF STREAM`);
this.push(null);
}
}
_destroy(error, callback) {
this._rowsToSend = [];
console.log("STREAM DESTROYED");
callback(error);
}
}
class LoggerTransform extends Transform {
constructor(options) {
super(options);
this.on('error', (error) => {
console.log("TRANSFORM ERROR: ", error);
});
}
_transform(chunk, encoding, callback) {
console.log(`RECEIVED: ${chunk}`);
callback(null, chunk);
}
_flush(callback) {
console.log("TRANSFORM FLUSHED");
callback();
}
}
(async () => {
try {
await pipelineAsync(
new Stream({ objectMode: true }),
new LoggerTransform({ objectMode: true })
);
console.log("SUCCESS");
}
catch (error) {
console.log("FAILURE", error);
}
})();
我一直在使用Windows(10/11)或Linux(Debian 11/Ubuntu 22.04)的Node 10+(在10、14和18上尝试)中重现这个问题。
1条答案
按热度按时间dxxyhpgq1#
Writable
对象是必需的,但在管道末尾缺少。一个简单的解决方案是使用
fs
: