Asked by: 小点点

Apache Beam Cloud Dataflow streaming stuck on side input


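I'm currently building a PoC Apache Beam pipeline on GCP Dataflow. In this case I want to create a streaming pipeline with a main input from Pub/Sub and a side input from BigQuery, and store the processed data back into BigQuery.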

Side pipeline code

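import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse

# Every fire_interval seconds (here: hourly), emit a request that re-reads side_table.
side_pipeline = (
    p
    | "periodic" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
    | "map to read request" >>
        beam.Map(lambda x: beam.io.gcp.bigquery.ReadFromBigQueryRequest(table=side_table))
    | beam.io.ReadAllFromBigQuery()
)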
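With `apply_windowing=True`, PeriodicImpulse assigns its output to `FixedWindows(fire_interval)`, so each hourly impulse lives in its own one-hour, epoch-aligned window. The sketch below is a simplified pure-Python model of that schedule, not Beam code; `impulse_times` and `fixed_window` are hypothetical helper names and the start timestamp is an arbitrary example value.

```python
# Simplified model of PeriodicImpulse timing (illustration only, not Beam code).

def impulse_times(start, count, fire_interval):
    """Return the timestamps (in seconds) of the first `count` impulses."""
    return [start + k * fire_interval for k in range(count)]


def fixed_window(ts, size):
    """Return the [start, end) bounds of the size-second window containing ts.

    Fixed windows are aligned to the Unix epoch (offset 0).
    """
    start = ts - ts % size
    return (start, start + size)


# Hypothetical start time 1000 s after the epoch, hourly interval as in the question.
times = impulse_times(start=1000, count=3, fire_interval=3600)
windows = [fixed_window(t, 3600) for t in times]
print(times)    # [1000, 4600, 8200]
print(windows)  # [(0, 3600), (3600, 7200), (7200, 10800)]
```

Every impulse lands in a different window, so whatever windowing the main input uses has to be compatible with these hourly windows.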

Function that uses the side input

def enrich_payload(payload, equipments):
    # equipments is the side input: every row read from side_table.
    payload_id = payload["id"]  # avoid shadowing the built-in id()
    for equipment in equipments:
        if payload_id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]
            break

    return payload
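`enrich_payload` scans the whole side input for every Pub/Sub message. The sketch below exercises the function in plain Python with made-up equipment rows and compares it with a hypothetical dict-based variant (`build_equipment_index` and `enrich_payload_indexed` are illustrative names, not part of the question). The index keeps the first row per id so that it matches the loop's `break`; in a pipeline it would need rebuilding whenever the side input changes (Beam's `beam.pvalue.AsDict`, which takes (key, value) pairs, is another option worth checking).

```python
def enrich_payload(payload, equipments):
    """The question's function: copy type/brand/year from the first matching row."""
    payload_id = payload["id"]
    for equipment in equipments:
        if payload_id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]
            break
    return payload


def build_equipment_index(equipments):
    """Hypothetical helper: map id -> first row with that id (same as the loop's break)."""
    index = {}
    for equipment in equipments:
        index.setdefault(equipment["id"], equipment)
    return index


def enrich_payload_indexed(payload, index):
    """Hypothetical constant-time-lookup variant of enrich_payload."""
    equipment = index.get(payload["id"])
    if equipment is not None:
        for field in ("type", "brand", "year"):
            payload[field] = equipment[field]
    return payload


# Made-up sample side-input rows.
equipments = [
    {"id": 1, "type": "pump", "brand": "Acme", "year": 2019},
    {"id": 2, "type": "valve", "brand": "Globex", "year": 2021},
]
index = build_equipment_index(equipments)

print(enrich_payload({"id": 2}, equipments))
# {'id': 2, 'type': 'valve', 'brand': 'Globex', 'year': 2021}
print(enrich_payload({"id": 99}, equipments))
# {'id': 99}  (no match: the payload passes through unchanged)
```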

Main pipeline code

import datetime as dt
import json

import apache_beam as beam
from apache_beam import window

main_pipeline = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/topiq")
    | "bytes to dict" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
    | "transform" >> beam.Map(transform_function)
    # Use the payload's own timestamp as the element's event time.
    | "timestamping" >> beam.Map(lambda src: window.TimestampedValue(
        src,
        dt.datetime.fromisoformat(src["timestamp"]).timestamp()
    ))
    | "windowing" >> beam.WindowInto(window.FixedWindows(30))
)
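The "timestamping" step turns the payload's ISO-8601 string into event-time seconds, and `FixedWindows(30)` then puts each element into an epoch-aligned 30-second window. Below is a pure-Python sketch of that arithmetic with a made-up timestamp; the helper names are hypothetical. One caveat worth checking: if `src["timestamp"]` carries no UTC offset, `fromisoformat` returns a naive datetime and `.timestamp()` interprets it in the worker's local time zone.

```python
import datetime as dt


def event_time_seconds(iso_string):
    """Parse an ISO-8601 timestamp the same way as the "timestamping" step."""
    return dt.datetime.fromisoformat(iso_string).timestamp()


def fixed_window_bounds(ts, size=30):
    """Epoch-aligned [start, end) bounds of the FixedWindows(size) window containing ts."""
    start = ts - ts % size
    return (start, start + size)


# Made-up payload timestamp with an explicit UTC offset
# (fromisoformat only accepts a trailing "Z" from Python 3.11 onward).
ts = event_time_seconds("2022-01-01T00:00:45+00:00")
print(ts)                       # 1640995245.0
print(fixed_window_bounds(ts))  # (1640995230.0, 1640995260.0)
```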

final_pipeline = (
    main_pipeline
    | "enrich data" >> beam.Map(enrich_payload, equipments=beam.pvalue.AsIter(side_pipeline))
    | "store" >> beam.io.WriteToBigQuery(bq_table)
)

result = p.run()
result.wait_until_finish()

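After deploying it to Dataflow, everything looked fine and there were no errors. But then I noticed that the "enrich data" step has two nodes instead of one.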

You can find the full pipeline code here.

I have followed all the instructions in these documents:

  • https://beam.apache.org/documentation/patterns/side-inputs/
  • https://beam.apache.org/releases/pydoc/2.35.0/apache_beam.io.gcp.bigquery.html

But I'm still running into this problem. Please help! Thanks!


1 answer

Anonymous user

Here is a working example:

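import logging

import apache_beam as beam
from apache_beam import Map, ParDo, WindowInto
from apache_beam.io import ReadAllFromBigQuery, ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse

mytopic = ""
sql = "SELECT station_id, CURRENT_TIMESTAMP() timestamp FROM `bigquery-public-data.austin_bikeshare.bikeshare_stations` LIMIT 10"

def to_bqrequest(e, sql):
    # Turn each impulse into a request that re-runs the side-input query.
    from apache_beam.io import ReadFromBigQueryRequest
    yield ReadFromBigQueryRequest(query=sql)


def merge(e, side):
    # Pair the Pub/Sub message with every row of the current side-input pane.
    for i in side:
        yield f"Main {e.decode('utf-8')} Side {i}"

# ReadFromPubSub requires a streaming pipeline.
p = beam.Pipeline(options=PipelineOptions(streaming=True))

pubsub = p | "Read PubSub topic" >> ReadFromPubSub(topic=mytopic)

side_pcol = (p | PeriodicImpulse(fire_interval=300, apply_windowing=False)
               | "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
                                           trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
                                           accumulation_mode=trigger.AccumulationMode.DISCARDING)
               | "To BQ Request" >> ParDo(to_bqrequest, sql=sql)
               | ReadAllFromBigQuery()
            )

final = (pubsub | "Merge" >> ParDo(merge, side=beam.pvalue.AsList(side_pcol))
                | Map(logging.info)
        )

p.run()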
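Because `merge` yields once per side-input row, each Pub/Sub message is paired with every row of the current side-input pane: one message and ten BigQuery rows give ten log lines, and an empty side input would give no output at all for that message. Calling the generator directly in plain Python, with made-up inputs, shows this.

```python
def merge(e, side):
    """The answer's merge function: one output per (main element, side row) pair."""
    for i in side:
        yield f"Main {e.decode('utf-8')} Side {i}"


# Made-up Pub/Sub payload and side-input rows.
print(list(merge(b"hello", ["row1", "row2"])))
# ['Main hello Side row1', 'Main hello Side row2']
print(list(merge(b"hello", [])))
# []
```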

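Note that this uses a GlobalWindow (so that both inputs are in the same window). I used a processing-time trigger so that each pane contains multiple rows. The value 5 was chosen arbitrarily; 1 works as well.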
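To see why a processing-time delay groups the rows of one query into a single pane, here is a deliberately simplified pure-Python model of `Repeatedly(AfterProcessingTime(delay))`: a pane fires `delay` seconds after its first element arrives, and the next element after that starts a new pane. It is an illustration only, not Beam's trigger implementation; `simulate_panes` and the arrival times are hypothetical. It also contrasts DISCARDING (each pane holds only new elements) with ACCUMULATING (each pane repeats everything seen so far).

```python
def simulate_panes(arrivals, delay, accumulating=False):
    """Group processing-time arrivals into fired panes (simplified model)."""
    panes = []      # contents of each fired pane, in firing order
    seen = []       # every element seen so far (used in ACCUMULATING mode)
    current = []    # elements that arrived since the last firing
    fire_at = None  # processing time at which the pending pane fires
    for t in sorted(arrivals):
        if fire_at is not None and t >= fire_at:
            # The pending pane fired before this element arrived.
            panes.append(list(seen) if accumulating else current)
            current = []
            fire_at = None
        if fire_at is None:
            # The first element of a new pane starts the delay timer.
            fire_at = t + delay
        current.append(t)
        seen.append(t)
    if current:
        panes.append(list(seen) if accumulating else current)
    return panes


# Hypothetical arrival times: three rows from one query, two rows from the next read.
arrivals = [0.0, 0.5, 1.0, 300.0, 300.2]
print(simulate_panes(arrivals, delay=5))
# [[0.0, 0.5, 1.0], [300.0, 300.2]]
print(simulate_panes(arrivals, delay=0))
# [[0.0], [0.5], [1.0], [300.0], [300.2]]
print(simulate_panes(arrivals, delay=5, accumulating=True))
# [[0.0, 0.5, 1.0], [0.0, 0.5, 1.0, 300.0, 300.2]]
```

With no delay every row ends up in its own pane, so an `AsList` view over the latest discarding pane would hold a single row; a small delay lets each read's rows arrive together.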

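Also note that the matching between side-input and main-input data is non-deterministic: you may see values fluctuate because they come from older fired panes.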

In theory, using FixedWindows should solve this, but I couldn't get FixedWindows to work properly.
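For reference, my understanding of Beam's default side-input window mapping (for non-global windows) is that each main-input window reads the side-input window containing the main window's max timestamp, i.e. its end minus one microsecond. The sketch below models that rule in pure Python for the question's 30-second main windows and hourly side windows; it is a hedged illustration, not Beam's code, and the helper names are hypothetical. Under this rule all 120 main windows in an hour read the same hourly side window, so, as I understand it, that side window needs data before those main elements can be enriched.

```python
# Simplified model of matching a FixedWindows main input to a FixedWindows
# side input (illustration only; times are integer microseconds, as in Beam).
MICROS = 1_000_000


def fixed_window(ts_us, size_us):
    """Epoch-aligned [start, end) window of size_us containing ts_us."""
    start = ts_us - ts_us % size_us
    return (start, start + size_us)


def map_main_to_side(main_window, side_size_us):
    """Pick the side-input window that contains the main window's max timestamp."""
    max_timestamp = main_window[1] - 1  # window end minus one microsecond
    return fixed_window(max_timestamp, side_size_us)


# 30-second main windows (as in the question) against hourly side-input windows.
main_size, side_size = 30 * MICROS, 3600 * MICROS
last_in_hour = fixed_window(3570 * MICROS, main_size)
first_in_next = fixed_window(3600 * MICROS, main_size)
print(map_main_to_side(last_in_hour, side_size))   # (0, 3600000000)
print(map_main_to_side(first_in_next, side_size))  # (3600000000, 7200000000)
```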