我有几个. txt文件中的数据JSON加载到谷歌BigQuery表。随着文本文件中的列,我需要为每一行插入文件名和当前时间戳。它在GCP数据流中,Python3.7
我使用GCSFileSystem. match和metadata_list访问了包含文件路径和大小的文件数据。
我相信我需要让管道代码在循环中运行,将文件路径传递给ReadFromText,并调用FileNameReadFunction ParDo。
(p
| "read from file" >> ReadFromText(known_args.input)
| "parse" >> beam.Map(json.loads)
| "Add FileName" >> beam.ParDo(AddFilenamesFn(), GCSFilePath)
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(known_args.output,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
我按照Dataflow/apache光束中的步骤-在传入模式时如何访问当前文件名?但我不能让它完全工作。
感谢任何帮助。
您可以使用textio. ReadFromTextWellFilename而不是ReadFromText。这将生成(filename,line)元组的PCollection。
要在输出json记录中包含文件和时间戳,您可以将“解析”行更改为
| "parse" >> beam.map(lambda (file, line): {
**json.loads(line),
"filename": file,
"timestamp": datetime.now()})