I used the code from here
and changed it to:
def getFullTableName(pn, tn):
    return "{0}:{1}".format(pn, tn)
...
(
    pipeline
    | "Read Data From Input Topic" >> beam.io.ReadFromPubSub(topic=data_topic)
    | "Get Table data from input row" >> beam.Map(lambda r: data_ingestion.getData(r))
    | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
        table=lambda project_name, dest_table_id: getFullTableName(project_name, dest_table_id),
        schema=lambda table, schema_coll: schema_coll[table],
        schema_side_inputs=(schema_coll,),
        create_disposition='CREATE_IF_NEEDED',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
When I run this, I get the error:
Can anyone help me?
Your function is returning a wrong table name, as you can see here:
KeyError: "None:ticketing.test2"
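This means that the variable project_name is empty (None). You have to set it beforehand, or get it from the data you send. As your example below shows, the data has a tablename field: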
{"tablename":"data-analytics-bk:da_belgium_dataset.cust_data",…
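One way to get the table name from the data you send is sketched below. It assumes each Pub/Sub message is a UTF-8 JSON object whose tablename field already holds a full project:dataset.table reference, as in the sample above. WriteToBigQuery calls a table callable once per element, so the callable only needs to read that field; the helper names parse_message and table_from_row are hypothetical.

```python
import json


def parse_message(message: bytes) -> dict:
    """Decode a raw Pub/Sub payload into a dict.

    Assumption: the payload is a UTF-8 encoded JSON object.
    """
    return json.loads(message.decode("utf-8"))


def table_from_row(row: dict) -> str:
    """Return the destination table for one element.

    Assumption: the element carries a "tablename" field in the
    "project:dataset.table" form. Fail loudly instead of writing
    to a name such as "None:dataset.table".
    """
    table = row.get("tablename")
    if not table or table.startswith("None:"):
        raise ValueError(f"element has no usable tablename: {row!r}")
    return table


# Hypothetical wiring into the pipeline from the question:
#   | "Parse" >> beam.Map(parse_message)
#   | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
#         table=table_from_row,
#         schema=lambda dest, schema_coll: schema_coll[dest],
#         schema_side_inputs=(schema_coll,),
#         ...)
```

If tablename is not a column of the target table, it may need to be dropped from the row before the write; the schema side input would also need to be keyed by the same full table names.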
I added the function: def getFullTableName(self): logging.info('
and changed WriteToBigQuery: … table=lambda l: data_ingestion.getFullTableName()
This solved the problem.
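For the other option, setting the project beforehand, a minimal sketch is a factory that captures the project and dataset once, when the pipeline is built, and returns the one-argument table callable that WriteToBigQuery expects. The names make_table_fn, project, dataset and the per-element table_id field are hypothetical, not taken from the question.

```python
from typing import Callable, Optional


def make_table_fn(project: Optional[str], dataset: str) -> Callable[[dict], str]:
    """Build a one-argument table callable for WriteToBigQuery.

    The project is validated once, at pipeline construction time,
    so a missing value fails immediately instead of producing
    "None:dataset.table" for every element at run time.
    Assumption: each element has a hypothetical "table_id" field.
    """
    if not project:
        raise ValueError("project must be set before building the pipeline")

    def table_fn(row: dict) -> str:
        # Full reference in the project:dataset.table form.
        return "{0}:{1}.{2}".format(project, dataset, row["table_id"])

    return table_fn
```

In the pipeline this would be passed as table=make_table_fn(known_project, "ticketing"); keeping the project out of each message means a misconfiguration surfaces before any data is read.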