我有一个数据帧和一个数据集:
数据帧 1 :
+------------------------------+-----------+
|City_Name |Level |
+------------------------------+------------
|{City -> Paris} |86 |
+------------------------------+-----------+
数据集 2 :
+-----------------------------------+-----------+
|Country_Details |Temperature|
+-----------------------------------+------------
|{City -> Paris, Country -> France} |31 |
+-----------------------------------+-----------+
我正在尝试通过检查“City_Name”列中的地图是否包含在“Country_Details”列的地图中来连接它们。
我使用以下UDF来检查情况:
val mapEqual = udf((col1: Map[String, String], col2: Map[String, String]) => {
if (col2.nonEmpty){
col2.toSet subsetOf col1.toSet
} else {
true
}
})
我是这样连接的:
dataset2.join(dataframe1 , mapEqual(dataset2("Country_Details"), dataframe1("City_Name"), "leftanti")
然而,我得到了这样的错误:
terminated with error scala.MatchError: UDF(Country_Details#528) AS City_Name#552 (of class org.apache.spark.sql.catalyst.expressions.Alias)
以前有人遇到过同样的错误吗?我用的是Spark 3 . 0 . 2版和SQLContext,用的是scala语言。
这里有两个问题,第一个问题是,当您调用函数时,您传递了一个额外的参数<code>leftanti</code>(您本想将其传递给<code>join</code>函数,但却将其传递到udf)。第二个问题是udf逻辑无法按预期工作,我建议您使用以下方法:
val mapContains = udf { (col1: Map[String, String], col2: Map[String, String]) =>
col2.keys.forall { key =>
col1.get(key).exists(_ eq col2(key))
}
}
结果:
scala> ds.join(df1 , mapContains(ds("Country_Details"), df1("City_Name")), "leftanti").show(false)
+----------------------------------+-----------+
|Country_Details |Temperature|
+----------------------------------+-----------+
|{City -> Paris, Country -> France}|31 |
+----------------------------------+-----------+