提问者:小点点

在数据流中从PubSub读取AVRO消息Python


我需要从另一个GCP项目的PubSub主题读取AVRO消息。我之前实现了Python数据流管道,它从PubSub读取JSON消息并写入BigQuery。但是我是处理AVRO消息的新手。我试图为AVRO查找Python留档,它指向这个链接https://avro.apache.org/docs/current/gettingstartedpython.html

在此链接中有从文件读取和写入文件的示例,但我不认为这些函数对从PubSub读取有用。我正在使用以下转换从PubSub读取,其中输出是字节字符串。

"Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)

我需要一种读取这些字节的方法(AVRO格式)


共1个答案

匿名用户

这是您可以使用的示例代码

  1. 阅读来自Pub/Sub的消息
from fastavro import parse_schema, schemaless_reader

messages = (p
            | beam.io.ReadFromPubSub(
                subscription=known_args.input_subscription)
            .with_output_types(bytes))
class AvroReader:
    def __init__(self, schema):
        self.schema = schema

    def deserialize(self, record):
        bytes_reader = io.BytesIO(record)
        dict_record = schemaless_reader(bytes_reader, self.schema)
        return dict_record
schema = avro.schema.parse(open("avro.avsc", "rb").read())
avroReader = AvroReader(schema)

lines = messages | "decode" >> beam.Map(lambda input: avroReader.deserialize(input))

这些行应该有Avro形式的PCollection