当我尝试在我的代码中做下面提到的同样的事情时
dataframe.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
我从这里获得了上面的参考:Scala:我如何使用scala替换数据帧中的值,但是我得到了编码器错误
无法为数据集中存储的类型找到编码器。通过导入spark.implicits支持原始类型(Int、S tring等)和产品类型(case类)。_将在未来版本中添加对序列化其他类型的支持。
注意:我正在使用火花2.0!
这里没有什么意外的。您正在尝试使用使用Spark 1. x编写且Spark 2.0不再支持的代码:
中DataFrame.map
是((行)♪T)(ClassTag[T])♪RDD[T]
Dataset[Row].map
is((Row)≠T)(Encoder[T])≠Dataset[T]
老实说,它在1. x中也没有多大意义。独立于版本,您可以简单地使用DataFrame
API:
import org.apache.spark.sql.functions.{when, lower}
val df = Seq(
(2012, "Tesla", "S"), (1997, "Ford", "E350"),
(2015, "Chevy", "Volt")
).toDF("year", "make", "model")
df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
如果您真的想使用map
,您应该使用静态类型的Dataset
:
import spark.implicits._
case class Record(year: Int, make: String, model: String)
df.as[Record].map {
case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
case rec => rec
}
或者至少返回一个具有隐式编码器的对象:
df.map {
case Row(year: Int, make: String, model: String) =>
(year, if(make.toLowerCase == "tesla") "S" else make, model)
}
最后,如果出于某种完全疯狂的原因,您真的想映射Dataset[Row]
,您必须提供所需的编码器:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
StructField("year", IntegerType),
StructField("make", StringType),
StructField("model", StringType)
))
val encoder = RowEncoder(schema)
df.map {
case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
Row(year, "S", model)
case row => row
} (encoder)
对于预先知道dataframe schema的场景,@zero323给出的答案是解决方案
但对于具有动态模式/或将多个数据帧传递给通用函数的场景:以下代码在从1.6.1从2.2.0迁移时为我们工作
import org.apache.spark.sql.Row
val df = Seq(
(2012, "Tesla", "S"), (1997, "Ford", "E350"),
(2015, "Chevy", "Volt")
).toDF("year", "make", "model")
val data = df.rdd.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
此代码在两个版本的park上执行。
缺点:不会被应用在dataframe/datasets api上。
只是为了更好地理解其他答案而添加一些其他重要的知识点(尤其是@zero323关于map
overDataset[Row]
的答案的最后一点):
// this describes the internal type of a row
val schema = StructType(Seq(StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType)))
// and this completes the creation of encoder
// for the type `Row` with internal schema described above
val encoder = RowEncoder(schema)