使用Lambda Node从S3上的文件创建S3上的zip文件

eiee3dmh  于 2023-08-04  发布在  Node.js
关注(0)|答案(6)|浏览(130)

我需要创建一个Zip文件,其中包含位于我的s3存储桶中的文件(视频和图像)。
在使用我下面的代码时,问题是我很快就达到了Lambda的内存限制。

// Download every file (at most 10 concurrent requests), add each one to the
// in-memory `zip` object, then upload the generated archive to S3.
// NOTE: every file body is still buffered in memory by `zip.file(...)` before
// the archive is generated, so this approach remains bounded by Lambda memory.
async.eachLimit(files, 10, function(file, next) {
    var params = {
        Bucket: bucket, // bucket name
        Key: file.key
    };
    s3.getObject(params, function(err, data) {
        console.log('file', file.key);
        if (err) {
            console.log('get image files err',err, err.stack); // an error occurred
            // Always signal completion: without this the iteration never
            // finishes and the final callback below is never invoked.
            return next(err);
        }
        zip.file(file.key, data.Body);
        next();
    });
}, 
function(err) {
    if (err) {
        console.log('err', err);
    } else {
        console.log('zip', zip);
        // Declared locally so the stream does not leak as an implicit global.
        const content = zip.generateNodeStream({
            type: 'nodebuffer',
            streamFiles:true
        });
        var params = {
            Bucket: bucket, // name of dest bucket
            Key: 'zipped/images.zip',
            Body: content
        };
        s3.upload(params, function(err, data) {
            if (err) {
                console.log('upload zip to s3 err',err, err.stack); // an error occurred
            } else {
                console.log(data); // successful response
            }
        });
    }
});

字符串

  • 使用Lambda是否可行,或者我应该考虑其他方法?
  • 是否可以动态写入压缩的zip文件,从而在一定程度上消除内存问题,或者我需要在压缩之前收集文件?

如果你能帮忙的话,我将不胜感激。

gab6jxml

gab6jxml1#

好吧,我今天必须这么做而且很有效。直接缓冲区到流,不涉及磁盘。因此,内存或磁盘限制在这里不会成为问题:

'use strict';

// AWS SDK (v2-style `aws-sdk` package); the region is set globally for every client.
const AWS = require("aws-sdk");
AWS.config.update( { region: "eu-west-1" } );
// Shared S3 client used by `streamTo` and the handler below.
const s3 = new AWS.S3( { apiVersion: '2006-03-01'} );

// Factory for archive streams; called below as `_archiver('zip')`.
const   _archiver = require('archiver');

/**
 * Opens a writable "pipe" into S3: everything written to the returned
 * PassThrough stream is uploaded as the object `_bucket`/`_key`.
 *
 * If the upload fails, the returned stream is destroyed with the upload error,
 * so callers that listen for its 'error' event are notified instead of the
 * failure being silently dropped.
 *
 * @param {string} _bucket - Destination bucket name.
 * @param {string} _key - Destination object key.
 * @returns {import('stream').PassThrough} Stream to write the object body into.
 */
const streamTo = (_bucket, _key) => {
	const stream = require('stream');
	const _pass = new stream.PassThrough();
	s3.upload( { Bucket: _bucket, Key: _key, Body: _pass }, (_err, _data) => {
		if (_err) {
			// Surface the failure through the stream's 'error' event.
			_pass.destroy(_err);
		}
	} );
	return _pass;
};
      
exports.handler = async (_req, _ctx, _cb) => {
	var _keys = ['list of your file keys in s3'];
	
    var _list = await Promise.all(_keys.map(_key => new Promise((_resolve, _reject) => {
            s3.getObject({Bucket:'bucket-name', Key:_key})
                .then(_data => _resolve( { data: _data.Body, name: `${_key.split('/').pop()}` } ));
        }
    ))).catch(_err => { throw new Error(_err) } );

    await new Promise((_resolve, _reject) => { 
        var _myStream = streamTo('bucket-name', 'fileName.zip');		//Now we instantiate that pipe...
        var _archive = _archiver('zip');
        _archive.on('error', err => { throw new Error(err); } );
        
        //Your promise gets resolved when the fluid stops running... so that's when you get to close and resolve
        _myStream.on('close', _resolve);
        _myStream.on('end', _resolve);
        _myStream.on('error', _reject);
        
        _archive.pipe(_myStream);			//Pass that pipe to _archive so it can push the fluid straigh down to S3 bucket
        _list.forEach(_itm => _archive.append(_itm.data, { name: _itm.name } ) );		//And then we start adding files to it
        _archive.finalize();				//Tell is, that's all we want to add. Then when it finishes, the promise will resolve in one of those events up there
    }).catch(_err => { throw new Error(_err) } );
    
    _cb(null, { } );		//Handle response back to server
};

字符串

bqjvbblv

bqjvbblv2#

我根据@iocoker格式化了代码。

主入口

// index.js

'use strict';
const S3Zip = require('./s3-zip')

// Static job description handed to S3Zip by the handler below.
const params = {
  // Objects to include: `key` is the S3 object key to read,
  // `fileName` is the entry name used inside the zip archive.
  files: [
    {
      fileName: '1.jpg',
      key: 'key1.JPG'
    },
    {
      fileName: '2.jpg',
      key: 'key2.JPG'
    }
  ],
  // S3 key under which the resulting archive is uploaded.
  zippedFileKey: 'zipped-file-key.zip'
}

exports.handler = async event => {
  const s3Zip = new S3Zip(params);
  await s3Zip.process();

  return {
    statusCode: 200,
    body: JSON.stringify(
      {
        message: 'Zip file successfully!'
      }
    )
  };

}

字符串

Zip文件实用程序

// s3-zip.js

'use strict';
const fs = require('fs');
const AWS = require("aws-sdk");

const Archiver = require('archiver');
const Stream = require('stream');

const https = require('https');
// Shared HTTPS agent for all AWS SDK calls.
// The option name is case-sensitive: it must be `keepAlive` for sockets to be
// reused across requests (`KeepAlive` is not a recognised option).
const sslAgent = new https.Agent({
  keepAlive: true,
  rejectUnauthorized: true
});
// 0 removes the listener-count warning threshold on the agent.
sslAgent.setMaxListeners(0);
AWS.config.update({
  httpOptions: {
    agent: sslAgent,
  },
  region: 'us-east-1'
});

module.exports = class S3Zip {
  constructor(params, bucketName = 'default-bucket') {
    this.params = params;
    this.BucketName = bucketName;
  }

  async process() {
    const { params, BucketName } = this;
    const s3 = new AWS.S3({ apiVersion: '2006-03-01', params: { Bucket: BucketName } });

    // create readstreams for all the output files and store them
    const createReadStream = fs.createReadStream;
    const s3FileDwnldStreams = params.files.map(item => {
      const stream = s3.getObject({ Key: item.key }).createReadStream();
      return {
        stream,
        fileName: item.fileName
      }
    });

    const streamPassThrough = new Stream.PassThrough();
    // Create a zip archive using streamPassThrough style for the linking request in s3bucket
    const uploadParams = {
      ACL: 'private',
      Body: streamPassThrough,
      ContentType: 'application/zip',
      Key: params.zippedFileKey
    };

    const s3Upload = s3.upload(uploadParams, (err, data) => {
      if (err) {
        console.error('upload err', err)
      } else {
        console.log('upload data', data);
      }
    });

    s3Upload.on('httpUploadProgress', progress => {
      // console.log(progress); // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
    });

    // create the archiver
    const archive = Archiver('zip', {
      zlib: { level: 0 }
    });
    archive.on('error', (error) => {
      throw new Error(`${error.name} ${error.code} ${error.message} ${error.path} ${error.stack}`);
    });

    // connect the archiver to upload streamPassThrough and pipe all the download streams to it
    await new Promise((resolve, reject) => {
      console.log("Starting upload of the output Files Zip Archive");

      streamPassThrough.on('close', resolve());
      streamPassThrough.on('end', resolve());
      streamPassThrough.on('error', reject());

      archive.pipe(streamPassThrough);
      s3FileDwnldStreams.forEach((s3FileDwnldStream) => {
        archive.append(s3FileDwnldStream.stream, { name: s3FileDwnldStream.fileName })
      });
      archive.finalize();

    }).catch((error) => {
      throw new Error(`${error.code} ${error.message} ${error.data}`);
    });

    // Finally wait for the uploader to finish
    await s3Upload.promise();

  }
}

