提问者:小点点

在从数据流插入BigQuery之前验证行


根据从数据流加载Bigquery表时我们如何设置maximum_bad_records?目前没有办法在从数据流加载数据到BigQuery时设置maxBadRecords配置。建议是在将Dataflow作业中的行插入BigQuery之前验证它们。

如果我有TableSchemaTableRow,我如何确保该行可以安全地插入到表中?

一定有比遍历模式中的字段,查看它们的类型并查看行中值的类更简单的方法,对吗?这似乎容易出错,并且该方法必须是万无一失的,因为如果无法加载单行,整个管道都会失败。

更新:

我的用例是一个ETL作业,一开始它将运行在云存储上的JSON(每行一个对象)日志上,并批量写入BigQuery,但后来会从PubSub读取对象并连续写入BigQuery。这些对象包含许多在BigQuery中不必要的信息,还包含甚至不可能在模式中描述的部分(基本上是自由形式JSON有效负载)。像时间戳这样的东西也需要格式化以与BigQuery一起使用。这个作业会有一些变体在不同的输入上运行并写入不同的表。

从理论上讲,这不是一个非常困难的过程,它需要一个对象,提取一些属性(50-100),格式化其中一些并将对象输出到BigQuery。我或多或少只是遍历属性名称列表,从源对象中提取值,查看配置以查看属性是否应该以某种方式格式化,必要时应用格式化(这可能是降级,将毫秒时间戳除以1000,从URL中提取主机名等),并将值写入TableRow对象。

我的问题是数据很乱。有几亿个对象,有些看起来不像预期的那样,这很少见,但是有了这些卷,仍然会发生罕见的事情。有时应该包含字符串的属性包含一个整数,反之亦然。有时应该有字符串的数组或对象。

理想情况下,我想获取我的TableRow并将其传递给TableSchema并询问“这可以吗?”。

由于这是不可能的,我所做的是查看TableSchema对象并尝试自己验证/转换值。如果TableSchema说属性是STRING类型,我在将其添加到TableRow之前运行value. toString()。如果它是INTEGER,我检查它是IntegerLongBigInteger等等。这种方法的问题是我只是猜测BigQuery中的工作方式。FLOAT将接受哪些Java数据类型?对于TIMESTAMP?我认为我的验证/转换捕获了大多数问题,但总是有异常和边缘情况。

根据我非常有限的经验,如果单行未能通过BigQuery的验证(就像常规加载一样,除非maxBadRecords设置为足够大的数字),整个工作管道(作业?工作流?不确定正确的术语)都会失败。它也会因表面上有帮助的消息而失败,例如“BigQuery导入作业dataflow_job_xxx”失败。原因:(5db0b2cdab1557e0):项目"xxx"中的BigQuery作业"dataflow_job_xxx"以错误结束: errorResult:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射。也许在某个地方可以看到更详细的错误消息,可以告诉我它是哪个属性以及值是什么?如果没有这些信息,它还不如说“坏数据”。

据我所知,至少在批处理模式下运行时,Dataflow会将TableRow对象写入Cloud Storage中的暂存区,然后一旦一切就绪,就开始加载。这意味着我无处捕获任何错误,当BigQuery加载时,我的代码不再运行。我还没有在流式模式下运行任何作业,但我不确定那里会有什么不同,从我(公认有限)的理解基本原理是一样的,只是批处理大小更小。

人们使用Dataflow和BigQuery,所以不可能不担心整个管道因为一个错误的输入而停止工作。人们是如何做到的?


共1个答案

匿名用户

我假设您将文件中的JSON反序列化为Map

我建议使用迭代方法来开发模式验证,包括以下两个步骤。

>

  • 写一个PTransform

    我们这里的代码做了一些类似的事情——给定一个由BigQuery生成并写成JSONGCS的文件,我们递归地遍历模式并进行一些类型转换。然而,我们不需要验证,因为BigQuery本身编写了数据。

    请注意,TableSchema对象不是Serializable。我们通过将DoFnPTransform构造函数中的TableSchema转换为JSONString并返回来解决这个问题。请参阅BigQueryIO.java中使用jsonTableSchema变量的代码。

    使用这篇博文中描述的“死信”策略来处理不良记录--侧输出有问题的Map

    您可以从一些小文件开始,使用DirectPipelineRunner而不是DataflowPipelineRunner。直接运行器在您的计算机上运行管道,而不是在Google Cloud Dataflow服务上运行管道,并且它使用BigQuery流式写入。我相信当这些写入失败时,您会收到更好的错误消息。

    (我们使用GCS-

    最后,在日志信息方面:

    • 请务必检查Cloud Logging(通过点击日志面板上的Worker Logs链接。
    • 如果您运行bq命令行实用程序:bq show-j PROJECT:dataflow_job_XXXXXXX,您可能会获得有关批处理数据流触发的加载作业失败原因的更好信息。