我的基本要求是创建一个从BigQuery Table读取的管道,然后将其转换为JSON并将其传递到PubSub主题。
起初,我从Big Query中读取并尝试将其写入Pub Sub Topic,但收到异常错误,称批处理管道不支持“Pub Sub”。所以我尝试了一些解决方法和
我能够在python中解决这个问题
p = beam.Pipeline(options=options)
json_string_output = (
p
| 'Read from BQ' >> beam.io.ReadFromBigQuery(
query='SELECT * FROM '\
'`project.dataset.table_name`',
use_standard_sql=True)
| 'convert to json' >> beam.Map(lambda record: json.dumps(record))
| 'Write results' >> beam.io.WriteToText(outputs_prefix)
)
p.run()
# create publisher
publisher = pubsub_v1.PublisherClient()
with open(input_file, 'rb') as ifp:
header = ifp.readline()
# loop over each record
for line in ifp:
event_data = line # entire line of input file is the message
print('Publishing {0} to {1}'.format(event_data, pubsub_topic))
publisher.publish(pubsub_topic, event_data)
Python工作代码仓库
我无法找到在单个ApacheBeam Pipeline中集成这两个脚本的方法。
因为您的管道没有任何无界PCollection,它将自动以批处理模式运行。您可以使用--流
命令行标志强制管道以流式模式运行。