提问者:小点点

使用数据流和Apache Beam(Python)将数据从Pub/Sub流式传输到BigQuery


目前,我在Dataflow上运行光束管道以将数据从Pub/Sub写入BigQuery时遇到了问题。我已经查看了各种步骤,所有数据本身似乎都按照预期进行了更改。问题来自使用beam.io. gcp.bigquery.WriteToBigQuery的步骤。检查堆栈驱动程序显示:

There were errors inserting to BigQuery: [<InsertErrorsValueListEntry errors: [<ErrorProto debugInfo: '' location: 'root.array[0].thing' message: 'no such field.' reason: 'invalid'>] index: 0>,

我缩短了上面的日志,但其余部分更多的是相同的;缺少字段。虽然这个错误是准确的,因为这些字段在架构中不存在,但我将additional_bq_parameters传递为:

{
  'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
  'ignoreUnknownValues': True,
  'maxBadRecords': 1000,
}

这些额外的参数似乎被忽略了,无论我使用一个可调用的只是返回上面的字典,或者只是设置additional_bq_parameters等于字典本身。

ParDo函数的过程如下所示:

    def process(self, tuple):
        import apache_beam as beam

        def get_additional_bq_parameters(_element):
            return {
                'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
                'ignoreUnknownValues': True,
                'maxBadRecords': 1000,
            }

        key, data = tuple
        table_name = f"table.{key}"
        (data | 'Upload' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=table_name,
            schema=self.schemas[key], # list of schemas passed to init
            additional_bq_parameters=get_additional_bq_parameters,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method='STREAMING_INSERTS',
            insert_retry_strategy="RETRY_NEVER")
        )

在Dataflow上运行管道时,我传递以下参数:

python3 script.py \
  --project=<project> \
  --job_name=<name> \
  --streaming \
  --runner=DataflowRunner \
  --temp_location=<temp> \
  --experiments use_beam_bq_sink \
  --staging_location=<location> \
  --num_workers=2

如果有人能详细说明为什么BigQuery的附加论点似乎没有得到认可,我将不胜感激。

此外,我尝试将写入的结果返回到BigQuery,以尝试将失败的行持久化GCS。但是,当我尝试使用结果[beam.io. gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]结果['FailedRow']从结果PCollection访问失败的行时,我得到错误TypeError:'PCollection'对象不可订阅。根据我所看到的,我认为这是正确的方法。如果有人能澄清这一点,我将非常感激。


共2个答案

匿名用户

看来additional_bq_parameters选项有问题,还不支持所有选项。

我复制了你的案例,我无法使用这些参数执行管道;但是,如果我使用其中一些,它就可以工作

另一方面,要获取失败的行,您可以按照本文中给出的示例进行操作

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
    )
 )

访问FAILED_ROWS侧输出:

(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))

匿名用户

2.38.0版本开始,现在可以忽略目标表架构中不可用的字段。您需要将ignore_unknown_columns=True参数传递给WriteToBigQuery

扩展您的原始示例:

   (data | 'Upload' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=table_name,
            schema=self.schemas[key],
            ignore_unknown_columns=True, # PASS THIS PARAMETER TO IGNOR UNKNOWN FIELDS
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method='STREAMING_INSERTS',
            insert_retry_strategy="RETRY_NEVER")
        )