提问者:小点点

RDD[数组[字符串]]到Dataframe


我是Spark和Hive的新手,我的目标是将一个分隔的(比如说csv)加载到Hive表。经过一番阅读,我发现将数据加载到Hive的路径是csv-

CSV:
1,Alex,70000,Columbus
2,Ryan,80000,New York
3,Johny,90000,Banglore
4,Cook, 65000,Glasgow
5,Starc, 70000,Aus

我使用以下命令读取csv文件:

val csv =sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim))
csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:39

现在我正在尝试将此RDD转换为Dataframe并使用以下代码:

scala> val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()
df: org.apache.spark.sql.DataFrame = [eid: string, name: string, salary: string, destination: string]

员工是一个案例类,我将其用作模式定义。

case class employee(eid: String, name: String, salary: String, destination: String)

然而,当我做df.show我得到以下错误:

org. apache.park.SparkException:由于阶段失败导致作业中止:阶段10.0中的任务0失败4次,最近一次失败:阶段10.0中丢失任务0.3(TID22,user.hostname):scala.MatchError:[Ljava.lang.String;@88ba3cb(属于[Ljava.lang.String;)

我期待一个数据帧作为输出。我知道为什么我会收到这个错误,因为RDD中的值存储在Ljava. lang.String;@88ba3cb格式中,我需要使用mkString来获取实际值,但我找不到如何做到这一点。感谢您的时间。


共3个答案

匿名用户

如果你修复了你的案例类,那么它应该可以工作:

scala> case class employee(eid: String, name: String, salary: String, destination: String)
defined class employee

scala> val txtRDD = sc.textFile("data.txt").map(line => line.split(",").map(_.trim))
txtRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:24

scala> txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3)}.toDF.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
|  1| Alex| 70000|   Columbus|
|  2| Ryan| 80000|   New York|
|  3|Johny| 90000|   Banglore|
|  4| Cook| 65000|    Glasgow|
|  5|Starc| 70000|        Aus|
+---+-----+------+-----------+

否则,您可以将String转换为Int

scala> case class employee(eid: Int, name: String, salary: String, destination: String)
defined class employee

scala> val df = txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0.toInt, s1, s2, s3)}.toDF
df: org.apache.spark.sql.DataFrame = [eid: int, name: string ... 2 more fields]

scala> df.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
|  1| Alex| 70000|   Columbus|
|  2| Ryan| 80000|   New York|
|  3|Johny| 90000|   Banglore|
|  4| Cook| 65000|    Glasgow|
|  5|Starc| 70000|        Aus|
+---+-----+------+-----------+

然而,最好的解决方案是使用spack-csv(这也会将工资视为Int)。

另请注意,当您运行df.show时引发了错误,因为在此之前所有内容都在延迟评估。df.show是一个将导致所有排队转换执行的操作(有关更多信息,请参阅本文)。

匿名用户

在数组元素上使用map,而不是在数组上:

val csv = sc.textFile("employee_data.txt")
    .map(line => line
                     .split(",")
                     .map(e => e.map(_.trim))
     )
val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()

但是,为什么要读取CSV然后将RDD转换为DF?Spark 1.5已经可以通过spack-csv包读取CSV:

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("inferSchema", "true") 
    .option("delimiter", ";") 
    .load("employee_data.txt")

匿名用户

正如您在评论中所说,您的案例类员工(应命名为员工)接收Int作为其构造函数的第一个参数,但您传递的是String。因此,您应该在实例化或修改将eid定义为String的案例之前将其转换为Int