我正在尝试连接两个apache spark sql DataFrame,并将第一个数据帧的列值替换为另一个。如:
Df1:
col1 | col2 | other columns .... say (col-x, col-y, col-z)
------------ |--------------------------------
x | a |random values
y | b |random values
z | c |random values
Df2:
col1 | col3 | other columns .. say (col-a, col-b, col-c)
-------------|--------------------------------
x | a1 |different random values
y | b1 |different random values
w | w1 |different random values
resultant dataframe should be
DF:
col1 | col2 | other columns of DF1 (col-x. col-y, col-z)
-------------|-------------------------------
a1 | a |random values
b1 | b |random values
z | c |random values
我需要执行左连接并将 DF1.col1 的值替换为 DF2.col3,其中 DF1.col1 = DF2.col1。我不知道该怎么做。此外,如上例所示,DF1 除了“col1”和“col2”之外还有更多列,我无法对所有列都应用 select。我正在尝试类似的东西,
val df = df1.join(df2, Seq("col1"), "left").select(
coalesce(df2("col2"), df1("col1")).as("col1")
)
但这似乎行不通。另外,我认为它会过滤掉DF1的其他列。我想保留 DF1 的所有列。
在Scala中如何做到这一点?
您可以按如下方式构造所需的3列。
val df = df1.join(df2, Seq("col1"), "left").select(coalesce(df2("col3"), df1("col1")).as("col1"),col("col2"), col("colx"))
对于连接后从“df1”获取所有列,别名可用于数据帧:
val updatedCol1 = coalesce(df2("col3"), df1("col1")).alias("col1")
val columns = updatedCol1 :: df1.columns
.filterNot(_ == "col1")
.map(cname => col("df1." + cname))
.toList
df1.alias("df1")
.join(df2, Seq("col1"), "left")
.select(columns: _*)