提问者:小点点

通过google cloud函数在DataFlow作业中GCS. csv


我遵循本指南创建一个google cloud函数,该函数在GCS桶触发器期间启动DataFlow作业。我的问题是围绕模板和inout文件。我会在我的数据流管道中使用这部分来通过TextIO.read获取源数据(GCScsv),但我不确定如何格式化这部分管道以考虑来自桶触发器的文件。我会有像“ReadTable”这样的东西吗?

p = beam.Pipeline(options=options)
raw_values = (
            p 
            | "ReadTable" >> TextIO.read().from("gs://bucket/file.csv")
            | "custFunc" >> beam.Map(CallAPI)
            | "writeTable" >> WriteToBigQuery('newtablw', project='project1', 
                                               dataset='test', schema=table_schema,
                                               write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                                               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
            )

共1个答案

匿名用户

使用Dataflow模板,您可以创建在运行模板时提供的运行时参数。

定义所需的模板,例如file_in和file_out。然后,当GCS事件触发Cloud Functions时,您可以获取事件数据以提取存储桶和文件名,连接它们并将它们作为数据流参数file_in提供。