我想创建一个应该用case类T实现的Scala trait。该trait只是加载数据并将其转换为T类型的Spark Dataset。我得到的错误是没有编码器可以存储,我认为这是因为Scala不知道T应该是case类。我怎么能告诉编译器呢?我在某个地方看到我应该提到产品,但没有定义这样的类。.随意建议其他方法来做到这一点!
我有以下代码,但它没有编译错误: 42:错误:无法找到存储在数据集中的类型的编码器。通过导入sqlContext.int,支持原始类型(Int,String等)和产品类型(case类)。_[INFO].as[T]
我正在使用Spark 1.6.1
代码:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SQLContext}
/**
* A trait that moves data on Hadoop with Spark based on the location and the granularity of the data.
*/
trait Agent[T] {
/**
* Load a Dataframe from the location and convert into a Dataset
* @return Dataset[T]
*/
protected def load(): Dataset[T] = {
// Read in the data
SparkContextKeeper.sqlContext.read
.format("com.databricks.spark.csv")
.load("/myfolder/" + location + "/2016/10/01/")
.as[T]
}
}
您的代码缺少3件事:
Products
的子类(所有Scala案例类和元组的超类)TypeTag
和ClassTag
。Spark隐式使用这来克服类型擦除sqlContext._
不幸的是,您不能在trait中添加带有上下文边界的类型参数,因此最简单的解决方法是使用抽象类
来代替:
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.ClassTag
abstract class Agent[T <: Product : ClassTag : TypeTag] {
protected def load(): Dataset[T] = {
val sqlContext: SQLContext = SparkContextKeeper.sqlContext
import sqlContext.implicits._
sqlContext.read.// same...
}
}
显然,这不等同于使用trait,并且可能表明这种设计不是最适合这项工作的。另一种选择是将load
放在对象中并将type参数移动到方法中:
object Agent {
protected def load[T <: Product : ClassTag : TypeTag](): Dataset[T] = {
// same...
}
}
哪一个更可取主要取决于您将在哪里以及如何调用load
以及您计划对结果做什么。
您需要采取两个行动:
import_
特质代理[T