提问者:小点点

Beam/Dataflow读取Parquet文件并为每条记录添加文件名/路径


我正在使用Apache BeamPythonSDK我正在尝试使用apache_beam.io. parquetio从Parquet文件中读取数据,但我也想将文件名(或路径)添加到数据中,因为它也包含数据。我在这里查看了建议的模式,并读到Parquetio类似于fileio,但它似乎没有实现允许查看文件并将其添加到聚会的功能。

有人想出一个好方法来实现这一点吗?

谢谢!


共1个答案

匿名用户

如果文件数量不多,您可以在通过IO读取文件之前获取所有文件。

import glob

filelist = glob.glob('/tmp/*.parquet')
p = beam.Pipeline()

class PairWithFile(beam.DoFn):
    def __init__(self, filename):
        self._filename = filename

    def process(self, e):
        yield (self._filename, e)

file_with_records = [
    (p 
     | 'Read %s' % (file) >> beam.io.ReadFromParquet(file)
     | 'Pair %s' % (file) >> beam.ParDo(PairWithFile(file)))
    for file in filelist 
] | beam.Flatten()

然后你的PCollection看起来像这样: