提问者:小点点

使用AWSS3 getObject中的Read Stream读取并上传到不同的存储桶


我正在尝试从第三方AWSS3存储桶读取文件,该文件采用. gz格式。我需要处理文件中的数据并将文件上传到我们自己的S3存储桶。

为了读取文件,我从S3. getBucket创建了一个readStream,如下所示:

const fileStream = externalS3.getObject({Bucket: <bucket-name>, Key: <key>}).createReadStream();

为了使代码更高效,我计划使用相同的fileStream来处理内容和上传到我们自己的S3。我有下面的代码,它不会将文件上传到内部S3存储桶。

import Stream from "stream";

const uploadStream = fileStream.pipe(new stream.PassThrough());
const readStream = fileStream.pipe(new stream.PassThrough());

await internalS3.upload({Bucket:<bucket-name>, Key: <key>, Body: uploadStream})
.on("httpUploadProgress", progress => {console.log(progress)})
.on("error", error => {console.log(error)})
.promise();

readStream.pipe(createGunzip())
.on("error", err =>{console.log(err)})
.pipe(JSONStream.parse())
.on("data", data => {console.log(data)});

但是,下面的代码成功地将文件上传到内部s3存储桶。

const uploadStream = fileStream.pipe(new stream.PassThrough());


await internalS3.upload({Bucket:<bucket-name>, Key: <key>, Body: uploadStream})
.on("httpUploadProgress", progress => {console.log(progress)})
.on("error", error => {console.log(error)})
.promise();

我到底做错了什么?

注意:如果我使用单独的fileStream来上传和读取数据,它可以正常工作。但是,我需要使用相同的fileStream来实现这一点。


共1个答案

匿名用户

如OP所述,您尝试上传到S3的文件大小相对较大(约1 GB)。这里正在创建两个流,通过管道传输单个fileStream

const uploadStream = fileStream.pipe(new stream.PassThrough());
const readStream = fileStream.pipe(new stream.PassThrough());

虽然readStream上的操作耗时较少,但上传流负责将文件上传到远程位置,在这种情况下是S3,通过网络,这需要相对更多的时间。这也意味着readStream正在以更高的速率从fileStream中提取/请求数据。当readStream完成时,fileStream已经被使用并且。上传aws sdk的调用挂起。看这个问题。

您可以通过使用此库来同步两个不同的流来修复它。可以在此处找到如何实现这一点的示例。