提问者:小点点

为什么在创建自定义案例类的数据集时“无法找到存储在数据集中的类型的编码器”?


带有Scala 2.11.8的Spark 2.0(最终版本)。以下超级简单的代码产生编译错误错误:(17,45)无法找到存储在数据集中的类型的编码器。通过导入park. implits支持原始类型(Int、String等)和产品类型(case类)。_将在未来的版本中添加对序列化其他类型的支持。

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.
      master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}

共3个答案

匿名用户

SparkDatasets需要Encoders来存储即将存储的数据类型。对于常见类型(原子、产品类型),有许多预定义的编码器可用,但您必须首先从SparkSession. implicits导入这些编码器才能使其工作:

val sparkSession: SparkSession = ???
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)

或者,您可以直接提供显式

import org.apache.spark.sql.{Encoder, Encoders}

val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])

或隐含

implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)

存储类型的编码器

请注意,Encoders还为原子类型提供了许多预定义的Encoder,并且可以使用ExpressionEncoder派生复杂类型的Encoder

进一步阅读:

  • 对于内置编码器未涵盖的自定义对象,请参阅如何在数据集中存储自定义对象?
  • 对于Row对象,您必须显式提供Encoder,如Encoder error所示,同时尝试将数据帧行映射到更新的行
  • 对于调试用例,必须在Mainhttps://stackoverflow.com/a/34715827/3535853之外定义case类

匿名用户

对于其他用户(你的是正确的),请注意case类对象范围之外定义也很重要。所以:

失败:

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()
    val dataset = sparkSession.createDataset(dataList)
  }
}

添加含义,仍然失败并出现相同的错误:

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

作品:

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {   
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

以下是相关的bug:https://issues.apache.org/jira/browse/SPARK-13540,所以希望它能在Spark 2的下一个版本中得到修复。

(编辑:看起来那个错误修复实际上是在Spark 2.0.0中…所以我不确定为什么这仍然失败)。

匿名用户

我想澄清一下我自己的问题,如果目标是定义一个简单的文字SparkData框架,而不是使用Scala元组和隐式转换,更简单的方法是直接使用SparkAPI,如下所示:

  import org.apache.spark.sql._
  import org.apache.spark.sql.types._
  import scala.collection.JavaConverters._

  val simpleSchema = StructType(
    StructField("a", StringType) ::
    StructField("b", IntegerType) ::
    StructField("c", IntegerType) ::
    StructField("d", IntegerType) ::
    StructField("e", IntegerType) :: Nil)

  val data = List(
    Row("001", 1, 0, 3, 4),
    Row("001", 3, 4, 1, 7),
    Row("001", null, 0, 6, 4),
    Row("003", 1, 4, 5, 7),
    Row("003", 5, 4, null, 2),
    Row("003", 4, null, 9, 2),
    Row("003", 2, 3, 0, 1)
  )

  val df = spark.createDataFrame(data.asJava, simpleSchema)