提问者:小点点

数据流中写入bigquery中多个表的问题


我使用这里的代码

并将其更改为:

def getFullTableName(pn,tn):
    return "{0}:{1}".format(pn,tn)
...
(
  pipeline | "Read Data From Input Topic" >> beam.io.ReadFromPubSub(topic=data_topic)
           | "Get Table data from input row" >> beam.Map(lambda r : data_ingestion.getData(r))
           | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(table = lambda project_name, dest_table_id : getFullTableName(project_name,dest_table_id),                                                                            schema = lambda table, schema_coll : schema_coll[table],                                                                            schema_side_inputs=(schema_coll,),                                                                             create_disposition='CREATE_IF_NEEDED',                                                                           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )

运行此命令时,我收到错误:

有人能帮我吗?


共2个答案

匿名用户

您的函数正在返回一个错误的表名,如您在此处看到的:

KeyError:"无:ticketing. test2

这意味着,变量project_name是空的()。您必须事先设置它,或者从您发送的数据中获取它。如您下面的示例所示,数据具有tablename字段:

{"tablename":"data-analytics-bk:da_belgium_dataset.cust_data",…

匿名用户

我添加函数:def getFullTableName(self):logging.info('

并更改WriteToBigquery:… table=lambda l:data_ingestion.getFullTableName()

这解决了问题