提问者:小点点

PubSub到BigQuery-数据流/光束模板在Python?


Dataflow/Beam从PubSub读取并写入BigQuery是否有任何Python模板/脚本(现有或路线图)?根据GCP留档,只有一个Java模板。

谢谢!


共1个答案

匿名用户

您可以在此处找到带有模板的Pub/Sub到BigQuery示例示例:

Apache Beam流式传输管道示例。

它从Pub/Sub读取JSON编码的消息,转换消息数据,并将结果写入BigQuery。

这是另一个示例,展示了如何在Bigquery中将来自pubsub的无效消息处理到不同的表中:

class ParseMessage(beam.DoFn):
    OUTPUT_ERROR_TAG = 'error'
    
    def process(self, line):
        """
        Extracts fields from json message
        :param line: pubsub message
        :return: have two outputs:
            - main: parsed data
            - error: error message
        """
        try:
            parsed_row = _ # parse json message to corresponding bgiquery table schema
            yield data_row
        except Exception as error:
            error_row = _ # build you error schema here
            yield pvalue.TaggedOutput(self.OUTPUT_ERROR_TAG, error_row)
        

def run(options, input_subscription, output_table, output_error_table):
    """
    Build and run Pipeline
    :param options: pipeline options
    :param input_subscription: input PubSub subscription
    :param output_table: id of an output BigQuery table
    :param output_error_table: id of an output BigQuery table for error messages
    """

    with beam.Pipeline(options=options) as pipeline:
        # Read from PubSub
        rows, error_rows = \
            (pipeline | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
             # Adapt messages from PubSub to BQ table
             | 'Parse JSON messages' >> beam.ParDo(ParseMessage()).with_outputs(ParseMessage.OUTPUT_ERROR_TAG,
                                                                                main='rows')
             )

        _ = (rows | 'Write to BigQuery'
             >> beam.io.WriteToBigQuery(output_table,
                                        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
                                        )
             )

        _ = (error_rows | 'Write errors to BigQuery'
             >> beam.io.WriteToBigQuery(output_error_table,
                                        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
                                        )
             )


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_subscription', required=True,
        help='Input PubSub subscription of the form "/subscriptions/<PROJECT>/<SUBSCRIPTION>".')
    parser.add_argument(
        '--output_table', required=True,
        help='Output BigQuery table for results specified as: PROJECT:DATASET.TABLE or DATASET.TABLE.')
    parser.add_argument(
        '--output_error_table', required=True,
        help='Output BigQuery table for errors specified as: PROJECT:DATASET.TABLE or DATASET.TABLE.')
    known_args, pipeline_args = parser.parse_known_args()
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    run(pipeline_options, known_args.input_subscription, known_args.output_table, known_args.output_error_table)