我需要从另一个GCP项目的PubSub主题读取AVRO消息。我之前实现了Python数据流管道,它从PubSub读取JSON消息并写入BigQuery。但是我是处理AVRO消息的新手。我试图为AVRO查找Python留档,它指向这个链接https://avro.apache.org/docs/current/gettingstartedpython.html
在此链接中有从文件读取和写入文件的示例,但我不认为这些函数对从PubSub读取有用。我正在使用以下转换从PubSub读取,其中输出是字节字符串。
"Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
我需要一种读取这些字节的方法(AVRO格式)
这是您可以使用的示例代码
from fastavro import parse_schema, schemaless_reader
messages = (p
| beam.io.ReadFromPubSub(
subscription=known_args.input_subscription)
.with_output_types(bytes))
class AvroReader:
def __init__(self, schema):
self.schema = schema
def deserialize(self, record):
bytes_reader = io.BytesIO(record)
dict_record = schemaless_reader(bytes_reader, self.schema)
return dict_record
schema = avro.schema.parse(open("avro.avsc", "rb").read())
avroReader = AvroReader(schema)
lines = messages | "decode" >> beam.Map(lambda input: avroReader.deserialize(input))
这些行应该有Avro形式的PCollection
。