提问者:小点点

基于PubSub通知启动的数据流作业-Python


我正在编写一个从BigQuery读取并进行一些转换的Dataflow作业。

data = (
    pipeline
    | beam.io.ReadFromBigQuery(query='''
    SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
    ''', use_standard_sql=True)
    | beam.Map(print)
)

但是我的要求是只有在收到PubSub Topic的通知后才从BigQuery读取数据。只有当收到以下消息时,上述DataFlow作业才应该开始从BigQuery读取数据。如果是不同的作业id或不同的状态,则不应该执行任何操作。

PubSub Message : {'job_id':101, 'status': 'Success'}

这部分有什么帮助吗?


共2个答案

匿名用户

这很简单,代码如下所示

pubsub_msg = (
   pipeline
   | beam.io.gcp.pubsub.ReadFromPubSub(topic=my_topic, subscription=my_subscription)
)

bigquery_data = (
    pubsub_msg
    | beam.Filter(lambda msg: msg['job_id']==101)   # you might want to use a more sophisticated filter condition
    | beam.io.ReadFromBigQuery(query='''
    SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
    ''', use_standard_sql=True)
)
bigquery_data | beam.Map(print)

但是,如果您这样做,您将有一个流式数据流作业正在运行(无限期地,或者直到您取消该作业),因为使用ReadFromPubSub会自动生成一个流式作业。因此,当消息到达PubSub时,这不会启动一个数据流作业,而是一个作业已经在运行并侦听主题以进行某些操作。

如果您真的想触发Dataflow批处理作业,我建议使用Dataflow模板,并使用侦听PubSub主题的Cloud Function启动此模板。过滤的逻辑将在此CloudFunction中(作为简单的if条件)。

匿名用户

我最终使用了云函数,添加了过滤逻辑,并从那里启动数据流。发现下面的链接很有用。如何使用云函数触发数据流?(PythonSDK)