目前,我在Dataflow上运行光束管道以将数据从Pub/Sub写入BigQuery时遇到了问题。我已经查看了各种步骤,所有数据本身似乎都按照预期进行了更改。问题来自使用beam.io. gcp.bigquery.WriteToBigQuery的步骤。检查堆栈驱动程序显示:
There were errors inserting to BigQuery: [<InsertErrorsValueListEntry errors: [<ErrorProto debugInfo: '' location: 'root.array[0].thing' message: 'no such field.' reason: 'invalid'>] index: 0>,
我缩短了上面的日志,但其余部分更多的是相同的;缺少字段。虽然这个错误是准确的,因为这些字段在架构中不存在,但我将additional_bq_parameters传递为:
{
'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
'ignoreUnknownValues': True,
'maxBadRecords': 1000,
}
这些额外的参数似乎被忽略了,无论我使用一个可调用的只是返回上面的字典,或者只是设置additional_bq_parameters等于字典本身。
ParDo函数的过程如下所示:
def process(self, tuple):
import apache_beam as beam
def get_additional_bq_parameters(_element):
return {
'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
'ignoreUnknownValues': True,
'maxBadRecords': 1000,
}
key, data = tuple
table_name = f"table.{key}"
(data | 'Upload' >> beam.io.gcp.bigquery.WriteToBigQuery(
table=table_name,
schema=self.schemas[key], # list of schemas passed to init
additional_bq_parameters=get_additional_bq_parameters,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
method='STREAMING_INSERTS',
insert_retry_strategy="RETRY_NEVER")
)
在Dataflow上运行管道时,我传递以下参数:
python3 script.py \
--project=<project> \
--job_name=<name> \
--streaming \
--runner=DataflowRunner \
--temp_location=<temp> \
--experiments use_beam_bq_sink \
--staging_location=<location> \
--num_workers=2
如果有人能详细说明为什么BigQuery的附加论点似乎没有得到认可,我将不胜感激。
此外,我尝试将写入的结果返回到BigQuery,以尝试将失败的行持久化GCS。但是,当我尝试使用结果[beam.io. gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
或结果['FailedRow']
从结果PCollection访问失败的行时,我得到错误TypeError:'PCollection'对象不可订阅
。根据我所看到的,我认为这是正确的方法。如果有人能澄清这一点,我将非常感激。
看来additional_bq_parameters选项有问题,还不支持所有选项。
我复制了你的案例,我无法使用这些参数执行管道;但是,如果我使用其中一些,它就可以工作
另一方面,要获取失败的行,您可以按照本文中给出的示例进行操作
events = (p
| "Create data" >> beam.Create(data)
| "CSV to dict" >> beam.ParDo(CsvToDictFn())
| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
"{0}:dataflow_test.good_lines".format(PROJECT),
schema=schema,
)
)
访问FAILED_ROWS侧输出:
(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
| "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))
从2.38.0
版本开始,现在可以忽略目标表架构中不可用的字段。您需要将ignore_unknown_columns=True
参数传递给WriteToBigQuery
。
扩展您的原始示例:
(data | 'Upload' >> beam.io.gcp.bigquery.WriteToBigQuery(
table=table_name,
schema=self.schemas[key],
ignore_unknown_columns=True, # PASS THIS PARAMETER TO IGNOR UNKNOWN FIELDS
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
method='STREAMING_INSERTS',
insert_retry_strategy="RETRY_NEVER")
)