I've run into a problem writing a struct to parquet with pyarrow. It fails intermittently depending on the size of the dataset: if I subsample or supersample the data, it sometimes writes a valid file and sometimes doesn't, and I can't discern any pattern to it.
I'm writing a single column with the schema
struct<creation_date: string,
expiration_date: string,
last_updated: string,
name_server: string,
registrar: string,
status: string>
This doesn't seem to be a versioning issue - the write sometimes succeeds, and I've been able to successfully write more complex data types, such as lists of structs.
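For example, a list-of-structs column along these lines (a made-up stand-in, not my real data) round-trips without any trouble:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# A hypothetical list<struct> column - more deeply nested than the
# whois struct, yet it writes and reads back fine
df = pd.DataFrame({'name_servers': [
    [{'host': 'ns1.example.com'}, {'host': 'ns2.example.com'}],
    None,
]})
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, '/tmp/tst_list.pa')
print(pd.read_parquet('/tmp/tst_list.pa'))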
If I unnest the struct so that each attribute gets its own column, everything works fine - it's something about how the struct is written.
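Concretely, by unnesting I mean something like this (here via pyarrow's Table.flatten(), which splits the struct into one column per field); written this way the file always reads back fine:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Two-row stand-in for the whois struct column from the repro below
df = pd.DataFrame({'whois': [
    {'registrar': 'GoDaddy.com, LLC', 'status': 'ok'},
    None,
]})

# flatten() turns the struct column into whois.registrar, whois.status, ...
table = pa.Table.from_pandas(df, preserve_index=False).flatten()
pq.write_table(table, '/tmp/tst_flat.pa')
print(pd.read_parquet('/tmp/tst_flat.pa'))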
After writing to disk, when I inspect the file with parquet-tools I get the error org.apache.parquet.io.ParquetDecodingException: Can not read value at {n} in block 0 in file, where n is whichever row triggers the problem. There's nothing special about that particular row.
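For anyone trying to reproduce this, you don't need parquet-tools; here's a rough sketch of the same check from Python, against the file the repro below writes:

import pyarrow.parquet as pq

pf = pq.ParquetFile('/tmp/tst2.pa')
print(pf.metadata)   # row group / column chunk stats and encodings
print(pf.schema)     # the parquet-level schema

# Read each row group on its own to narrow down where decoding fails
for i in range(pf.num_row_groups):
    try:
        pf.read_row_group(i)
        print('row group %d: ok' % i)
    except Exception as exc:
        print('row group %d: %s' % (i, exc))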
When I load the table into Hive and try to explore it there, I get something more illuminating:
Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
at parquet.Preconditions.checkArgument(Preconditions.java:55)
at parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:82)
at parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:64)
at parquet.column.values.dictionary.DictionaryValuesReader.readValueDictionaryId(DictionaryValuesReader.java:76)
at parquet.column.impl.ColumnReaderImpl$1.read(ColumnReaderImpl.java:166)
at parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:464)
... 35 more
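Since that trace dies in the dictionary/RLE decoder, one experiment (my own guess - nothing in the trace confirms it) is to write with dictionary encoding disabled and see whether the file then reads back:

import pyarrow as pa
import pyarrow.parquet as pq

# Small stand-in for the struct table built in the repro below
table = pa.table({'whois': pa.array(
    [{'status': 'ok'}, {'status': 'clientDeleteProhibited'}, None],
    type=pa.struct([('status', pa.string())]))})

# use_dictionary=False turns off dictionary encoding for all columns;
# if the round-trip then works, the dictionary/RLE path is implicated
pq.write_table(table, '/tmp/tst2_nodict.pa', use_dictionary=False)
print(pq.read_table('/tmp/tst2_nodict.pa'))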
Strangely, other data types look fine - it's this particular struct that triggers the error. Here's the code needed to reproduce the problem:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import sys
# Command line argument to set how many rows in the dataset
_, n = sys.argv
n = int(n)
# Random whois data - should be a struct with the schema
# struct<creation_date: string,
# expiration_date: string,
# last_updated: string,
# name_server: string,
# registrar: string,
# status: string>
# nothing terribly interesting
df = pd.DataFrame({'whois':[
{'registrar': 'GoDaddy.com, LLC', 'creation_date': '2020-07-17T16:10:35', 'expiration_date': '2022-07-17T16:10:35', 'last_updated': None, 'name_server': 'ns59.domaincontrol.com\r', 'status': 'clientDeleteProhibited'},
{'registrar': 'Hongkong Domain Name Information Management Co., Limited', 'creation_date': '2020-07-17T10:28:36', 'expiration_date': '2021-07-17T10:28:36', 'last_updated': None, 'name_server': 'ns2.alidns.com\r', 'status': 'ok'},
{'registrar': 'GoDaddy.com, LLC', 'creation_date': '2020-07-17T04:04:06', 'expiration_date': '2021-07-17T04:04:06', 'last_updated': None, 'name_server': 'ns76.domaincontrol.com\r', 'status': 'clientDeleteProhibited'},
None
]})
# strangely, the bug only pops up for datasets of certain length
# When n is 2 or 5 it works fine, but 3 is busted.
df = pd.concat([df for _ in range(n)]).sample(frac=1)
print(df.tail())
table = pa.Table.from_pandas(df, preserve_index=False)
print(table)
# The write doesn't throw any errors
pq.write_table(table, '/tmp/tst2.pa')
# This read is the bit that throws the error - it's some random OSError
df = pd.read_parquet('/tmp/tst2.pa')
print(df)
pyarrow==0.17.1
python==3.6.10
pandas==1.0.5
Your table schema has a nested structure: it's basically a single column named whois containing a user-defined type, which in turn holds the fields creation_date, expiration_date, and so on.
> table.schema
whois: struct<creation_date: string, expiration_date: string, last_updated: null, name_server: string, registrar: string, status: string>
child 0, creation_date: string
child 1, expiration_date: string
child 2, last_updated: null
child 3, name_server: string
child 4, registrar: string
child 5, status: string
Before 0.17.0, nested UDTs (user-defined types) weren't supported for reading and writing parquet, but that has since been addressed here: https://issues.apache.org/jira/browse/ARROW-1644
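You can confirm which version the writing environment actually has with:

import pyarrow as pa
print(pa.__version__)  # nested struct read/write needs >= 0.17.0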
If you're on an older version of arrow, given that your data frame only has one column, I'd suggest not using a UDT at all:
df = pd.DataFrame([
{'registrar': 'GoDaddy.com, LLC', 'creation_date': '2020-07-17T16:10:35', 'expiration_date': '2022-07-17T16:10:35', 'last_updated': None, 'name_server': 'ns59.domaincontrol.com\r', 'status': 'clientDeleteProhibited'},
{'registrar': 'Hongkong Domain Name Information Management Co., Limited', 'creation_date': '2020-07-17T10:28:36', 'expiration_date': '2021-07-17T10:28:36', 'last_updated': None, 'name_server': 'ns2.alidns.com\r', 'status': 'ok'},
{'registrar': 'GoDaddy.com, LLC', 'creation_date': '2020-07-17T04:04:06', 'expiration_date': '2021-07-17T04:04:06', 'last_updated': None, 'name_server': 'ns76.domaincontrol.com\r', 'status': 'clientDeleteProhibited'},
{}
])
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, '/tmp/tst2.pa')
df = pd.read_parquet('/tmp/tst2.pa')
Another option is to flatten your table directly in pyarrow:
df = pd.DataFrame({'whois':[
{'registrar': 'GoDaddy.com, LLC', 'creation_date': '2020-07-17T16:10:35', 'expiration_date': '2022-07-17T16:10:35', 'last_updated': None, 'name_server': 'ns59.domaincontrol.com\r', 'status': 'clientDeleteProhibited'},
{'registrar': 'Hongkong Domain Name Information Management Co., Limited', 'creation_date': '2020-07-17T10:28:36', 'expiration_date': '2021-07-17T10:28:36', 'last_updated': None, 'name_server': 'ns2.alidns.com\r', 'status': 'ok'},
{'registrar': 'GoDaddy.com, LLC', 'creation_date': '2020-07-17T04:04:06', 'expiration_date': '2021-07-17T04:04:06', 'last_updated': None, 'name_server': 'ns76.domaincontrol.com\r', 'status': 'clientDeleteProhibited'},
None
]})
table = pa.Table.from_pandas(df, preserve_index=False).flatten()
pq.write_table(table, '/tmp/tst2.pa')
df = pd.read_parquet('/tmp/tst2.pa')
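Note that flatten() prefixes the child columns with the struct's name, so you end up with columns like whois.creation_date rather than bare field names:

print(table.schema)
# whois.creation_date: string
# whois.expiration_date: string
# whois.last_updated: null
# whois.name_server: string
# whois.registrar: string
# whois.status: string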
As a side note, you may want to provide your own schema, because pandas and arrow both try to guess column types and neither does a good job with empty columns (last_updated, being all nulls, defaults to float or null - here it comes out as double):
> table.schema
creation_date: string
expiration_date: string
last_updated: double
name_server: string
registrar: string
status: string
So you could do something like this:
df = pd.DataFrame([
{'registrar': 'GoDaddy.com, LLC', 'creation_date': '2020-07-17T16:10:35', 'expiration_date': '2022-07-17T16:10:35', 'last_updated': None, 'name_server': 'ns59.domaincontrol.com\r', 'status': 'clientDeleteProhibited'},
{'registrar': 'Hongkong Domain Name Information Management Co., Limited', 'creation_date': '2020-07-17T10:28:36', 'expiration_date': '2021-07-17T10:28:36', 'last_updated': None, 'name_server': 'ns2.alidns.com\r', 'status': 'ok'},
{'registrar': 'GoDaddy.com, LLC', 'creation_date': '2020-07-17T04:04:06', 'expiration_date': '2021-07-17T04:04:06', 'last_updated': None, 'name_server': 'ns76.domaincontrol.com\r', 'status': 'clientDeleteProhibited'},
{}
])
table_schema = pa.schema([
pa.field('creation_date', pa.string()),
pa.field('expiration_date', pa.string()),
pa.field('last_updated', pa.string()),
pa.field('name_server', pa.string()),
pa.field('registrar', pa.string()),
pa.field('status', pa.string()),
])
table = pa.Table.from_pandas(df, schema=table_schema, preserve_index=False)
pq.write_table(table, '/tmp/tst2.pa')
df = pd.read_parquet('/tmp/tst2.pa')
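With the schema passed in, last_updated round-trips as an all-null string column instead of doubles, which you can sanity-check after the write:

import pyarrow.parquet as pq

# last_updated should now come back as string, not double
print(pq.read_table('/tmp/tst2.pa').schema)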