我正在尝试从第三方AWSS3存储桶读取文件,该文件采用. gz
格式。我需要处理文件中的数据并将文件上传到我们自己的S3存储桶。
为了读取文件,我从S3. getBucket创建了一个readStream,如下所示:
const fileStream = externalS3.getObject({Bucket: <bucket-name>, Key: <key>}).createReadStream();
为了使代码更高效,我计划使用相同的fileStream
来处理内容和上传到我们自己的S3。我有下面的代码,它不会将文件上传到内部S3存储桶。
import Stream from "stream";
const uploadStream = fileStream.pipe(new stream.PassThrough());
const readStream = fileStream.pipe(new stream.PassThrough());
await internalS3.upload({Bucket:<bucket-name>, Key: <key>, Body: uploadStream})
.on("httpUploadProgress", progress => {console.log(progress)})
.on("error", error => {console.log(error)})
.promise();
readStream.pipe(createGunzip())
.on("error", err =>{console.log(err)})
.pipe(JSONStream.parse())
.on("data", data => {console.log(data)});
但是,下面的代码成功地将文件上传到内部s3存储桶。
const uploadStream = fileStream.pipe(new stream.PassThrough());
await internalS3.upload({Bucket:<bucket-name>, Key: <key>, Body: uploadStream})
.on("httpUploadProgress", progress => {console.log(progress)})
.on("error", error => {console.log(error)})
.promise();
我到底做错了什么?
注意:如果我使用单独的fileStream
来上传和读取数据,它可以正常工作。但是,我需要使用相同的fileStream来实现这一点。
如OP所述,您尝试上传到S3的文件大小相对较大(约1 GB)。这里正在创建两个流,通过管道传输单个fileStream
:
const uploadStream = fileStream.pipe(new stream.PassThrough());
const readStream = fileStream.pipe(new stream.PassThrough());
虽然readStream
上的操作耗时较少,但上传流
负责将文件上传到远程位置,在这种情况下是S3,通过网络,这需要相对更多的时间。这也意味着readStream
正在以更高的速率从fileStream
中提取/请求数据。当readStream
完成时,fileStream
已经被使用并且。上传
对aws sdk
的调用挂起。看这个问题。
您可以通过使用此库来同步两个不同的流来修复它。可以在此处找到如何实现这一点的示例。