我正在尝试火花网站上给出的火花结构化流媒体示例,但它抛出错误
1.找不到存储在数据集中的类型的编码器。通过导入火花。隐含支持原始类型(Int,String等)和产品类型(大小写类)。_支持序列化其他类型将在未来的版本中添加。
2.方法没有足够的参数:(隐式证据2美元:org. apache.park.sql.Encoder[data])org.apache.park.sql.Dataset[data]。未指定值参数证据2美元。val ds:数据集[data]=df.as[data]
这是我的代码
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("kafka-consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
case class data(name: String, id: String)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("subscribe", "test")
.load()
println(df.isStreaming)
val ds: Dataset[data] = df.as[data]
val value = ds.select("name").where("id > 10")
value.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
关于如何使这个工作的任何帮助。?我想要这样的最终输出我想要这样的输出
+-----+--------+
| name| id
+-----+--------+
|Jacek| 1
+-----+--------+
错误的原因是您将Array[Byte]
处理为来自Kafka并且没有字段可以匹配data
case class。
scala> println(schema.treeString)
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
将df.as[data]行更改为以下内容:
df.
select($"value" cast "string").
map(value => ...parse the value to get name and id here...).
as[data]
我强烈建议使用select
和函数
对象来处理传入的数据。
错误是由于数据框中的列数与您的案例类不匹配。
您在数据框中有[主题、时间戳、值、键、偏移量、时间戳类型、分区]
列
而你的案例课只有两栏
case class data(name: String, id: String)
您可以将数据框的内容显示为
val display = df.writeStream.format("console").start()
睡几秒钟然后
display.stop()
并且还使用选项("startingOffset","最早")
,如此处所述
然后根据您的数据创建一个案例类。
希望这有帮助!