提问者:小点点

Pyspark中具有最大值的GroupBy列和筛选器行


我几乎可以肯定以前有人问过这个问题,但通过stackoverflow进行搜索并没有回答我的问题。不是[2]的副本,因为我想要的是最大值,而不是最频繁的项。我是pyspark新手,尝试做一些非常简单的事情:我想按列“A”分组,然后只在列“B”中保留每个组中具有最大值的行。这样地:

df_cleaned = df.groupBy("A").agg(F.max("B"))

不幸的是,这会丢弃所有其他列-df_cleaned只包含列“A”和最大值B。如何保留这些行?(“A”、“B”、“C”…)


共3个答案

匿名用户

您可以使用窗口在不使用udf的情况下执行此操作。

考虑下面的例子:

import pyspark.sql.functions as f
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
    ('b', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])
df.show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  5|
#|  a|  8|
#|  a|  7|
#|  b|  1|
#|  b|  3|
#+---+---+

创建一个窗口以按列a进行分区,并使用该窗口计算每组的最大值。然后过滤掉行,使列B中的值等于最大值。

from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB')\
    .show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  8|
#|  b|  3|
#+---+---+

或者等效地使用pyspark-sql

df.registerTempTable('table')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()
#+---+---+
#|  A|  B|
#+---+---+
#|  b|  3|
#|  a|  8|
#+---+---+

匿名用户

另一种可能的方法是应用连接数据帧,它本身指定“左半”。这种联接包括左侧数据框中的所有列,右侧没有列。

例如:

import pyspark.sql.functions as f
data = [
    ('a', 5, 'c'),
    ('a', 8, 'd'),
    ('a', 7, 'e'),
    ('b', 1, 'f'),
    ('b', 3, 'g')
]
df = sqlContext.createDataFrame(data, ["A", "B", "C"])
df.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  5|  c|
|  a|  8|  d|
|  a|  7|  e|
|  b|  1|  f|
|  b|  3|  g|
+---+---+---+

可通过以下操作选择A列的B列最大值:

df.groupBy('A').agg(f.max('B')
+---+---+
|  A|  B|
+---+---+
|  a|  8|
|  b|  3|
+---+---+

使用此表达式作为左半连接中的右侧,并将获得的列max(B)重命名为其原始名称B,我们可以获得所需的结果:

df.join(df.groupBy('A').agg(f.max('B').alias('B')),on='B',how='leftsemi').show()
+---+---+---+
|  B|  A|  C|
+---+---+---+
|  3|  b|  g|
|  8|  a|  d|
+---+---+---+

此解决方案背后的物理计划与公认答案中的物理计划不同,我仍然不清楚哪种解决方案在大型数据帧上的性能更好。

使用火花SQL语法可以获得相同的结果:

df.registerTempTable('table')
q = '''SELECT *
FROM table a LEFT SEMI
JOIN (
    SELECT 
        A,
        max(B) as max_B
    FROM table
    GROUP BY A
    ) t
ON a.A=t.A AND a.B=t.max_B
'''
sqlContext.sql(q).show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  b|  3|  g|
|  a|  8|  d|
+---+---+---+

匿名用户

只想添加scala spark版本的@ndrica的答案,以防有人需要它:

val data = Seq(("a", 5,"c"), ("a",8,"d"),("a",7,"e"),("b",1,"f"),("b",3,"g"))
val df = data.toDF("A","B","C")
df.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  5|  c|
|  a|  8|  d|
|  a|  7|  e|
|  b|  1|  f|
|  b|  3|  g|
+---+---+---+

val rightdf = df.groupBy("A").max("B")
rightdf.show()
+---+------+
|  A|max(B)|
+---+------+
|  b|     3|
|  a|     8|
+---+------+

val resdf = df.join(rightdf, df("B") === rightdf("max(B)"), "leftsemi")
resdf.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  8|  d|
|  b|  3|  g|
+---+---+---+