我正在使用Apache BeamPythonSDK我正在尝试使用apache_beam.io. parquetio
从Parquet文件中读取数据,但我也想将文件名(或路径)添加到数据中,因为它也包含数据。我在这里查看了建议的模式,并读到Parquetio类似于fileio,但它似乎没有实现允许查看文件并将其添加到聚会的功能。
有人想出一个好方法来实现这一点吗?
谢谢!
如果文件数量不多,您可以在通过IO读取文件之前获取所有文件。
import glob
filelist = glob.glob('/tmp/*.parquet')
p = beam.Pipeline()
class PairWithFile(beam.DoFn):
def __init__(self, filename):
self._filename = filename
def process(self, e):
yield (self._filename, e)
file_with_records = [
(p
| 'Read %s' % (file) >> beam.io.ReadFromParquet(file)
| 'Pair %s' % (file) >> beam.ParDo(PairWithFile(file)))
for file in filelist
] | beam.Flatten()
然后你的PCollection看起来像这样: