提问者:小点点

连接数据帧和数据集时发生Scala MatchError


我有一个数据帧和一个数据集:

数据帧 1 :

+------------------------------+-----------+
|City_Name                     |Level      |
+------------------------------+------------
|{City -> Paris}               |86         |       
+------------------------------+-----------+

数据集 2 :

+-----------------------------------+-----------+
|Country_Details                    |Temperature|
+-----------------------------------+------------
|{City -> Paris, Country -> France} |31         |       
+-----------------------------------+-----------+

我正在尝试通过检查“City_Name”列中的地图是否包含在“Country_Details”列的地图中来连接它们。

我使用以下UDF来检查情况:

    val mapEqual = udf((col1: Map[String, String], col2: Map[String, String]) => {
        if (col2.nonEmpty){
            col2.toSet subsetOf col1.toSet
        } else {
            true
        }
    })

我是这样连接的:

dataset2.join(dataframe1 , mapEqual(dataset2("Country_Details"), dataframe1("City_Name"), "leftanti")

然而,我得到了这样的错误:

terminated with error scala.MatchError: UDF(Country_Details#528) AS City_Name#552 (of class org.apache.spark.sql.catalyst.expressions.Alias)

以前有人遇到过同样的错误吗?我用的是Spark 3 . 0 . 2版和SQLContext,用的是scala语言。


共1个答案

匿名用户

这里有两个问题,第一个问题是,当您调用函数时,您传递了一个额外的参数<code>leftanti</code>(您本想将其传递给<code>join</code>函数,但却将其传递到udf)。第二个问题是udf逻辑无法按预期工作,我建议您使用以下方法:

val mapContains = udf { (col1: Map[String, String], col2: Map[String, String]) => 
  col2.keys.forall { key => 
    col1.get(key).exists(_ eq col2(key))
  }
}

结果:

scala> ds.join(df1 , mapContains(ds("Country_Details"), df1("City_Name")), "leftanti").show(false)
+----------------------------------+-----------+
|Country_Details                   |Temperature|
+----------------------------------+-----------+
|{City -> Paris, Country -> France}|31         |
+----------------------------------+-----------+