I am currently building a PoC Apache Beam pipeline on GCP Dataflow. I want to create a streaming pipeline with a main input from Pub/Sub and a side input from BigQuery, and store the processed data back into BigQuery.
Side pipeline code
side_pipeline = (
    p
    | "periodic" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
    | "map to read request" >>
        beam.Map(lambda x: beam.io.gcp.bigquery.ReadFromBigQueryRequest(table=side_table))
    | beam.io.ReadAllFromBigQuery()
)
Function with side input
def enrich_payload(payload, equipments):
    id = payload["id"]
    for equipment in equipments:
        if id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]
            break
    return payload
Main pipeline code
main_pipeline = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/topiq")
    | "bytes to dict" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
    | "transform" >> beam.Map(transform_function)
    | "timestamping" >> beam.Map(lambda src: window.TimestampedValue(
        src,
        dt.datetime.fromisoformat(src["timestamp"]).timestamp()
    ))
    | "windowing" >> beam.WindowInto(window.FixedWindows(30))
)

final_pipeline = (
    main_pipeline
    | "enrich data" >> beam.Map(enrich_payload, equipments=beam.pvalue.AsIter(side_pipeline))
    | "store" >> beam.io.WriteToBigQuery(bq_table)
)

result = p.run()
result.wait_until_finish()
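As an aside, because enrich_payload scans the whole equipment list for every element, a dict-shaped side input can make the lookup constant-time when the table fits in memory. A minimal sketch using beam.pvalue.AsDict; keyed_side and enrich_payload_keyed are illustrative names, not part of the original code:

# Key each equipment row by its id so it can be consumed via AsDict.
keyed_side = side_pipeline | "key by id" >> beam.Map(lambda row: (row["id"], row))

def enrich_payload_keyed(payload, equipment_by_id):
    # equipment_by_id is the materialized dict side input
    equipment = equipment_by_id.get(payload["id"])
    if equipment is not None:
        payload["type"] = equipment["type"]
        payload["brand"] = equipment["brand"]
        payload["year"] = equipment["year"]
    return payload

# used in place of the "enrich data" step above:
# ... | "enrich data" >> beam.Map(enrich_payload_keyed,
#                                 equipment_by_id=beam.pvalue.AsDict(keyed_side))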
After deploying it to Dataflow, everything looked fine and there were no errors. But then I noticed that the enrich data step has two nodes instead of one.
You can find the full pipeline code here.
I have followed all the instructions in these documents:
but still ran into this problem. Please help me. Thanks!
Here is a working example:
mytopic = ""
sql = "SELECT station_id, CURRENT_TIMESTAMP() timestamp FROM `bigquery-public-data.austin_bikeshare.bikeshare_stations` LIMIT 10"
def to_bqrequest(e, sql):
from apache_beam.io import ReadFromBigQueryRequest
yield ReadFromBigQueryRequest(query=sql)
def merge(e, side):
for i in side:
yield f"Main {e.decode('utf-8')} Side {i}"
pubsub = p | "Read PubSub topic" >> ReadFromPubSub(topic=mytopic)
side_pcol = (p | PeriodicImpulse(fire_interval=300, apply_windowing=False)
| "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| "To BQ Request" >> ParDo(to_bqrequest, sql=sql)
| ReadAllFromBigQuery()
)
final = (pubsub | "Merge" >> ParDo(merge, side=beam.pvalue.AsList(side_pcol))
| Map(logging.info)
)
p.run()
Note that this uses a GlobalWindow (so that both inputs have the same window). I used a processing-time trigger so that a pane contains multiple rows; the 5 was chosen arbitrarily, and 1 would work as well.
Note that the matching of data between the side input and the main input is nondeterministic, and you may see fluctuating values coming from older triggered panes.
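If stale panes are a problem, one mitigation is to use the CURRENT_TIMESTAMP() column already selected in the query above and keep only the rows from the newest refresh. A minimal sketch, assuming the side rows arrive as dicts with a mutually comparable timestamp field; merge_latest is an illustrative name:

def merge_latest(e, side):
    # Keep only the rows carrying the newest refresh timestamp, assuming
    # the timestamp column marks when the side data was read from BigQuery.
    if side:
        latest = max(row["timestamp"] for row in side)
        side = [row for row in side if row["timestamp"] == latest]
    for i in side:
        yield f"Main {e.decode('utf-8')} Side {i}"

# drop-in replacement for merge:
# final = pubsub | "Merge" >> ParDo(merge_latest, side=beam.pvalue.AsList(side_pcol))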
In theory, using FixedWindows should fix this, but I could not get FixedWindows to work.
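For reference, the FixedWindows variant would look roughly like the sketch below: both inputs are windowed into the same fixed window size so that each main-input pane only sees the side rows read in the matching window. This is untested (as noted above, I could not get it to work), so treat it as a starting point only:

# Untested sketch: align both inputs on the same fixed windows instead of
# a GlobalWindow; fire_interval and the window size are matched on purpose.
side_fixed = (p | PeriodicImpulse(fire_interval=300, apply_windowing=False)
                | "To BQ Request (fixed)" >> ParDo(to_bqrequest, sql=sql)
                | ReadAllFromBigQuery()
                | "SideFixedWindow" >> WindowInto(window.FixedWindows(300)))

main_fixed = pubsub | "MainFixedWindow" >> WindowInto(window.FixedWindows(300))

final_fixed = (main_fixed
               | "Merge fixed" >> ParDo(merge, side=beam.pvalue.AsList(side_fixed)))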