你好,我正在处理一个问题与scala/火花项目试图做一些计算我的scala代码在火花外壳上工作得很好,但当尝试运行相同的代码与sbt程序集转换scala.jar文件,我面临这个错误:
找不到AccessLog类型的编码器。需要一个隐式编码器[AccessLog]来将AccessLog实例存储在数据集中。通过导入park. int,支持原始类型(Int,String等)和产品类型(case类)。_将在未来的版本中添加对序列化其他类型的支持。
我正在尝试将Dataset[List[String]]转换为Dataset[AccessLog]AccessLog是一个案例类,通过使用映射它。
错误截图
生成错误的代码:
import org.apache.spark.sql.{ Dataset, Encoder, SparkSession }
import org.apache.spark.sql.functions._
object DstiJob {
// try and catch
def run(spark: SparkSession, inputPath: String, outputPath: String): String = {
// import spark.sqlContext.implicits._
import spark.implicits._
import org.apache.spark.sql.{ Encoder, Encoders }
// implicit val enc: Encoder[AccessLog] = Encoders.product[AccessLog]
val inputPath = "access.log.gz"
val outputPath = "data/reports"
val logsAsString = spark.read.text(inputPath).as[String]
case class AccessLog(ip: String, ident: String, user: String, datetime: String, request: String, status: String, size: String, referer: String, userAgent: String, unk: String)
val R = """^(?<ip>[0-9.]+) (?<identd>[^ ]) (?<user>[^ ]) \[(?<datetime>[^\]]+)\] \"(?<request>[^\"]*)\" (?<status>[^ ]*) (?<size>[^ ]*) \"(?<referer>[^\"]*)\" \"(?<useragent>[^\"]*)\" \"(?<unk>[^\"]*)\""".r
val dsParsed = logsAsString.flatMap(x => R.unapplySeq(x))
def toAccessLog(params: List[String]) = AccessLog(params(0), params(1), params(2), params(3), params(5), params(5), params(6), params(7), params(8), params(9))
val ds: Dataset[AccessLog] = dsParsed.map(toAccessLog _)
val dsWithTime = ds.withColumn("datetime", to_timestamp(ds("datetime"), "dd/MMM/yyyy:HH:mm:ss X"))
dsWithTime.cache
dsWithTime.createOrReplaceTempView("AccessLog")
为了解决编译错误,应该在方法run
之外定义case类。
而不是
object DstiJob {
def run(spark: SparkSession, ...) {
[...]
case class AccessLog(...)
val ds: Dataset[AccessLog] = ...
[...]
}
}
您可以使用
object DstiJob {
case class AccessLog(...)
def run(spark: SparkSession, ...) {
[...]
val ds: Dataset[AccessLog] = ...
[...]
}
}
这应该可以解决问题,但不幸的是,我无法解释为什么这有帮助。