对于Pojo的/原语,Spark数据集从Row的Encoder
移动到Encoder
。Catalyst引擎使用ExpressionEncoder
转换SQL表达式中的列。然而,似乎没有其他Encoder
子类可用作我们自己实现的模板。
以下是在Spark 1. X/DataFrames中满意的代码示例,该代码在新制度中无法编译:
//mapping each row to RDD tuple
df.map(row => {
var id: String = if (!has_id) "" else row.getAs[String]("id")
var label: String = row.getAs[String]("label")
val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
val height : Int = if (!has_height) 0 else row.getAs[Int]("height")
val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
val data : Array[Byte] = row.getAs[Any]("data") match {
case str: String => str.getBytes
case arr: Array[Byte@unchecked] => arr
case _ => {
log.error("Unsupport value type")
null
}
}
(id, label, channels, height, width, data)
}).persist(StorageLevel.DISK_ONLY)
}
我们得到一个编译器错误
Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported
by importing spark.implicits._ Support for serializing other types will be added in future releases.
df.map(row => {
^
所以不管怎样/某个地方应该有办法
DataFrame
(现在是Row
类型的数据集)上执行映射时应用它我正在寻找成功执行这些步骤的代码。
据我所知,自1.6以来没有什么真正改变,如何在数据集中存储自定义对象?中描述的解决方案是唯一可用的选项。然而,您当前的代码应该可以很好地与产品类型的默认编码器一起工作。
要了解为什么您的代码在1. x中工作而在2.0.0中可能不工作,您必须检查签名。在1.x中DataFrame.map
是一个采用函数Row=的方法
在2.0.0中DataFrame.map
接受类型为Row=的函数
df.rdd.map(row => ???)
对于数据集[行]
map
,请参阅尝试将数据框行映射到更新行时的编码器错误
你导入隐式编码器了吗?
导入_
http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Encoder
_其中火花是SparkSession,它解决了错误和自定义编码器得到了进口。
此外,编写自定义编码器是我从未尝试过的一种方法。
工作解决方案:-创建SparkSession并导入以下内容
导入_