提问者:小点点

数据流作业GCS到Pub/sub最大批量大小


我正在使用默认的数据流模板GCS到云存储中的发布/订阅输入文件,大小为300MB,每个文件有2-3数百万行。

启动数据流批处理作业时,会引发以下错误

来自worker的错误消息:javax . naming . sizelimitexceeded exception:Pub/Sub消息大小(1089680070)超过了最大批处理大小(7500000)org . Apache . beam . SDK . io . GCP . pubsub . pubsubio$writer . process element(pubsubio . Java:1160)

从留档:Pub/Sub一批最多接受1,000条消息,一批大小不能超过10 MB。

这是否意味着我必须将输入文件拆分为 10MB 块或 1000 条消息才能发布?

将如此大的文件(每个300MB)加载到pubsub的建议方法是什么?

提前感谢您的帮助。


共1个答案

匿名用户

这是数据流方面的一个已知限制,此时存在一个增加批量大小的功能请求。使用1按钮并开始跟踪问题的进展。

我建议你查看这篇文章,里面有一个建议的解决方法。重要的是要考虑到这个变通方法意味着修改云存储文本到发布/订阅模板来实现这里提到的定制转换。

另一方面,您可以尝试创建云函数,以便在数据流处理之前分割文件,我想是这样的:

  1. 创建一个“暂存”存储桶来上传您的大文件。
  2. 编写一个Cloud Function来拆分您的文件并将小块写入另一个存储桶。您可以尝试使用filesplit Python包来执行此操作。
  3. 每次使用Google Cloud Storage触发器在“暂存”存储桶中上传新文件时,都会触发Cloud Function运行。
  4. 将文件拆分为小块后,使用相同的云功能从“暂存”存储桶中删除大文件以避免额外费用。
  5. 使用数据流模板Cloud Storage Text to Pub/Sub来处理第二个存储桶的小块。