提问者:小点点

如何将两个火花数据帧与可能不同的结构类型字段联合起来?


我对Apache Spark很陌生,有时还在努力。我正在尝试导入一个相当复杂的JSON文件,并在将其保存到拼花文件之前将其展平。

我的JSON文件是一棵商店树。

{
"id": "store02",
"name": "store name",
"domain": "domain",
"currency": "EUR",
"address1": "Somewhere",
"country": "GER",
"city": "Berlin",
"zipCode": "12345",
"timeZone": "CET",
"accounts" : [
    {
        "field1": "",
        "filed2": "",
        "field3": "",
        "optionnalArray1": [
            {
                "field1": "",
                "field2": ""
            }
        ],
        "optionnalArray2": ["aa", "bb"]
    }
],
"stores": [ .... ]    
}

每个商店可以有一个字段,它是一个帐户数组。一个帐户有3个必填字段和两个可选字段。所以我有一个数据框,其中一个字段可以有3种不同的类型。

在数据帧中导入文件没什么大不了的,但在扁平化过程中,我可能想对两个数据帧进行联合,其中帐户可能具有不同的架构,当然我有以下错误:

"只能对具有兼容列类型的表执行联合"

有没有办法做到这一点很容易?如何能火花导入这样的JSON文件没有问题?

假设我有两个数据帧。第一个是没有帐户的商店的数据帧。第二个是有帐户的商店的数据帧。帐户是这样定义的结构:

val acquirerStruct = StructType(
    StructField("merchantId", StringType, nullable = true) ::
    StructField("name", StringType, nullable = true) ::
    Nil)

val accountStruct = StructType(
    StructField("acquirers", ArrayType(acquirerStruct), nullable = true) ::
        StructField("applicationCode", StringType, nullable = true) ::
        StructField("channelType", StringType, nullable = true) ::
        StructField("id", StringType, nullable = true) ::
        StructField("terminals", ArrayType(StringType), nullable = true) ::
        Nil)

当我想合并两个数据帧时,我会在之前创建一个列帐户到我的第一个数据帧:

df1.withColumn("account", array(lit(null).cast(accountStruct))).union(df2)

如果在df2中,所有行都有一个帐户,该帐户具有相同的结构,那么它可以正常工作。但这并不总是正确的。一个帐户可能没有终端或收单者。这在JSON中完全有效。在这种情况下,我有前面提到的错误。

"只能对具有兼容列类型的表执行联合"


共2个答案

匿名用户

在Spark 3中,您可以使用:

df = df1.unionByName(df2, allowMissingColumns=True)

在Spark 2中,您可以使用:

diff1 = [c for c in df2.columns if c not in df1.columns]
diff2 = [c for c in df1.columns if c not in df2.columns]
df = df1.select('*', *[F.lit(None).alias(c) for c in diff1]) \
    .unionByName(df2.select('*', *[F.lit(None).alias(c) for c in diff2]))

但是我想补充一点,为了让你的生活更轻松,你应该JSON文件在同一个目录中,然后读取它们,Spark会为你做这项工作。

val peopleDF = spark.read.json(path)

这将联合数据并同时为您填充空值。在我看来,这是最简单的方法。

匿名用户

我在PySpark中遇到了同样的问题,我通过在读取不兼容的数据帧时提供模式来解决它

import copy
...
schema_to_read = copy.deepcopy(df1.schema)
df2 = sql_context.read.format("json").schema(schema_to_read).load(path)