我正在尝试将使用箭头
库在R中写出的拼花文件移动到BigTable。我已经验证了箭头包的安装,并确保使用codec_is_available("snappy")
可以使用snappy编解码器。
由于某种原因,在工作流程的第三步中,我遇到了以下错误:
Error message from worker: java.lang.RuntimeException:
org.apache.beam.sdk.util.UserCodeException:
org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 1 in file
ReadableFile{
metadata=Metadata{
resourceId=gs://mybucket/userdata_2.parquet,
sizeBytes=85550,
isReadSeekEfficient=true,
checksum=null,
lastModifiedMillis=0}, compression=UNCOMPRESSED}
我不清楚为什么它会出现这个错误,也不清楚为什么它说压缩=UNCOMPRESSED
。该文件已使用snappy
压缩。
我曾尝试将箭头版本从1.0更改为2.0,并尝试更改压缩编解码器,包括未压缩(即使Google Data Flow似乎不支持未压缩的格式)。错误保持不变。
使用parquet-tools之类的实用程序不会表明我上传的文件有任何问题。
我在这里错过了对Google数据流拼花格式的任何特殊要求吗?我已经遍历了箭头包中提供给我的那些,但无济于事。
我也看到这个错误时,试图使用我自己的pyarrow生成的parquets与parquet_to_bigtable
数据流模板。
这个问题归结为模式不匹配。虽然parque中的数据与预期的格式完全匹配,并且打印nod-good和我自己的版本显示了完全相同的内容,但parquets包含描述模式的附加元数据,如下所示:
➜ ~ parq my_pyarrow_generated.parquet -s
# Schema
<pyarrow._parquet.ParquetSchema object at 0x12d7164c0>
required group field_id=-1 schema {
optional binary field_id=-1 key;
optional group field_id=-1 cells (List) {
repeated group field_id=-1 list {
optional group field_id=-1 item {
optional binary field_id=-1 family (String);
optional binary field_id=-1 qualifier;
optional double field_id=-1 timestamp;
optional binary field_id=-1 value;
}
}
}
}
我知道这个模式可能并不是他们自己使用的,所以为了了解我离需要有多远,我使用逆模板bigtable_to_parquet
来获取一个示例parquet文件,其中包含正确的元数据编码:
➜ ~ parq dataflow_bigtable_to_parquet.parquet -s
# Schema
<pyarrow._parquet.ParquetSchema object at 0x1205c6a80>
required group field_id=-1 com.google.cloud.teleport.bigtable.BigtableRow {
required binary field_id=-1 key;
required group field_id=-1 cells (List) {
repeated group field_id=-1 array {
required binary field_id=-1 family (String);
required binary field_id=-1 qualifier;
required int64 field_id=-1 timestamp;
required binary field_id=-1 value;
}
}
}
如图所示,模式非常接近,但不精确。
不过,有了这个,我们可以构建一个简单的解决方法。这很恶心,但我现在仍在积极调试这个,这就是刚刚工作的最终结果。
bigtable_schema_parquet = pq.read_table(pa.BufferReader(bigtable_to_parquet_file_bytes))
keys = []
cells = []
.......
df = pd.DataFrame({'key': keys, 'cells': cells})
table = pa.Table.from_pandas(df, schema=bigtable_schema_parquet.schema)
tl; dr:使用bigtable_to_parquet
数据流模板获取一个示例parquet,其中包含parquet_to_bigtable
输入必须使用的模式。然后将该模式加载到内存中并将其传递给from_pandas
以覆盖它原本推断的任何模式