Spark:以编程方式获取集群核心数
问题内容:
我在纱线簇中运行我的spark应用程序。在我的代码中,我使用队列的可用数量核心在数据集中创建分区:
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
我的问题:如何通过编程方式而不是通过配置获取可用的队列核心数?
问题答案:
有多种方法可以从Spark获取执行程序的数量和集群中的核心数量。这是我过去使用的一些Scala实用程序代码。您应该可以轻松地使其适应Java。有两个关键思想:
-
工人人数是执行者人数减去一或
sc.getExecutorStorageStatus.length - 1
。 -
每个工人的核心数可以通过
java.lang.Runtime.getRuntime.availableProcessors
在一个工人上执行来获得。
其余代码是为SparkContext
使用Scala隐式添加便利方法的样板。我在1.x年前编写了代码,这就是为什么不使用的原因SparkSession
。
最后一点:合并多个核心通常是一个好主意,因为这可以在数据偏斜的情况下提高性能。在实践中,我使用1.5倍至4倍之间的任何值,具体取决于数据大小以及作业是否在共享群集上运行。
import org.apache.spark.SparkContext
import scala.language.implicitConversions
class RichSparkContext(val sc: SparkContext) {
def executorCount: Int =
sc.getExecutorStorageStatus.length - 1 // one is the driver
def coresPerExecutor: Int =
RichSparkContext.coresPerExecutor(sc)
def coreCount: Int =
executorCount * coresPerExecutor
def coreCount(coresPerExecutor: Int): Int =
executorCount * coresPerExecutor
}
object RichSparkContext {
trait Enrichment {
implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
new RichSparkContext(sc)
}
object implicits extends Enrichment
private var _coresPerExecutor: Int = 0
def coresPerExecutor(sc: SparkContext): Int =
synchronized {
if (_coresPerExecutor == 0)
sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
else _coresPerExecutor
}
}
更新资料
最近getExecutorStorageStatus
已被删除。我们已切换为使用SparkEnv
的`blockManager.master.getStorageStatus.length
1(减号再次用于驱动程序)。正常的方式来得到它,通过
env的
SparkContext是没有的外部访问
org.apache.spark`包。因此,我们使用封装违规模式:
package org.apache.spark
object EncapsulationViolator {
def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}