有人声称,由于重新计算和容错,Spark RDD必须是其输入的确定性函数,但也有批准的非确定性RDD,例如在SparkSQL或SparkML中。关于如何安全地使用非确定性,是否有任何正式的指导?
考虑这个火花工作,用一个菱形的匕首。
val bar = (rdd map f) zip (rdd map g)
bar.saveAsTextFile("outfile")
如果rdd
是不确定的(例如,随机或时间戳),outfile
是否包含一致的数据?是否有可能重新计算zip的一个组件而另一个组件不重新计算?如果我们检查点或坚持rdd
,安全是否保证?一个本地检查站就足够了吗?
全体的
以下是我在实践层面的一些观点和经验:
>
如果您从Hive中的表/文件中读取数据,那么Spark将列出所有使用的文件以及该列表中哪些节点被证明的部分,因此如果重新计算一直返回到开始,即从HDFS/Hive读取该数据子集,则重新计算将是一致的。
如果您使用随机函数,则使用.cache或.sistent来避免使用不同的路径逻辑重新计算。当然,结合前面提到的内容,如果随机函数在读取并必须从源中获取数据后会得到不同的结果。见下文。
从JDBC源读取,如果允许在处理的同时更新该JDBC源并且DAG从中重新计算,则无法保证一致性/确定性结果。
检查点的作用
如果由于任何原因而失败,从DAG返回源代码的计算是昂贵的。在给定阶段执行的检查点将数据存储到磁盘本地或HDFS,如果出现后续故障,则从此时开始重新计算,从而节省时间。DAG血统已断开。
最终注释
如果重新计算从JDBC源或随机函数开始,那么当在Stage中处理时,这些函数可能会影响后续已处理的分区,该怎么办?我无法轻易证明,但我认为那些不适合“当前节点”重新处理的结果会被丢弃。否则就不现实了,这是我的看法。
关于作者自己的回答,火花检查点和持久化到磁盘之间的区别是什么,应该注意以下几点:“……几乎没有什么重要的区别,但最根本的区别是沿袭发生了什么。持久化/缓存保持沿袭完整,而检查点破坏沿袭……”。其他答案中的陈述不正确。
关于< code>rdd上的< code>cache、< code>persist和< code>checkpoint的使用,根据这篇文章< code>persist(StorageLevel .DISK_ONLY)将有效地中断当前作业内的沿袭,而< code>checkpoint将中断跨作业的沿袭(但不清理文件)。我的初步结论是,在< code>persist或< code>checkpoint之后的任务重试不会破坏数据一致性。< code>cache操作不保证一致性。
在持续存在
之前会有问题吗?如果 rdd
分区包含独立的随机数据,则在单独的分区上重试任务没有问题。如果 rdd
包含时间戳,那么 rdd
应该由单个分区组成。
我初步得出结论,使用非确定性RDD进行计算的安全策略是从“独立”分区构建它,这些分区可以安全地单独重新计算,并立即保留到磁盘或检查RDD。如果跨作业重用 RDD,则需要检查点
。
一些转换在RDD的排序中引入了不确定性。如果您需要重复使用的RDD副本之间的顺序一致性(例如,由于zip的索引
),沿袭,回到最近的持续
或检查点
,不应包含任何顺序修改转换。