k3bvogb1

k3bvogb13#

其他的解决方案在文件数量不多(少于约60个)时效果很好。但如果要处理更多文件,它们会在没有任何报错的情况下直接退出。这是因为它们同时打开了太多的流。
此解决方案的灵感来自https://gist.github.com/amiantos/16bacc9ed742c91151fcf1a41012445e
这是一个可行的解决方案,即使有许多文件(+300)也能很好地工作,并返回一个预签名的URL到包含文件的zip。
主 Lambda 函数:

// NOTE(review): this snippet mixes CommonJS `require` with ES `import`/`export`;
// it presumably relies on a bundler/transpiler — confirm the build setup.
const AWS = require('aws-sdk');
// Shared S3 client used by every helper in this answer.
const S3 = new AWS.S3({
  apiVersion: '2006-03-01',
  signatureVersion: 'v4',
  httpOptions: {
    timeout: 300000 // 5 minutes, in milliseconds; should match the Lambda function timeout
  }
});
const archiver = require('archiver');
import stream from 'stream';

// Bucket that holds both the source files and the generated zip.
const UPLOAD_BUCKET_NAME = "my-s3-bucket";
// Passed as the `expires` argument of getSignedUrl (5*60 = 300).
const URL_EXPIRE_TIME = 5*60;

/**
 * Zips a fixed list of files stored under one S3 prefix, uploads the archive
 * next to them as `uploads.zip`, and returns a presigned download URL.
 *
 * @param {object} event - Lambda event (unused).
 * @returns {Promise<{statusCode: number, body: string}>} 200 with the URL as
 *   message, 404 when there is nothing to zip, 500 on any failure.
 */
export async function getZipSignedUrl(event) {
  // No trailing slash and no stray characters: keys are joined with '/' below.
  const prefix = 'uploads/id123123';   //replace this with your S3 prefix
  const fileNames = ["12314123.png", "56787567.png"];  //replace this with your files

  if (fileNames.length === 0) {
    console.log("No files to zip");
    return result(404, "No pictures to download");
  }
  console.log("Files to zip: ", fileNames);

  try {
    const files = fileNames.map((fileName) => ({
      fileName,
      key: prefix + '/' + fileName,
      type: "file"
    }));
    const destinationKey = prefix + '/' + 'uploads.zip';
    console.log("files: ", files);
    console.log("destinationKey: ", destinationKey);

    await streamToZipInS3(files, destinationKey);
    const presignedUrl = await getSignedUrl(UPLOAD_BUCKET_NAME, destinationKey, URL_EXPIRE_TIME, "uploads.zip");
    console.log("presignedUrl: ", presignedUrl);

    if (!presignedUrl) {
      return result(500, null);
    }
    return result(200, presignedUrl);
  }
  catch(error) {
    console.error(`Error: ${error}`);
    return result(500, null);
  }
}

字符串
辅助函数:

/**
 * Builds an HTTP-style Lambda response whose body is `{ message }` as JSON.
 *
 * @param {number} code - HTTP status code.
 * @param {*} message - Payload placed under the `message` key.
 * @returns {{statusCode: number, body: string}}
 */
export function result(code, message) {
  const body = JSON.stringify({ message });
  return { statusCode: code, body };
}

/**
 * Streams the given S3 objects into a zip archive that is uploaded to
 * `destinationKey` in UPLOAD_BUCKET_NAME.
 *
 * @param {Array<{fileName: string, key: string, type: string}>} files
 *   Entries to add; only those with `type === "file"` are archived.
 * @param {string} destinationKey - S3 key for the resulting zip.
 * @returns {Promise<void>} Resolves when `streamTo` reports the upload as done;
 *   rejects with an Error if the upload stream or the archiver fails.
 */
