提问者:小点点

"云存储上的Parquet文件到Cloud BigTable"DataFlow模板无法读取parquet文件


我正在尝试将使用箭头库在R中写出的拼花文件移动到BigTable。我已经验证了箭头包的安装,并确保使用codec_is_available("snappy")可以使用snappy编解码器。

由于某种原因,在工作流程的第三步中,我遇到了以下错误:

Error message from worker: java.lang.RuntimeException: 
org.apache.beam.sdk.util.UserCodeException: 
org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 1 in file 
ReadableFile{
  metadata=Metadata{
    resourceId=gs://mybucket/userdata_2.parquet, 
    sizeBytes=85550, 
    isReadSeekEfficient=true, 
    checksum=null, 
    lastModifiedMillis=0}, compression=UNCOMPRESSED} 

我不清楚为什么它会出现这个错误,也不清楚为什么它说压缩=UNCOMPRESSED。该文件已使用snappy压缩。

我曾尝试将箭头版本从1.0更改为2.0,并尝试更改压缩编解码器,包括未压缩(即使Google Data Flow似乎不支持未压缩的格式)。错误保持不变。

使用parquet-tools之类的实用程序不会表明我上传的文件有任何问题。

我在这里错过了对Google数据流拼花格式的任何特殊要求吗?我已经遍历了箭头包中提供给我的那些,但无济于事。


共1个答案

匿名用户

我也看到这个错误时,试图使用我自己的pyarrow生成的parquets与parquet_to_bigtable数据流模板。

这个问题归结为模式不匹配。虽然parque中的数据与预期的格式完全匹配,并且打印nod-good和我自己的版本显示了完全相同的内容,但parquets包含描述模式的附加元数据,如下所示:

➜  ~ parq my_pyarrow_generated.parquet -s

 # Schema 
 <pyarrow._parquet.ParquetSchema object at 0x12d7164c0>
required group field_id=-1 schema {
  optional binary field_id=-1 key;
  optional group field_id=-1 cells (List) {
    repeated group field_id=-1 list {
      optional group field_id=-1 item {
        optional binary field_id=-1 family (String);
        optional binary field_id=-1 qualifier;
        optional double field_id=-1 timestamp;
        optional binary field_id=-1 value;
      }
    }
  }
}

我知道这个模式可能并不是他们自己使用的,所以为了了解我离需要有多远,我使用逆模板bigtable_to_parquet来获取一个示例parquet文件,其中包含正确的元数据编码:

➜  ~ parq dataflow_bigtable_to_parquet.parquet -s                                                           

 # Schema 
 <pyarrow._parquet.ParquetSchema object at 0x1205c6a80>
required group field_id=-1 com.google.cloud.teleport.bigtable.BigtableRow {
  required binary field_id=-1 key;
  required group field_id=-1 cells (List) {
    repeated group field_id=-1 array {
      required binary field_id=-1 family (String);
      required binary field_id=-1 qualifier;
      required int64 field_id=-1 timestamp;
      required binary field_id=-1 value;
    }
  }
}

如图所示,模式非常接近,但不精确。

不过,有了这个,我们可以构建一个简单的解决方法。这很恶心,但我现在仍在积极调试这个,这就是刚刚工作的最终结果。

bigtable_schema_parquet = pq.read_table(pa.BufferReader(bigtable_to_parquet_file_bytes))
keys = []
cells = []
.......
df = pd.DataFrame({'key': keys, 'cells': cells})
table = pa.Table.from_pandas(df, schema=bigtable_schema_parquet.schema)

tl; dr:使用bigtable_to_parquet数据流模板获取一个示例parquet,其中包含parquet_to_bigtable输入必须使用的模式。然后将该模式加载到内存中并将其传递给from_pandas以覆盖它原本推断的任何模式