如何使用nodejs中的Streams块过滤S3上存储为.json.gz的对象数组

gzszwxb4  于 2023-04-05  发布在  Node.js
关注(0)|答案(1)|浏览(85)

假设存储在s3中的json数组是个人详细信息,例如:[{"name":'A', "lastName":'A', age: 18}, {"name":'B', "lastName":'B', age: 20}, ... ]这个文件可能非常大,我想优化内存使用,并使用流来过滤数据,而不是将整个文件加载到内存中并过滤它。我不确定我是否完全理解“objectMode”在这里的工作原理。
我尝试了以下方法,但失败了,因为控制台日志将块打印为大小相等的字符串字节,而不是一组对象,尽管使用了objectMode: true

const filteredData = [];
const filterTransform = new Transform({
      objectMode: true,
      transform(chunk, _, callback) {
          console.log("chunk : \n"+chunk);
          try {
               const filteredData = chunk.map((item: any) => ({
                  name: item.name,
                  lastName: item.lastName,
             }));
             filteredData.push(JSON.stringify(filteredData));
           } catch (err) {
                callback(err);
           }
           callback();
       },
});
const client = getS3Client();
const command = new GetObjectCommand({
    Bucket: bucket,
    Key: key,
});
const data:GetObjectCommandOutput= await client.send(command);
const readStream = (dataSingle.Body! as Readable)
      .pipe(zlib.createGunzip())
      .pipe(filterTransform)

示例输出为块1 "{name:'A', lastName:'"块2 "A'}, {name:'B', lastN"块3,依此类推。
但我希望:区块1 \[{"name":'A', "lastName":'A'}, {"name":'B', "lastName":'B'}\]区块2 \[{"name":'C', "lastName":'C'}, {"name":'D', "lastName":'D'}\]
...如何将块计数为对象列表而不是字节?

bvjxkvbb

bvjxkvbb1#

要使用Node.js中的Streams块过滤S3上存储为.json.gz的对象数组,您需要确保流处于对象模式,并且流发出的块是完整的对象。
在您的代码中,看起来您试图使用Transform流转换数据,但没有正确处理块。在对象模式下处理流时,每个块都应该表示一个完整的对象。如果对象被拆分为多个块,则需要在处理它之前缓冲块,直到您有一个完整的对象。
下面是一个如何修改代码以正确过滤数据的示例:

const filterTransform = new Transform({
  objectMode: true,
  transform(chunk, _, callback) {
    try {
      const data = JSON.parse(chunk);
      const filteredData = data.map((item) => ({
        name: item.name,
        lastName: item.lastName,
      }));
      callback(null, JSON.stringify(filteredData));
    } catch (err) {
      callback(err);
    }
  },
});

const client = getS3Client();
const command = new GetObjectCommand({
  Bucket: bucket,
  Key: key,
});

const data = await client.send(command);
const readStream = data.Body!.pipe(zlib.createGunzip()).pipe(filterTransform);

// consume the filtered data
readStream.on("data", (chunk) => {
  console.log(chunk);
});

readStream.on("error", (err) => {
  console.error(err);
});

readStream.on("end", () => {
  console.log("Done");
});

例如,让我们尝试使用Transform流来解析和过滤数据。我们设置objectMode:true表示流应该处理完整的对象。在transform方法中,我们解析传入的JSON字符串并过滤数据。然后我们将过滤后的数据作为JSON字符串调用回调函数。
当使用流时,我们必须监听数据事件,它将把过滤后的数据作为完整的对象发出,我们也可以根据需要将过滤后的数据写入文件或其他流
注意
在上面的例子中,我们假设流发出的每个块都是一个完整的JSON对象。如果块不是完整的对象,那么在处理它之前,你需要缓冲块,直到你有一个完整的对象。

相关问题