提问者:小点点

无法找到存储在数据集中的类型的编码器。在火花结构化流中


我正在尝试火花网站上给出的火花结构化流媒体示例,但它抛出错误

1.找不到存储在数据集中的类型的编码器。通过导入火花。隐含支持原始类型(Int,String等)和产品类型(大小写类)。_支持序列化其他类型将在未来的版本中添加。

2.方法没有足够的参数:(隐式证据2美元:org. apache.park.sql.Encoder[data])org.apache.park.sql.Dataset[data]。未指定值参数证据2美元。val ds:数据集[data]=df.as[data]

这是我的代码

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
          .builder()
          .appName("kafka-consumer")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        spark.sparkContext.setLogLevel("WARN")

    case class data(name: String, id: String)

    val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "172.21.0.187:9093")
          .option("subscribe", "test")
          .load()
    println(df.isStreaming)

    val ds: Dataset[data] = df.as[data]
    val value = ds.select("name").where("id > 10")




    value.writeStream
          .outputMode("append")
          .format("console")
          .start()
          .awaitTermination()

  }
}

关于如何使这个工作的任何帮助。?我想要这样的最终输出我想要这样的输出

+-----+--------+
| name|    id
+-----+--------+
|Jacek|     1
+-----+--------+

共2个答案

匿名用户

错误的原因是您将Array[Byte]处理为来自Kafka并且没有字段可以匹配datacase class。

scala> println(schema.treeString)
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

将df.as[data]行更改为以下内容:

df.
  select($"value" cast "string").
  map(value => ...parse the value to get name and id here...).
  as[data]

我强烈建议使用select函数对象来处理传入的数据。

匿名用户

错误是由于数据框中的列数与您的案例类不匹配。

您在数据框中有[主题、时间戳、值、键、偏移量、时间戳类型、分区]

而你的案例课只有两栏

case class data(name: String, id: String)

您可以将数据框的内容显示为

val display = df.writeStream.format("console").start()

睡几秒钟然后

display.stop()

并且还使用选项("startingOffset","最早"),如此处所述

然后根据您的数据创建一个案例类。

希望这有帮助!