我创建了如下的数据框:
+----+-------+-------+
| age| number|name |
+----+-------+-------+
| 16| 12|A |
| 16| 13|B |
| 17| 16|E |
| 17| 17|F |
+----+-------+-------+
如何将其转换为以下json:
{
'age' : 16,
'values' : [{‘number’: ‘12’ , ‘name’ : 'A'},{‘number’: ‘12’ , ‘name’ : 'A'} ]
},{
'age' : 17,
'values' : [{‘number’: ‘16’ , ‘name’ : 'E'},{‘number’: ‘17’ , ‘name’ : 'F'} ]
}
假设df
是您的数据帧,
from pyspark.sql import functions as F
new_df = df.select(
"age",
F.struct(
F.col("number"),
F.col("name"),
).alias("values")
).groupBy(
"age"
).agg(
F.collect_list("values").alias("values")
)
new_df.toJSON()
# or
new_df.write.json(...)
您可以将DF转换为RDD并应用您的转换:
NewSchema = StructType([StructField("age", IntegerType())
, StructField("values", StringType())
])
res_df = df.rdd.map(lambda row: (row[0], ([{'number':row[1], 'name':row[2]}])))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda row: (row[0], json.dumps(row[1])))\
.toDF(NewSchema)
res_df.show(20, False)
显示res_df:
+---+------------------------------------------------------------+
|age|values |
+---+------------------------------------------------------------+
|16 |[{"number": 12, "name": "A"}, [{"number": 13, "name": "B"}] |
|17 |[{"number": 17, "name": "F"}, [{"number": 16, "name": "E"}] |
+---+------------------------------------------------------------+
将DF另存为JSON文件:
res_df.coalesce(1).write.format('json').save('output.json')