我在GCS有csv(gzip压缩)文件。我想读取这些文件并将数据发送到BigQuery。
标题信息可以更改(尽管我提前知道所有列),所以仅仅删除标题是不够的,不知何故,我需要读取第一行并将列信息附加到剩余行。
怎么可能?
我首先认为我必须实现像这篇文章这样的自定义源代码。
用数据流读取CSV头
但是有了这个解决方案,我不确定如何首先解压缩Gzip。我可以以某种方式使用withCompressionType
像TextIO
吗?(我在python类中找到了一个参数compression_type
,但是我使用的是Java,在JavaFileBasedSource
类中找不到类似的参数。)
我也觉得这有点过分,因为它使文件无法拆分(尽管在我的情况下没关系)。
或者我可以使用GoogleCloudStorage并在我的main()
函数中直接读取文件及其第一行,然后继续执行管道。
但它也很麻烦,所以我想确认是否有任何最佳实践(数据流方式)在使用数据流中的标头时读取csv文件?
如果我理解你试图正确完成的事情,SideInput(文档,示例)可能是这里的答案。它将允许标题可用于文件的每一行。
一般的想法是将标头作为一个单独的PCollsionView发出,并将其用作每行处理的SideInput。您可以使用SideOutput(文档)对文件进行单次传递来实现这一点
如果我没看错你的问题,听起来你的标题内容在不同的文件中有所不同。如果是这样,你可以使用View. asMap
来保存每个文件的标题映射。不幸的是,目前本地不支持跟踪正在读取的当前文件名,但这篇文章中讨论了一些工作循环。