我正在尝试编写这样的DataframeParquet:
| foo | bar |
|-----|-------------------|
| 1 | {"a": 1, "b": 10} |
| 2 | {"a": 2, "b": 20} |
| 3 | {"a": 3, "b": 30} |
我正在和熊猫和Fastparque一起做:
df = pd.DataFrame({
"foo": [1, 2, 3],
"bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})
import fastparquet
fastparquet.write('/my/parquet/location/toy-fastparquet.parq', df)
我想在(py)Spark中加载Parquet,并使用SparkSQL查询数据,例如:
df = spark.read.parquet("/my/parquet/location/")
df.registerTempTable('my_toy_table')
result = spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 15")
我的问题是,即使fastparque
可以正确读取其Parquet文件(bar
字段被正确反序列化为结构),在Spark中,bar
被读取为String类型的列,它只包含原始结构的JSON表示:
In [2]: df.head()
Out[2]: Row(foo=1, bar='{"a": 1, "b": 10}')
我尝试用PyArrow编写Parquet,但没有成功:ArrowNot实现错误:不支持结构的级别生成
。我也尝试过将file_scheme='hive'传递给Fastparque,但我得到了相同的结果。将Fastparque序列化更改为BSON(object_encoding='bson'
)会产生一个不可读的二进制字段。
[编辑]我看到以下方法:
您在这里至少有3个选项:
备选方案1:
您不需要使用任何额外的库,例如fastparquet
,因为Spark已经提供了该功能:
pdf = pd.DataFrame({
"foo": [1, 2, 3],
"bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})
df = spark.createDataFrame(pdf)
df.write.mode("overwrite").parquet("/tmp/parquet1")
如果尝试使用df=spark.read. parque("/tmp/parquet1")
加载数据,则模式将是:
StructType([
StructField("foo", LongType(), True),
StructField("bar",MapType(StringType(), LongType(), True), True)])
正如您在这种情况下看到的,Spark将保留正确的模式。
备选方案2:
如果出于任何原因仍然需要使用fastparque
,那么bar
将被视为字符串,因此您可以将bar
作为字符串加载,然后使用from_json函数将其转换为JSON。在您的情况下,我们将把json作为Map(string, int)的字典来处理。这是为了我们自己的方便,因为数据似乎是一个可以由字典完美表示的键/值序列:
from pyspark.sql.types import StringType, MapType,LongType
from pyspark.sql.functions import from_json
df = spark.read.parquet("/tmp/parquet1")
# schema should be a Map(string, string)
df.withColumn("bar", from_json("bar", MapType(StringType(), LongType()))).show()
# +---+-----------------+
# |foo| bar|
# +---+-----------------+
# | 1|[a -> 1, b -> 10]|
# | 2|[a -> 2, b -> 20]|
# | 3|[a -> 3, b -> 30]|
# +---+-----------------+
备选方案3:
如果您的模式没有改变并且您知道bar的每个值将始终具有相同的字段组合(a, b),您还可以将bar
转换为结构:
schema = StructType([
StructField("a", LongType(), True),
StructField("b", LongType(), True)
])
df = df.withColumn("bar", from_json("bar", schema))
df.printSchema()
# root
# |-- foo: long (nullable = true)
# |-- bar: struct (nullable = true)
# | |-- a: long (nullable = true)
# | |-- b: long (nullable = true)
示例:
然后您可以使用以下命令运行代码:
df.registerTempTable('my_toy_table')
spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 20").show()
# or spark.sql("SELECT * FROM my_toy_table WHERE bar['b'] > 20")
# +---+-----------------+
# |foo| bar|
# +---+-----------------+
# | 3|[a -> 3, b -> 30]|
# +---+-----------------+