提问者:小点点

通过键重新分区大型数据集并为每个组逐个批次单独应用函数的内存有效方法


我有一个很大的火花scala带有"groupName"列的数据集。数据记录沿着不同的分区分布。我想通过"groupName"将记录分组在一起,逐批收集并在整个批处理上应用一个功能。通过“批处理”,我的意思是同一组的预定义数量的记录(让我们称之为maxBatchCount)。通过“逐批”,我的意思是我想有效地使用内存,而不是将所有分区收集到内存中。

更具体地说,批处理功能包括整个批处理的序列化、压缩和加密。这后来被转换为另一个数据集,使用的部分通过("groupName")写入hdfs。因此我无法避免完全洗牌。

有简单的方法吗?我做了一些下面描述的尝试,但TL/DR它似乎有点过于复杂,最终在Java内存问题上失败了。

细节

我尝试使用重新分区("groupName")mapDeftionsIterator分组(maxBatchCount)方法的组合,这似乎非常适合这项任务。然而,重新分区只确保相同的groupName的记录将在同一个分区中,但是单个分区可能有来自几个不同的groupName的记录(如果#组

然后我试图用Iterator分区方法来增强上面的解决方案。这个想法是首先迭代完整的分区,用于构建所有当前组的Set,然后使用Iterator.分区为每个当前组构建一个单独的迭代器。然后像以前一样使用分组

它是这样的-为了说明我使用了两个Ints的简单案例类,groupName实际上是mod3列,通过对Range中的每个数字应用模3函数创建:

  case class Mod3(number: Int, mod3: Int)
  val maxBatchCount = 5
  val df = spark.sparkContext.parallelize(Range(1,21))
     .toDF("number").withColumn("mod3", col("number") % 3)

  // here I choose #partitions < #groups for illustration
  val dff = df.repartition(1, col("mod3"))

  val dsArr = dff.as[Mod3].mapPartitions(partitionIt => {
    // we'll need 2 iterations
    val (it1, it2) = partitionIt.duplicate
    
    // first iterate to create a Set of all present groups
    val mod3set = it1.map(_.mod3).toSet
    
    // build partitioned iterators map (one for each group present)
    var it: Iterator[Mod3] = it2 // init var
    val itMap = mod3set.map(mod3val => {
      val (filteredIt, residueIt) = it.partition(_.mod3 == mod3val)
      val pair = (mod3val -> filteredIt)
      it = residueIt
      pair
    }).toMap

    mod3set.flatMap(mod3val => {
      itMap(mod3val).grouped(maxBatchCount).map(grp => {
        val batch = grp.toList
        batch.map(_.number).toArray[Int] // imagine some other batch function
      })
    }).toIterator
  }).as[Array[Int]]

  val dsArrCollect = dsArr.collect
  dsArrCollect.map(_.toList).foreach(println)

这在小数据测试时似乎工作得很好,但是当使用实际数据运行时(在实际的有20个执行器的火花集群上,每个执行器2个内核),我收到了java. lang.OutOfMemoryError:超过GC开销限制

  • 注意,在我的实际数据组中,大小是高度倾斜的,其中一个组的大小大约是所有其他组的总和(我想GC内存问题与该组有关)。正因为如此,我也试图在重新分区中组合一个辅助中性列,但没有帮助。

将感谢这里的任何指针,谢谢!


共1个答案

匿名用户

我认为您对重新分区映射分区有正确的方法。问题是您的映射分区函数最终会将整个分区加载到内存中。

第一种解决方案可能是增加分区的数量,从而减少分区中的组/数据的数量。

另一种解决方案是使用分区It. flitMap并一次处理1条记录,最多只累积1组数据

  • 使用sortWellinGroutions使来自同一组的记录是连续的
  • 在平面图函数中,积累您的数据并跟踪组变化。