提问者:小点点

使用Serializable Function进行大查询读取-如何从GenericRecords获取NUMERIC类型


嗨,

我正在使用Beam从BQ表中读取数据,并且发现使用SerializableFunction的read()比readTableRow()具有更好的性能。遵循https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html#read-org.apache.beam.sdk.transforms.SerializableFunction-
中的示例

我的Big Query列是:

|Field name | Field type|
|Date_Time  | TIMESTAMP |
|Simple_Id  | STRING    |
|A_Price    | NUMERIC   |

我的代码看起来像:

公共类ConvertBQSchemaRecordToModelDataFn实现SerializableFunction{

@Override
public ProtoValueType apply(SchemaAndRecord schemaAndRecord) {
    GenericRecord avroRecord = schemaAndRecord.getRecord();

    long dateTimeMillis  = (Long) avroRecord.get("Date_Time");
    String simpleId  = avroRecord.get("Simple_Id").toString();
    double aPrice  = convertToDouble(avroRecord.get("A_Price").toString());

long和String很好。但是,当我尝试转换NUMERIC类型时,GenericRecords(来自调试器)将其显示为您无法强制转换的HeapByteBuffer。我不确定如何获取“A_Price”的值:

调试

调用管道代码如下所示:

PCollection<ProtoValueType> protoData =
        pipeline.apply("BigQuery Read",
                       BigQueryIO.read(new ConvertBQSchemaRecordToProtoDataFn())
                               .fromQuery(sqlQuery)
                                .usingStandardSql()
                       .withCoder(ProtoCoder.of(ProtoValueType.class)));

我不确定是否使用了编码器。ModelValueType是一个原型生成的绑定类。

我的问题是:如何从GenericRecords(我认为是Avro对象)中获取NUMERIC类型的值?

感谢任何帮助。我可以使用readTableRow()获取行,它都以字符串形式返回,所以我不想理解该方法。


共1个答案

匿名用户

对应于NUMERIC字段的GenericRecords字段具有一些附加属性,您可以使用这些属性将NUMERIC解析为java. math.BigDecimal

这样一个字段的模式将是BYTES类型,如下所示:

{"type":"bytes","logicalType":"decimal","precision":38,"scale":9}

我刚刚发表了一篇博文,解释了如何在模式中使用这些属性将字节数组转换为java. math.BigDecimal

https://medium.com/@iht/reading-numeric-fields-with-bigqueryio-in-apache-beam-23273a9d0c99