我正在编写一个从BigQuery读取并进行一些转换的Dataflow作业。
data = (
pipeline
| beam.io.ReadFromBigQuery(query='''
SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
''', use_standard_sql=True)
| beam.Map(print)
)
但是我的要求是只有在收到PubSub Topic的通知后才从BigQuery读取数据。只有当收到以下消息时,上述DataFlow作业才应该开始从BigQuery读取数据。如果是不同的作业id或不同的状态,则不应该执行任何操作。
PubSub Message : {'job_id':101, 'status': 'Success'}
这部分有什么帮助吗?
这很简单,代码如下所示
pubsub_msg = (
pipeline
| beam.io.gcp.pubsub.ReadFromPubSub(topic=my_topic, subscription=my_subscription)
)
bigquery_data = (
pubsub_msg
| beam.Filter(lambda msg: msg['job_id']==101) # you might want to use a more sophisticated filter condition
| beam.io.ReadFromBigQuery(query='''
SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
''', use_standard_sql=True)
)
bigquery_data | beam.Map(print)
但是,如果您这样做,您将有一个流式数据流作业正在运行(无限期地,或者直到您取消该作业),因为使用ReadFromPubSub
会自动生成一个流式作业。因此,当消息到达PubSub时,这不会启动一个数据流作业,而是一个作业已经在运行并侦听主题以进行某些操作。
如果您真的想触发Dataflow批处理作业,我建议使用Dataflow模板,并使用侦听PubSub主题的Cloud Function启动此模板。过滤的逻辑将在此CloudFunction中(作为简单的if
条件)。
我最终使用了云函数,添加了过滤逻辑,并从那里启动数据流。发现下面的链接很有用。如何使用云函数触发数据流?(PythonSDK)