提问者:小点点

从Dataflow python作业在bigquery中写入分区表


当我从数据流写入bigquery中的分区表时,我收到以下错误-有人能帮我吗?

无效的表ID\"test20181126美元\"。表ID必须是字母数字(加上下划线),并且最长必须为1024个字符。此外,不能使用表装饰器。

这是我用来写作的python片段

import apache_beam as beam


class bqwriter(beam.PTransform):
    def __init__(self, table, schema):
        super(BQWriter, self).__init__()
        self.table = table
        self.schema = schema

    def expand(self, pcoll):
        pcoll | beam.io.Write(beam.io.BigQuerySink(
            self.table,
            schema=self.schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        ))

我正在创建下面的标签

a | 'BQWrite' >> BQWriter("test-123:test.test$20181126", table_schema)

共2个答案

匿名用户

我也有同样的问题。我的解决方案是:

>

  • 要么在数据中添加一个日期列,然后设置要对其进行分区的BQ表

    或者在BQ中设置_PARTITIONTIME的默认分区。

    这两个选项都意味着您只能插入test-123:test. test

    至于我们是否应该能够做你想做的事情,似乎是的。光束JIRA说他们已经修复了Java,但是我找不到蟒蛇的状态。

  • 匿名用户

    最好的方法是将函数传递给本机beam.io。WriteToBigQuery类:

    def table_fn(element):
        current_date = date.fromtimestamp(element['timestamp']).strftime("%Y%m%d")
        return f"{bq_output_table}${current_date}"
    
    user_parent_user_watchever_pcol | "Write to BigQuery" >> 
    beam.io.Write(
        beam.io.WriteToBigQuery(
            table_fn,
            schema=schemas.VIDEOCATALOG_SCHEMA,
            method="STREAMING_INSERTS",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        )
    )