Asked by: 小点点

Generating the schema and the data for a DataFrame programmatically in Apache Spark


I would like to dynamically generate a DataFrame containing a header record for a report, so I am creating a DataFrame from the value of the string below:

val headerDescs : String = "Name,Age,Location"

val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))

However, I now want to do the same thing for the data (which is, in effect, the same data, i.e. the metadata).

I create an RDD:

val headerRDD = sc.parallelize(headerDescs.split(","))

I then intend to use createDataFrame to create it:

val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)

However, this fails because createDataFrame expects an RDD[Row], while my RDD is an RDD of strings, and I cannot find a way to convert it to an RDD of Rows and then map the fields dynamically. The examples I have seen assume the number of columns is known up front, but I want to eventually be able to change the columns without changing the code, for example by keeping the columns in a file.

Code excerpt based on the first answer:

val headerDescs : String = "Name,Age,Location"

// create the schema from a string, splitting by delimiter
val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))

// create a row from a string, splitting by delimiter
val headerRDDRows = sc.parallelize(headerDescs.split(",")).map( a => Row(a))

val headerDf = sqlContext.createDataFrame(headerRDDRows, headerSchema)
headerDf.show()

Executing this results in:

+--------+---+--------+
|    Name|Age|Location|
+--------+---+--------+
|    Name|
|     Age|
|Location|
+--------+---+--------+

So it seems each value in the split becomes its own single-field row, rather than one row whose three fields match the schema.

1 Answer

Answered by: Anonymous user

To convert an RDD[Array[String]] to an RDD[Row], you need to do the following (note the Seq(...) wrapper below: it parallelizes the whole split array as a single element, which then maps to one three-field Row, whereas the code in the question parallelized three separate strings, each of which became its own one-field Row):

import org.apache.spark.sql.Row

val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))

scala> val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
headerSchema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,StringType,true), StructField(Location,StringType,true))

scala> val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))
headerRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:34

scala> val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
headerDf: org.apache.spark.sql.DataFrame = [Name: string, Age: string, Location: string]


scala> headerDf.printSchema
root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Location: string (nullable = true)



scala> headerDf.show
+----+---+--------+
|Name|Age|Location|
+----+---+--------+
|Name|Age|Location|
+----+---+--------+

This will give you an RDD[Row].
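Since the question asks for the column count to stay dynamic, one variant worth noting (a minimal sketch, not part of the original answer) is to build the row with Row.fromSeq, which accepts a Seq of any length, so the hard-coded x(0), x(1), x(2) disappears:

import org.apache.spark.sql.Row

// Row.fromSeq wraps the whole split array into a single Row, whatever
// its length, so the number of columns never appears in the code.
val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x => Row.fromSeq(x))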

For reading from a file:

val vRDD = sc.textFile("..filepath..").map(_.split(",")).map(a => Row.fromSeq(a))

val headerDf = sqlContext.createDataFrame(vRDD, headerSchema)
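Tying this back to the goal in the question of changing the columns without changing the code, the header itself can also be read from a file. A minimal sketch, assuming a one-line file header.txt containing "Name,Age,Location" and a matching data.csv (both file names are hypothetical):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Read the column names from a file, so the schema is fully data-driven.
val headerLine = sc.textFile("header.txt").first()
val fileSchema = StructType(headerLine.split(",").map(f => StructField(f, StringType, true)))

// Row.fromSeq copes with however many columns the header declares.
val dataRDD = sc.textFile("data.csv").map(_.split(",")).map(a => Row.fromSeq(a))
val dataDf = sqlContext.createDataFrame(dataRDD, fileSchema)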

Using the Spark-CSV package:

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .schema(headerSchema) // defining based on the custom schema
    .load("cars.csv")

Or, letting the package infer the column types instead of supplying a schema:

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .load("cars.csv")

There are various other options as well, which you can explore in the package's documentation.
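As an aside that is not part of the original answer: from Spark 2.0 onward the CSV reader is built into Spark SQL, so the same read needs no external package (a sketch using a SparkSession named spark):

// Spark 2.0+: csv() is a built-in DataFrameReader format.
val df = spark.read
  .option("header", "true")
  .schema(headerSchema)
  .csv("cars.csv")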