我遵循本指南创建一个google cloud函数,该函数在GCS桶触发器期间启动DataFlow作业。我的问题是围绕模板和inout文件。我会在我的数据流管道中使用这部分来通过TextIO.read
获取源数据(GCScsv),但我不确定如何格式化这部分管道以考虑来自桶触发器的文件。我会有像“ReadTable”这样的东西吗?
p = beam.Pipeline(options=options)
raw_values = (
p
| "ReadTable" >> TextIO.read().from("gs://bucket/file.csv")
| "custFunc" >> beam.Map(CallAPI)
| "writeTable" >> WriteToBigQuery('newtablw', project='project1',
dataset='test', schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
使用Dataflow模板,您可以创建在运行模板时提供的运行时参数。
定义所需的模板,例如file_in和file_out。然后,当GCS事件触发Cloud Functions时,您可以获取事件数据以提取存储桶和文件名,连接它们并将它们作为数据流参数file_in提供。