我需要能够将沿袭追溯到单个镶木地板文件,并且能够执行批量加载,例如如果在数据流中发现缺陷,则重播几年的镶木地板文件。
经过多次尝试,以下方法适用于批量加载,其中options. input是RuntimeValueProvider,而SplitFn只是产生
str.split()
:
with beam.Pipeline(options=PipelineOptions(), argv=args) as p:
mainPipe = p \
| 'CSV of URIs' >> beam.Create([options.input]) \
| 'Split URIs into records' >> beam.ParDo(SplitFn(',')) \
| "Read Parquet" >> beam.io.parquetio.ReadAllFromParquet(columns=[k for k in fields.keys()])
不幸的是beam.io. parquetio.ReadAllFromParque不会说每条记录来自哪个文件,ReadFromParque
也不会说,parquetio
的唯一其他功能。
除了离开谷歌云数据流或教团队Java,我能做些什么将许多拼花文件一次加载到BigQuery中,并知道每个记录来自哪个文件?
鉴于目前的API我没有看到预先制定的解决方案。虽然你可以通过以下方式解决问题: