Asked by: 小点点

spark.sql.shuffle.partitions local Spark performance behavior


I recently ran into a strange Spark performance issue while testing locally, and it turned out to be related to the number of shuffle partitions. I found this blurb in the spark-fast-tests README:

It's best to set the number of shuffle partitions to a small number like one or four in your test suite. This configuration can make your tests run 70% faster. You can remove this configuration option or adjust it if you're working with big DataFrames in your test suite.
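Concretely, that advice amounts to pinning the setting when the test suite's session is built. A minimal sketch (the app name here is a placeholder, not from the original post):

    import org.apache.spark.sql.SparkSession

    // Test-suite session with shuffle partitions pinned to a small value,
    // per the spark-fast-tests advice quoted above.
    lazy val testSpark: SparkSession =
      SparkSession
        .builder()
        .appName("unit-tests") // placeholder name
        .master("local[2]")
        .config("spark.sql.shuffle.partitions", "1")
        .getOrCreate()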

But, I wondered… why?

So much so that I went through the trouble of reproducing the issue (obfuscating quite a few business-domain case classes in this gist).

The following runs in about 10 seconds locally on my Mac, with a fairly vanilla Spark session:

  lazy val spark: SparkSession =
    SparkSession
      .builder()
      .appName("Test-Driver")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", TempWarehouseDir.toString)
      .getOrCreate()

That is with the shuffle setting at 1.

But! If I change the shuffle setting to something a cluster might actually have, e.g. 200, performance drops to nearly a minute:

    spark.sqlContext.setConf("spark.sql.shuffle.partitions", "200")

Does anyone know what's going on here? Why does increasing the shuffle partitions degrade performance so dramatically when running locally?

The domain class is big, but I don't think that alone fully explains why the test behaves this way.

Here is the test code:

    "list join df takes a long time" in {

      spark.sqlContext.setConf("spark.sql.shuffle.partitions", "200")

      val withList =
        Seq(
        ("key1", Seq(MyBigDomainClass(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None))),
        ).toDF("key1", "values").as[(String, List[MyBigDomainClass])]

      val withoutList =
        Seq(
          ("key1", 1),
          ("key2", 2)
        ).toDF("key1", "value").as[(String, Int)]

      var start = System.currentTimeMillis()
      val joined = withoutList.joinWith(withList, withList("key1") === withoutList("key1"), "full")
      joined.show
      println(s"[join] elapsed: ${(System.currentTimeMillis() - start) / 1000}s")

      start = System.currentTimeMillis()
      joined.map {
        case (a, b) => (Option(a), Option(b).fold(List.empty[MyBigDomainClass])(_._2))
      }.show
      println(s"[map] elapsed: ${(System.currentTimeMillis() - start) / 1000}s")
    }

And the domain classes:

package com.edmunds.bde.dataservices.imt.inventory.pipeline.job

case class TestClass_2(field_80:Option[String], field_81:Option[String], field_82:Option[Int])
case class TestClass_3(field_84:Option[Int], field_85:Option[Int], field_86:Option[Int])
case class TestClass_4(field_90:Option[String], field_91:Option[String], field_92:Option[String], field_93:Option[Double], field_94:Option[Double])
case class TestClass_5(field_96:Option[String], field_97:Option[String], field_98:Option[String], field_99:Option[Double], field_100:Option[String], field_101:Option[Int], field_102:Option[String], field_103:Option[String], field_104:Option[String], field_105:Option[Int], field_106:Option[Int], field_107:Option[Int], field_108:Option[Int])
case class TestClass_6(field_111:Option[String], field_112:Option[String], field_113:Option[String], field_114:Option[String], field_115:Option[String], field_116:Option[String], field_117:Option[String], field_118:Option[String], field_119:Option[String])
case class TestClass_7(field_121:Option[String], field_122:Option[String], field_123:Option[String], field_124:Option[String], field_125:Option[String], field_126:Option[String], field_127:Option[String], field_128:Option[String], field_129:Option[String])
case class TestClass_8(field_131:Option[String], field_132:Option[String], field_133:Option[String], field_134:Option[String], field_135:Option[String], field_136:Option[String], field_137:Option[String], field_138:Option[String], field_139:Option[String])
case class TestClass_9(field_141:Option[Long], field_142:Option[String], field_143:Option[String], field_144:Option[String], field_145:Option[Long], field_146:Option[Long])
case class TestClass_10(field_150:Option[Long], field_151:Option[String], field_152:Option[String], field_153:Option[String], field_154:Option[Seq[String]])
case class TestClass_1(field_70:Option[Long], field_71:Option[String], field_72:Option[String], field_73:Option[Long], field_74:Option[String], field_75:Option[String], field_76:Option[String], field_77:Option[String], field_78:Option[String], field_82:Option[TestClass_2], field_86:Option[TestClass_3], field_87:Option[Double], field_88:Option[Double], field_94:Option[Seq[TestClass_4]], field_108:Option[TestClass_5], field_109:Option[String], field_119:Option[TestClass_6], field_129:Option[TestClass_7], field_139:Option[TestClass_8], field_146:Option[Seq[TestClass_9]], field_147:Option[Seq[String]], field_148:Option[Seq[String]], field_154:Option[Seq[TestClass_10]])
case class TestClass_12(field_157:Option[Double], field_158:Option[Double], field_159:Option[Double], field_160:Option[Double], field_161:Option[Double], field_162:Option[java.math.BigDecimal], field_163:Option[java.math.BigDecimal], field_164:Option[Double], field_165:Option[Double])
case class TestClass_11(field_165:Option[TestClass_12], field_166:Option[Long], field_167:Option[scala.collection.Map[String, String]])
case class TestClass_14(field_170:Option[Double], field_171:Option[Double], field_172:Option[String])
case class TestClass_15(field_174:Option[Double], field_175:Option[Double], field_176:Option[Double], field_177:Option[Double], field_178:Option[Double], field_179:Option[Double], field_180:Option[Double], field_181:Option[Double], field_182:Option[Double], field_183:Option[Double], field_184:Option[Double], field_185:Option[Double], field_186:Option[Double], field_187:Option[Int], field_188:Option[Long], field_189:Option[Long], field_190:Option[Long], field_191:Option[Long])
case class TestClass_16(field_193:Option[Double], field_194:Option[Double], field_195:Option[Double], field_196:Option[Double], field_197:Option[Double], field_198:Option[Double])
case class TestClass_17(field_200:Option[java.math.BigDecimal], field_201:Option[Double], field_202:Option[java.math.BigDecimal], field_203:Option[Int])
case class TestClass_19(field_211:Option[Int], field_212:Option[String], field_213:Option[Double], field_214:Option[Int], field_215:Option[Double], field_216:Option[Int], field_217:Option[Double], field_218:Option[Int], field_219:Option[Int], field_220:Option[Int], field_221:Option[Int], field_222:Option[String], field_223:Option[java.sql.Date], field_224:Option[Int], field_225:Option[Int], field_226:Option[Int], field_227:Option[Int], field_228:Option[String])
case class TestClass_18(field_205:Option[Double], field_206:Option[Double], field_207:Option[Double], field_208:Option[Double], field_209:Option[String], field_228:Option[TestClass_19])
case class TestClass_20(field_230:Option[java.sql.Timestamp], field_231:Option[Long], field_232:Option[String], field_233:Option[String], field_234:Option[String], field_235:Option[java.sql.Timestamp], field_236:Option[java.sql.Timestamp], field_237:Option[Double], field_238:Option[Int], field_239:Option[Int], field_240:Option[Boolean], field_241:Option[Int], field_242:Option[Int], field_243:Option[Double], field_244:Option[Long], field_245:Option[String], field_246:Option[java.sql.Timestamp], field_247:Option[String])
case class TestClass_21(field_249:Option[java.sql.Timestamp], field_250:Option[Long], field_251:Option[String], field_252:Option[String], field_253:Option[String], field_254:Option[java.sql.Timestamp], field_255:Option[java.sql.Timestamp], field_256:Option[Double], field_257:Option[Int], field_258:Option[Int], field_259:Option[Boolean], field_260:Option[Int], field_261:Option[Int], field_262:Option[Double], field_263:Option[Long], field_264:Option[String], field_265:Option[java.sql.Timestamp], field_266:Option[String])
case class TestClass_13(field_172:Option[TestClass_14], field_191:Option[TestClass_15], field_198:Option[TestClass_16], field_203:Option[TestClass_17], field_228:Option[TestClass_18], field_247:Option[Seq[TestClass_20]], field_266:Option[Seq[TestClass_21]], field_267:Option[java.math.BigDecimal])
case class TestClass_22(field_269:Option[String], field_270:Option[String], field_271:Option[String], field_272:Option[String], field_273:Option[Double], field_274:Option[String])
case class TestClass_23(field_277:Option[Int], field_278:Option[Boolean], field_279:Option[Int], field_280:Option[Boolean], field_281:Option[Boolean], field_282:Option[Boolean], field_283:Option[Boolean], field_284:Option[Boolean], field_285:Option[Boolean], field_286:Option[String], field_287:Option[String], field_288:Option[String], field_289:Option[Boolean], field_290:Option[Boolean])
case class TestClass_25(field_293:Option[Boolean], field_294:Option[Boolean], field_295:Option[String], field_296:Option[String])
case class TestClass_26(field_298:Option[Boolean], field_299:Option[Boolean], field_300:Option[String], field_301:Option[String])
case class TestClass_27(field_303:Option[Boolean], field_304:Option[Boolean], field_305:Option[String], field_306:Option[String])
case class TestClass_24(field_296:Option[TestClass_25], field_301:Option[TestClass_26], field_306:Option[TestClass_27])
case class TestClass_28(field_311:Option[Long], field_312:Option[Long], field_313:Option[Boolean], field_314:Option[Int], field_315:Option[String], field_316:Option[String], field_317:Option[Boolean], field_318:Option[Boolean], field_319:Option[Boolean])
case class MyBigDomainClass(field_1:Option[String], field_2:Option[String], field_3:Option[String], field_4:Option[String], field_5:Option[java.sql.Timestamp], field_6:Option[java.sql.Date], field_7:Option[String], field_8:Option[String], field_9:Option[String], field_10:Option[String], field_11:Option[Int], field_12:Option[String], field_13:Option[String], field_14:Option[String], field_15:Option[String], field_16:Option[String], field_17:Option[String], field_18:Option[Double], field_19:Option[Double], field_20:Option[Double], field_21:Option[Double], field_22:Option[Double], field_23:Option[Double], field_24:Option[Double], field_25:Option[Double], field_26:Option[Double], field_27:Option[Double], field_28:Option[Double], field_29:Option[Double], field_30:Option[String], field_31:Option[String], field_32:Option[String], field_33:Option[String], field_34:Option[String], field_35:Option[String], field_36:Option[String], field_37:Option[String], field_38:Option[String], field_39:Option[String], field_40:Option[String], field_41:Option[String], field_42:Option[String], field_43:Option[String], field_44:Option[String], field_45:Option[String], field_46:Option[String], field_47:Option[Int], field_48:Option[Int], field_49:Option[java.sql.Date], field_50:Option[java.sql.Date], field_51:Option[java.sql.Date], field_52:Option[java.sql.Date], field_53:Option[String], field_54:Option[String], field_55:Option[Int], field_56:Option[java.sql.Date], field_57:Option[String], field_58:Option[String], field_59:Option[String], field_60:Option[String], field_61:Option[String], field_62:Option[String], field_63:Option[String], field_64:Option[Boolean], field_65:Option[scala.collection.Map[String, String]], field_66:Option[Int], field_67:Option[Int], field_68:Option[String], field_154:Option[TestClass_1], field_167:Option[TestClass_11], field_267:Option[TestClass_13], field_274:Option[Seq[TestClass_22]], field_275:Option[Int], field_290:Option[TestClass_23], field_306:Option[TestClass_24], field_307:Option[Int], field_308:Option[Boolean], field_309:Option[Boolean], field_319:Option[TestClass_28], field_320:Option[java.sql.Timestamp], field_321:Option[java.sql.Date])

2 Answers

Anonymous user

I ran into exactly the same problem in a previous project. With the partition count at 200, every shuffle operation (join, group by, etc.) creates 200 partitions in your dataset. Since you only have 2 worker threads, those 200 partitions are processed sequentially on the 2 workers (no more than 2 partitions are ever processed in parallel).

Each partition comes with some overhead. Suppose you have 1000 records in total. Split across 200 partitions, each partition holds only 5 records, so the total processing cost is 1000 records plus 200 partitions' worth of overhead. Because the partitions are so small (just 5 records each), the time spent processing the data inside a partition is less than the overhead of having the partition at all.
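The scheduling side of that overhead is easy to observe: any shuffle (a groupBy, for example) produces exactly spark.sql.shuffle.partitions output partitions, and therefore that many tasks, no matter how little data flows through them. A small sketch, assuming an existing spark session:

    import spark.implicits._

    spark.conf.set("spark.sql.shuffle.partitions", "200")

    // The groupBy forces a shuffle, so the result carries one partition
    // (and one scheduled task) per shuffle partition, nearly all empty here.
    val counts = Seq(("key1", 1), ("key2", 2)).toDF("key", "value")
      .groupBy("key").count()

    // Prints 200 on Spark 2.x; on 3.2+ AQE may coalesce it to far fewer.
    println(counts.rdd.getNumPartitions)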

A general rule of thumb is about 2 partitions per core.
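On local[2] that rule puts the setting around 4 rather than 200. One way to derive it from the session itself (a sketch; defaultParallelism is 2 for local[2]):

    val target = spark.sparkContext.defaultParallelism * 2
    spark.conf.set("spark.sql.shuffle.partitions", target.toString)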

Anonymous user

https://stackoverflow.com/a/72801534/4278032 explains the general cause, but the difference you're experiencing (10 seconds with 4 partitions vs. ~60 seconds with 200 partitions) seems larger than anything I've seen before.

Running your code on my machine shows results much more in line with what I'd expect (for Spark 2.4). Most interestingly, with Spark 3.2/3.3 it no longer seems necessary to set the shuffle partitions for tests at all.
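That lines up with Spark 3.2 turning on Adaptive Query Execution by default: AQE coalesces small shuffle partitions at runtime, so the 200 near-empty partitions collapse into a handful of tasks without manual tuning. The relevant settings, spelled out here even though they already default to true on 3.2+:

    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")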

With 200 partitions (Spark 3.3.0):

+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|{key1, 1}|{key1, [{null, nu...|
|{key2, 2}|                null|
+---------+--------------------+

[join] elapsed: 2s
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|{key1, 1}|[{null, null, nul...|
|{key2, 2}|                  []|
+---------+--------------------+

[map] elapsed: 2s

With 4 partitions (Spark 3.3.0):

+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|{key1, 1}|{key1, [{null, nu...|
|{key2, 2}|                null|
+---------+--------------------+

[join] elapsed: 2s
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|{key1, 1}|[{null, null, nul...|
|{key2, 2}|                  []|
+---------+--------------------+

[map] elapsed: 2s

With 200 partitions (Spark 2.4.8):

+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|[key1, 1]|[key1, [[,,,,,,,,...|
|[key2, 2]|                null|
+---------+--------------------+

[join] elapsed: 5s
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|[key1, 1]|[[,,,,,,,,,,,,,,,...|
|[key2, 2]|                  []|
+---------+--------------------+

[map] elapsed: 17s

With 4 partitions (Spark 2.4.8):

+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|[key1, 1]|[key1, [[,,,,,,,,...|
|[key2, 2]|                null|
+---------+--------------------+

[join] elapsed: 2s
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|[key1, 1]|[[,,,,,,,,,,,,,,,...|
|[key2, 2]|                  []|
+---------+--------------------+

[map] elapsed: 3s

That said, when the log level is lowered to DEBUG in the Spark 2.4 case, execution time truly explodes, due to the sheer overhead of logging all those empty partitions (and the constant synchronization between the two threads).
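If you run into that, keeping the test session at a quieter log level sidesteps the cost, for example:

    // WARN (or ERROR) avoids logging each of the ~200 empty-partition tasks.
    spark.sparkContext.setLogLevel("WARN")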