我是Scala和Spark的新手。
我试图使用编码器从火花读取文件,然后转换为java/scala对象。
使用模式和编码读取文件的第一步可以正常工作。
然后我使用该数据集/数据框进行简单的映射操作,但如果我尝试在生成的数据集/数据框上打印模式,它不会打印任何列。
此外,当我第一次读取文件时,我没有在Person类中映射age字段,只是为了在map函数中计算它以尝试-但我根本没有看到年龄没有使用Person映射到数据帧。
Pern. txt中的数据:
firstName,lastName,dob
ABC, XYZ, 01/01/2019
CDE, FGH, 01/02/2020
以下是代码:
object EncoderExample extends App {
val sparkSession = SparkSession.builder().appName("EncoderExample").master("local").getOrCreate();
case class Person(firstName: String, lastName: String, dob: String,var age: Int = 10)
implicit val encoder = Encoders.bean[Person](classOf[Person])
val personDf = sparkSession.read.option("header","true").option("inferSchema","true").csv("Person.txt").as(encoder)
personDf.printSchema()
personDf.show()
val calAge = personDf.map(p => {
p.age = Year.now().getValue - p.dob.substring(6).toInt
println(p.age)
p
} )//.toDF()//.as(encoder)
print("*********Person DF Schema after age calculation: ")
calAge.printSchema()
//calAge.show
}
package spark
import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.functions._
case class Person(firstName: String, lastName: String, dob: String, age: Long)
object CalcAge extends App {
val spark = SparkSession.builder()
.master("local")
.appName("DataFrame-example")
.getOrCreate()
import spark.implicits._
val sourceDF = Seq(
("ABC", "XYZ", "01/01/2019"),
("CDE", "FGH", "01/02/2020")
).toDF("firstName","lastName","dob")
sourceDF.printSchema
// root
// |-- firstName: string (nullable = true)
// |-- lastName: string (nullable = true)
// |-- dob: string (nullable = true)
sourceDF.show(false)
// +---------+--------+----------+
// |firstName|lastName|dob |
// +---------+--------+----------+
// |ABC |XYZ |01/01/2019|
// |CDE |FGH |01/02/2020|
// +---------+--------+----------+
def getCurrentYear: Long = {
val today:java.util.Date = Calendar.getInstance.getTime
val timeFormat = new SimpleDateFormat("yyyy")
timeFormat.format(today).toLong
}
val ageUDF = udf((d1: String) => {
val year = d1.split("/").reverse.head.toLong
val yearNow = getCurrentYear
yearNow - year
})
val df = sourceDF
.withColumn("age", ageUDF('dob))
df.printSchema
// root
// |-- firstName: string (nullable = true)
// |-- lastName: string (nullable = true)
// |-- dob: string (nullable = true)
// |-- age: long (nullable = false)
df.show(false)
// +---------+--------+----------+---+
// |firstName|lastName|dob |age|
// +---------+--------+----------+---+
// |ABC |XYZ |01/01/2019|1 |
// |CDE |FGH |01/02/2020|0 |
// +---------+--------+----------+---+
val person = df.as[Person].collectAsList()
// person: java.util.List[Person] = [Person(ABC,XYZ,01/01/2019,1), Person(CDE,FGH,01/02/2020,0)]
println(person)
}