我有一个很大的火花scala带有"groupName"
列的数据集。数据记录沿着不同的分区分布。我想通过"groupName"
将记录分组在一起,逐批收集并在整个批处理上应用一个功能。通过“批处理”,我的意思是同一组的预定义数量的记录(让我们称之为maxBatchCount
)。通过“逐批”,我的意思是我想有效地使用内存,而不是将所有分区收集到内存中。
更具体地说,批处理功能包括整个批处理的序列化、压缩和加密。这后来被转换为另一个数据集,使用的部分通过("groupName")
写入hdfs。因此我无法避免完全洗牌。
有简单的方法吗?我做了一些下面描述的尝试,但TL/DR它似乎有点过于复杂,最终在Java内存问题上失败了。
细节
我尝试使用重新分区("groupName")
、mapDeftions
和Iterator
的分组(maxBatchCount)
方法的组合,这似乎非常适合这项任务。然而,重新分区只确保相同的groupName
的记录将在同一个分区中,但是单个分区可能有来自几个不同的groupName
的记录(如果#组
然后我试图用Iterator
的分区
方法来增强上面的解决方案。这个想法是首先迭代完整的分区,用于构建所有当前组的Set
,然后使用Iterator.分区
为每个当前组构建一个单独的迭代器。然后像以前一样使用分组
。
它是这样的-为了说明我使用了两个Ints的简单案例类,groupName
实际上是mod3
列,通过对Range中的每个数字
应用模3
函数创建:
case class Mod3(number: Int, mod3: Int)
val maxBatchCount = 5
val df = spark.sparkContext.parallelize(Range(1,21))
.toDF("number").withColumn("mod3", col("number") % 3)
// here I choose #partitions < #groups for illustration
val dff = df.repartition(1, col("mod3"))
val dsArr = dff.as[Mod3].mapPartitions(partitionIt => {
// we'll need 2 iterations
val (it1, it2) = partitionIt.duplicate
// first iterate to create a Set of all present groups
val mod3set = it1.map(_.mod3).toSet
// build partitioned iterators map (one for each group present)
var it: Iterator[Mod3] = it2 // init var
val itMap = mod3set.map(mod3val => {
val (filteredIt, residueIt) = it.partition(_.mod3 == mod3val)
val pair = (mod3val -> filteredIt)
it = residueIt
pair
}).toMap
mod3set.flatMap(mod3val => {
itMap(mod3val).grouped(maxBatchCount).map(grp => {
val batch = grp.toList
batch.map(_.number).toArray[Int] // imagine some other batch function
})
}).toIterator
}).as[Array[Int]]
val dsArrCollect = dsArr.collect
dsArrCollect.map(_.toList).foreach(println)
这在小数据测试时似乎工作得很好,但是当使用实际数据运行时(在实际的有20个执行器的火花集群上,每个执行器2个内核),我收到了java. lang.OutOfMemoryError:超过GC开销限制
重新分区
中组合一个辅助中性列,但没有帮助。将感谢这里的任何指针,谢谢!
我认为您对重新分区映射分区有正确的方法。问题是您的映射分区函数最终会将整个分区加载到内存中。
第一种解决方案可能是增加分区的数量,从而减少分区中的组/数据的数量。
另一种解决方案是使用分区It. flitMap并一次处理1条记录,最多只累积1组数据