我是Spark和Hive的新手,我的目标是将一个分隔的(比如说csv)加载到Hive表。经过一番阅读,我发现将数据加载到Hive的路径是csv-
CSV:
1,Alex,70000,Columbus
2,Ryan,80000,New York
3,Johny,90000,Banglore
4,Cook, 65000,Glasgow
5,Starc, 70000,Aus
我使用以下命令读取csv文件:
val csv =sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim))
csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:39
现在我正在尝试将此RDD转换为Dataframe并使用以下代码:
scala> val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()
df: org.apache.spark.sql.DataFrame = [eid: string, name: string, salary: string, destination: string]
员工是一个案例类,我将其用作模式定义。
case class employee(eid: String, name: String, salary: String, destination: String)
然而,当我做df.show
我得到以下错误:
org. apache.park.SparkException:由于阶段失败导致作业中止:阶段10.0中的任务0失败4次,最近一次失败:阶段10.0中丢失任务0.3(TID22,user.hostname):scala.MatchError:[Ljava.lang.String;@88ba3cb(属于[Ljava.lang.String;)
我期待一个数据帧作为输出。我知道为什么我会收到这个错误,因为RDD中的值存储在Ljava. lang.String;@88ba3cb
格式中,我需要使用mkString
来获取实际值,但我找不到如何做到这一点。感谢您的时间。
如果你修复了你的案例类,那么它应该可以工作:
scala> case class employee(eid: String, name: String, salary: String, destination: String)
defined class employee
scala> val txtRDD = sc.textFile("data.txt").map(line => line.split(",").map(_.trim))
txtRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:24
scala> txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3)}.toDF.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
| 1| Alex| 70000| Columbus|
| 2| Ryan| 80000| New York|
| 3|Johny| 90000| Banglore|
| 4| Cook| 65000| Glasgow|
| 5|Starc| 70000| Aus|
+---+-----+------+-----------+
否则,您可以将String
转换为Int
:
scala> case class employee(eid: Int, name: String, salary: String, destination: String)
defined class employee
scala> val df = txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0.toInt, s1, s2, s3)}.toDF
df: org.apache.spark.sql.DataFrame = [eid: int, name: string ... 2 more fields]
scala> df.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
| 1| Alex| 70000| Columbus|
| 2| Ryan| 80000| New York|
| 3|Johny| 90000| Banglore|
| 4| Cook| 65000| Glasgow|
| 5|Starc| 70000| Aus|
+---+-----+------+-----------+
然而,最好的解决方案是使用spack-csv
(这也会将工资视为Int
)。
另请注意,当您运行df.show
时引发了错误,因为在此之前所有内容都在延迟评估。df.show是一个将导致所有排队转换执行的操作(有关更多信息,请参阅本文)。
在数组元素上使用map,而不是在数组上:
val csv = sc.textFile("employee_data.txt")
.map(line => line
.split(",")
.map(e => e.map(_.trim))
)
val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()
但是,为什么要读取CSV然后将RDD转换为DF?Spark 1.5已经可以通过spack-csv
包读取CSV:
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ";")
.load("employee_data.txt")
正如您在评论中所说,您的案例类员工(应命名为员工
)接收Int
作为其构造函数的第一个参数,但您传递的是String
。因此,您应该在实例化或修改将eid
定义为String
的案例之前将其转换为Int
。