export async function streamToZipInS3(files, destinationKey) {
  // Plain (non-async) executor: an async executor would swallow thrown errors.
  await new Promise((resolve, reject) => {
    const zipStream = streamTo(UPLOAD_BUCKET_NAME, destinationKey, resolve);
    zipStream.on("error", reject);

    const archive = archiver("zip");
    // Throwing from an event handler cannot reject this promise; reject
    // explicitly and destroy the destination so the upload is aborted too.
    archive.on("error", err => {
      reject(err);
      zipStream.destroy(err);
    });
    archive.pipe(zipStream);

    for (const file of files) {
      if (file["type"] === "file") {
        archive.append(getStream(UPLOAD_BUCKET_NAME, file["key"]), {
          name: file["fileName"]
        });
      }
    }
    archive.finalize();
  })
  .catch(err => {
    console.log(err);
    // Keep the original error (and its stack) when it already is an Error.
    throw err instanceof Error ? err : new Error(err);
  });
}

/**
 * Starts an S3 upload whose body is a PassThrough stream and returns that
 * stream so the caller can write the object content into it.
 *
 * @param {string} bucket - Destination bucket.
 * @param {string} key - Destination object key.
 * @param {Function} resolve - Called with no arguments once the upload succeeded.
 * @returns {stream.PassThrough} Writable side of the upload. On upload failure
 *   it is destroyed with the upload error, so its 'error' listeners are notified.
 */
function streamTo(bucket, key, resolve) {
  const passthrough = new stream.PassThrough();
  S3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
      ServerSideEncryption: "AES256"
    },
    (err, data) => {
      if (err) {
        console.error('Error while uploading zip');
        // No `reject` exists in this scope and throwing from this callback
        // would escape every promise; report through the stream instead.
        passthrough.destroy(err);
        return;
      }
      console.log('Zip uploaded');
      resolve();
    }
  ).on("httpUploadProgress", progress => {
    console.log(progress);
  });
  return passthrough;
}

/**
 * Returns a PassThrough stream that starts the S3 download for
 * `bucket`/`key` only when the first "data" listener is attached to it, so a
 * request is not opened for every object up front.
 *
 * @param {string} bucket - Source bucket.
 * @param {string} key - Source object key.
 * @returns {stream.PassThrough} Stream that carries the object body once started.
 */
function getStream(bucket, key) {
  const lazyStream = new stream.PassThrough();
  let started = false;

  const startOnFirstDataListener = (eventName) => {
    if (started || eventName != "data") {
      return;
    }

    const source = S3.getObject({ Bucket: bucket, Key: key }).createReadStream();
    // Forward download errors to whoever consumes the returned stream.
    source.on("error", (err) => lazyStream.emit("error", err));
    source.pipe(lazyStream);

    started = true;
  };

  lazyStream.on("newListener", startOnFirstDataListener);
  return lazyStream;
}

/**
 * Creates a presigned GET URL for an existing S3 object.
 *
 * @param bucket - Bucket containing the object.
 * @param key - Object key.
 * @param expires - Value passed as the `Expires` parameter of the signed URL.
 * @param downloadFilename - Optional file name placed in the response's
 *   Content-Disposition header (URI-encoded).
 * @returns The presigned URL, or `null` when the object does not exist or the
 *   URL cannot be generated.
 */
export async function getSignedUrl(bucket: string, key: string, expires: number, downloadFilename?: string): Promise<string | null> {
    const exists = await objectExists(bucket, key);
    if (!exists) {
        console.info(`Object ${bucket}/${key} does not exists`);
        return null;
    }

    const params: { Bucket: string; Key: string; Expires: number; ResponseContentDisposition?: string } = {
        Bucket: bucket,
        Key: key,
        Expires: expires,
    };
    if (downloadFilename) {
        params.ResponseContentDisposition = `inline; filename="${encodeURIComponent(downloadFilename)}"`;
    }

    try {
        // The client declared in this file is `S3` (capital S); `s3` is undefined here.
        return S3.getSignedUrl('getObject', params);
    } catch (err) {
        console.error(`Unable to get URL for ${bucket}/${key}`, err);
        return null;
    }
}

6ljaweal

6ljaweal4#

