Asked by: 小点点

No Encoder found for java.io.Serializable in Map[String, java.io.Serializable]


I am writing a Spark job whose dataset is very flexible; it is defined as Dataset[Map[String, java.io.Serializable]].

Now the problems start: the Spark runtime complains that no Encoder can be found for java.io.Serializable. I have tried Kryo serde, and it still shows the same error message.

The reason I have to use this odd dataset type is that the fields are flexible per row. The map looks like this:

Map(
  "a" -> 1,
  "b" -> "bbb",
  "c" -> 0.1,
  ...
)

Is there any way in Spark to handle this kind of flexible dataset type?

EDIT: here is reproducible code that anyone can try.

import org.apache.spark.sql.{Dataset, SparkSession}

object SerdeTest extends App {
  val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[2]")
    .getOrCreate()


  import sparkSession.implicits._
  val ret: Dataset[Record] = sparkSession.sparkContext.parallelize(0 to 10)
    .map(
      t => {
        val row = (0 to t).map(
          i => i -> i.asInstanceOf[Integer]
        ).toMap

        Record(map = row)
      }
    ).toDS()

  val repartitioned = ret.repartition(10)


  repartitioned.collect.foreach(println)
}

case class Record(map: Map[Int, java.io.Serializable])

The code above gives you the "No Encoder found" error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.io.Serializable
- map value class: "java.io.Serializable"
- field (class: "scala.collection.immutable.Map", name: "map")

1 answer

Anonymous user

Found the answer. One way to solve this is to use the Kryo serde framework. The code change is minimal: you only need to create an implicit Encoder backed by Kryo and bring it into scope wherever serialization is needed.

Here is the code sample I ended up with (it can be run directly in IntelliJ or an equivalent IDE):

import org.apache.spark.sql._

object SerdeTest extends App {
  val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[2]")
    .getOrCreate()


  import sparkSession.implicits._

  // here is the place you define your Encoder for your custom object type, like in this case Map[Int, java.io.Serializable]
  implicit val myObjEncoder: Encoder[Record] = org.apache.spark.sql.Encoders.kryo[Record]
  val ret: Dataset[Record] = sparkSession.sparkContext.parallelize(0 to 10)
    .map(
      t => {
        val row = (0 to t).map(
          i => i -> i.asInstanceOf[Integer]
        ).toMap

        Record(map = row)
      }
    ).toDS()

  val repartitioned = ret.repartition(10)


  repartitioned.collect.foreach(
    row => println(row.map)
  )
}

case class Record(map: Map[Int, java.io.Serializable])

This code produces the expected result:

Map(0 -> 0, 5 -> 5, 1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4)
Map(0 -> 0, 1 -> 1, 2 -> 2)
Map(0 -> 0, 5 -> 5, 1 -> 1, 6 -> 6, 2 -> 2, 7 -> 7, 3 -> 3, 4 -> 4)
Map(0 -> 0, 1 -> 1)
Map(0 -> 0, 1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4)
Map(0 -> 0, 1 -> 1, 2 -> 2, 3 -> 3)
Map(0 -> 0)
Map(0 -> 0, 5 -> 5, 1 -> 1, 6 -> 6, 2 -> 2, 3 -> 3, 4 -> 4)
Map(0 -> 0, 5 -> 5, 10 -> 10, 1 -> 1, 6 -> 6, 9 -> 9, 2 -> 2, 7 -> 7, 3 -> 3, 8 -> 8, 4 -> 4)
Map(0 -> 0, 5 -> 5, 1 -> 1, 6 -> 6, 9 -> 9, 2 -> 2, 7 -> 7, 3 -> 3, 8 -> 8, 4 -> 4)
Map(0 -> 0, 5 -> 5, 1 -> 1, 6 -> 6, 2 -> 2, 7 -> 7, 3 -> 3, 8 -> 8, 4 -> 4)
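One trade-off worth noting (my observation, not part of the original answer): a Kryo encoder serializes the whole Record into a single binary column, so the Dataset loses its columnar schema and Spark SQL cannot query the map's keys directly; only typed (lambda-style) operations remain available. A minimal sketch, assuming the standard Spark 2.x/3.x Encoder API:

```scala
import org.apache.spark.sql.{Encoder, Encoders}

case class Record(map: Map[Int, java.io.Serializable])

object KryoSchemaDemo extends App {
  // A Kryo-backed encoder treats the object as an opaque blob.
  val kryoEnc: Encoder[Record] = Encoders.kryo[Record]

  // The resulting schema is a single BinaryType field rather than
  // one column per case-class field, so filters/selects on `map`
  // must go through typed operations like .map or .filter on Record.
  println(kryoEnc.schema.treeString)
}
```

This is usually an acceptable cost here, since a Map[Int, java.io.Serializable] has no fixed columnar shape to begin with; if you do need queryable columns, narrowing the value type (for example to String) and letting Spark derive the encoder is the common alternative.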