我有以下类,run
从数据库表返回整数列表。
class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
def run(date: LocalDate) = {
sqlContext.read.format("jdbc").options(Map(
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"url" -> jdbcSqlConn,
"dbtable" -> s"dbo.GetList('$date')"
)).load()
}
}
以下代码
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
val processed = itemListJob.run(rc, priority).select("id").map(d => {
runJob.run(d) // d expected to be int
})
processed.saveAsTextFile("c:\\temp\\mpa")
得到的错误
[error] ...\src\main\scala\main.scala:39: type mismatch; [error] found : org.apache.spark.sql.Row [error] required: Int [error] runJob.run(d) [error] ^ [error] one error found [error] (compile:compileIncremental) Compilation failed
我试过了
val处理=itemListJob.run(rc,优先级). select("id").as[Int].map(d=
他们俩都犯了错误
找不到存储在数据集中的类型的编码器。通过导入火花.电码,支持原始类型(Int、String等)和产品类型(大小写类)。_将在未来的版本中添加对序列化其他类型的支持。
更新:我正在尝试添加导入隐含语句
>
import sc. int._有错误
值隐含不是org. apache.park.SparkContext的成员
import sqlContext. impl的。_
是可以的。但是,的后面的语句进行了处理。saveAsTextFile("c:\\temp\\mpa")
得到了错误
value saveAsTextFile不是org. apache.park.sql.Dataset[(Int,java.time.LocalDate)]的成员
您应该简单地将带有select("id")
的行更改为如下:
select("id").as[Int]
您应该导入将Row
转换为Ints的含义。
import sqlContext.implicits._ // <-- import implicits that add the "magic"
您还可以更改run
以包含以下转换(注意我添加的行的注释):
class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
def run(date: LocalDate) = {
import sqlContext.implicits._ // <-- import implicits that add the "magic"
sqlContext.read.format("jdbc").options(Map(
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"url" -> jdbcSqlConn,
"dbtable" -> s"dbo.GetList('$date')"
)).load()
.select("id") // <-- take only "id" (which Spark pushes down and hence makes your query faster
.as[Int] // <-- convert Row into Int
}
}
value saveAsTextFile不是org. apache.park.sql.Dataset[(Int,java.time.LocalDate)]的成员
编译错误是因为您尝试对不可用的Dataset
使用saveAsTextFile
操作。
在SparkSQL中写入是通过使用写运算符可用的DataFrameWriter:
write: DataFrameWriter[T]接口,用于将非流数据集的内容保存到外部存储中。
因此,您应该执行以下操作:
processed.write.text("c:\\temp\\mpa")
完成!