使用流可能很棘手,因为我不确定如何将多个流通过管道传输到一个对象中。我已经用标准文件对象做过几次了。这是一个多步骤的过程,而且相当快。请记住,Lambda在Linux中运行,因此您可以使用所有Linux资源,包括system /tmp目录。
1.在/tmp中创建一个子目录,命名为“transient”或任何适合您的名称
2.使用s3.getObject()并将文件对象写入/tmp/transient
3.使用GLOB包从/tmp/transient生成路径数组[]
4.循环数组并调用zip.addLocalFile(array[i]);
5.调用zip.writeZip('/tmp/files.zip');

klr1opcd

klr1opcd5#

我使用了类似的方法,但我面临的问题是,生成的ZIP文件中的一些文件没有正确的大小(和相应的数据)。这段代码可以管理的文件大小有什么限制吗?在我的情况下,我压缩大文件(几个大于1GB)和数据总量可能达到10 GB。
我没有收到任何错误/警告消息,所以似乎一切正常。
你知道会发生什么吗?

k2fxgqgv

k2fxgqgv6#

您可以使用adm-zip,它允许您直接在磁盘或内存缓冲区中处理zip文件。与node-archiver库相比,它的使用也更简单,而且node-archiver库还有一个尚未解决的issue。
TypeScript代码:

import AdmZip from "adm-zip";

import { GetObjectCommand, GetObjectCommandOutput, PutObjectCommand, PutObjectCommandInput } from "@aws-sdk/client-s3";

/**
 * Downloads the given S3 objects, packs them into one zip archive in memory
 * (adm-zip) and uploads the archive to `uploadFileKey` in the same bucket.
 *
 * NOTE: every object and the final archive are held in memory, so the total
 * size is limited by the memory available to the function.
 *
 * @param fileKeysToDownload - Keys of the objects to include; each key is also
 *   used as the entry name inside the archive.
 * @param bucket - Bucket to read from and upload to.
 * @param uploadFileKey - Key of the resulting zip object.
 * @throws Error when a downloaded object has no body.
 */
export async function uploadZipFile(fileKeysToDownload: string[], bucket: string, uploadFileKey: string): Promise<void> {

  // create a new zip file using "adm-zip"
  const zipFile = new AdmZip();

  // Download the objects one at a time and add each to the archive.
  for (const fileKey of fileKeysToDownload) {
    const data = await getObject(fileKey, bucket);
    if (!data) {
      throw new Error(`S3 object ${bucket}/${fileKey} has no body`);
    }
    const byteArray = await data.transformToByteArray();

    // add the byte array to the newly created zip file
    zipFile.addFile(fileKey, Buffer.from(byteArray));
  }

  // Convert this zip file to a byte array
  const outputBody = zipFile.toBuffer();

  // upload zip file to S3 using the PUT API (bucket is a required argument)
  await putObject(outputBody, uploadFileKey, bucket);
}

/**
 * Fetches one object from S3 and returns the `Body` of the response.
 *
 * @param key - Object key.
 * @param bucket - Bucket containing the object.
 * @returns The `Body` field of the GetObject response.
 */
async function getObject(key: string, bucket: string){
  const output: GetObjectCommandOutput = await s3.send(
    new GetObjectCommand({Bucket: bucket, Key: key})
  );
  const { Body } = output;
  return Body;
}

/**
 * Uploads a buffer to S3 as a zip object.
 *
 * @param content - Bytes to store.
 * @param key - Destination object key.
 * @param bucket - Destination bucket.
 */
async function putObject(content: Buffer, key: string, bucket: string){
  const request: PutObjectCommandInput = {
    Body: content,
    Bucket: bucket,
    Key: key,
    ContentType: "application/zip"
  };
  // The response is not used; awaiting only ensures the upload has finished.
  await s3.send(new PutObjectCommand(request));
}

字符串
“使用Lambda是否可行,或者我应该考虑其他方法?” -> 是的,可行。
“是否可以动态写入压缩的zip文件,从而在一定程度上消除内存问题,或者我需要在压缩之前收集文件?” -> 是的,请使用上述基于adm-zip的方法。

相关问题