NodeJS 流管道中途停止

ecbunoof  于 12个月前  发布在  Node.js
关注(0)|答案(2)|浏览(124)

所以,我想在流中执行一个流程。起初,它只需要很少的数据/迭代就可以工作,但如果涉及更多的数据,管道就会中途停止。
下面是我的代码的样子。
基本上,对于这个例子,我想处理大约200个数据。
但只有30个数据被成功处理。它停止后。没有错误信息,我不知道为什么。

import { pipeline } from "stream/promises"

async function* func() {
  for (let i = 0; i < 200; i++) {
    console.log(`func ${i}`)
    yield i
  }
}
async function* func2(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
;    console.log(`func2 ${i}`)
    yield i
  }
}
async function* func3(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func3 ${i}`)
    yield i
  }
}

async function main() {
  await pipeline(
    func,
    func2,
    func3
  )
}

main();

字符串

预期输出:

迭代完成200

实际产量:

迭代在30处停止

nfs0ujit

nfs0ujit1#

我在你的代码中添加了一个writableStream,它可以像你预期的那样工作:

import { pipeline } from 'stream/promises';
import { Writable } from 'stream';

async function* func() {
  for (let i = 0; i < 200; i++) {
    console.log(`func ${i}`)
    yield i;
  }
}

async function* func2(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func2 ${i}`);
    yield i;
  }
}
async function* func3(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func3 ${i}`);
    yield i.toString();
  }
}

const writableStream = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

async function main() {
  try {
    await pipeline(
      func,
      func2,
      func3,
      writableStream
    );
    console.log('Pipeline finished successfully');
  } catch (e) {
    console.error('Pipeline failed', e);
  }
}

main();

字符串

xj3cbfub

xj3cbfub2#

经过不断的调试,我找到了解决方案,它实际上很简单,哈哈。
解决方案是删除最后一个函数(即PipelineDestination)上的“yield”。
我相信发生的事情是,所有以前的数据只是在流中积累,因为没有消耗它们。

import { pipeline } from "stream/promises"

async function* func() {
  for (let i = 0; i < 200; i++) {
    console.log(`func ${i}`)
    yield i
  }
}
async function* func2(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
;    console.log(`func2 ${i}`)
    yield i
  }
}
async function* func3(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func3 ${i}`)
    // yield i <---- I removed this!
  }
}

async function main() {
  await pipeline(
    func,
    func2,
    func3
  )
}

main();

字符串

相关问题