提问者:小点点

DataFrame分区通过到单个Parquet文件(每个分区)


我想重新分区/合并我的数据,以便将其保存到每个分区的一个Parquet文件中。我还想使用SparkSQL分区ByAPI。所以我可以这样做:

df.coalesce(1)
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

我已经测试过了,它似乎表现不佳。这是因为数据集中只有一个分区可以工作,所有文件的分区、压缩和保存都必须由一个CPU核心完成。

我可以重写它以在调用coalesce之前手动进行分区(例如使用具有不同分区值的过滤器)。

但是有没有更好的方法来使用标准的火花SQLAPI?


共3个答案

匿名用户

我遇到了完全相同的问题,我找到了一种使用DataFrame. re分区()来做到这一点的方法。使用coalesce(1)的问题是您的并行度下降到1,它可能在最好的情况下很慢,在最坏的情况下会出错。增加这个数字也没有帮助——如果您使用coalesce(10),您会获得更多的并行度,但最终每个分区会有10个文件。

要在不使用coalesce()的情况下为每个分区获取一个文件,请使用重新分区()与您希望输出被分区的相同列。所以在你的情况下,这样做:

import spark.implicits._
df
  .repartition($"entity", $"year", $"month", $"day", $"status")
  .write
  .partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append)
  .parquet(s"$location")

一旦我这样做了,我每个输出分区都会得到一个镶木地板文件,而不是多个文件。

我在Python测试了这个,但我认为在Scala中应该是一样的。

匿名用户

根据定义:

返回一个新的DataFrame,该DataFrame具有完全相同的numGroutions分区。

您可以使用它来减少RDD/DataFrame中的分区数。它有助于在过滤大型数据集后更有效地运行操作。

关于你的代码,它执行得不好,因为你实际在做的是:

>

  • 将所有数据放入1个分区,这会使驱动程序过载,因为它将所有数据拉入驱动程序的1个分区(这也不是一个好的做法)

    coalesce实际上会打乱网络上的所有数据,这也可能导致性能损失。

    混洗是Spark重新分发数据的机制,以便在分区之间进行不同的分组。这通常涉及跨执行器和机器复制数据,使混洗成为复杂且成本高昂的操作。

    混洗概念对于管理和理解非常重要。最好尽可能少地混洗,因为这是一个昂贵的操作,因为它涉及磁盘I/O、数据序列化和网络I/O。为了组织混洗的数据,Spark生成一组任务——映射任务来组织数据,以及一组减少任务来聚合数据。这个命名来自MapReduce,与Spark的映射和减少操作没有直接关系。

    在内部,单个map任务的结果会保存在内存中,直到它们无法容纳为止。然后,这些结果会根据目标分区进行排序并写入单个文件。在duce方面,任务会读取相关的排序块。

    关于分区组件,我建议您阅读这里关于带有Parquet分区的Spark DataFrames的答案,以及Spark性能调整编程指南中的本节。

    我希望这有帮助!

  • 匿名用户

    它并不是在@mortada的解决方案之上,但这里有一个小抽象,可以确保您使用相同的分区来重新分区和写入,并演示排序:

      def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
        start = datetime.now()
        (df.repartition(*partitions)
          .sortWithinPartitions(*sort_within_partitions)
          .write.partitionBy(*partitions)
          # TODO: Format of your choosing here
          .mode(SaveMode.Append).parquet(path)
          # or, e.g.:
          #.option("compression", "gzip").option("header", "true").mode("overwrite").csv(path)
        )
        print(f"Wrote data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
            f"\n  {path}\n  Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")
    

    用法:

    one_file_per_partition(df, location, ["entity", "year", "month", "day", "status"])