场景如下:
我有一个SparkSQL程序,它对几个Hive表执行ETL过程。这些表是使用RAW TEXT中的Sqoop和Snappy压缩从Teradata数据库导入的(不幸的是Avro格式不适用于Teradata连接器)。SparkSQL过程完成所需的时间约为1小时15分钟。
为了提高性能,我想在执行SparkSQL过程之前,将表转换为更有效的格式,比如Parquet。根据留档和在线讨论,这应该会给使用原始文本带来显著的提升(即使使用snappy压缩,原始文本也不能拆分)。因此,我使用Snappy压缩将所有Hive表转换为Parquet格式。我已经在这些表上启动了相同设置的SparkSQL过程(num-执行器、驱动程序-内存、执行器-内存)。这个过程在1小时20分钟内结束。这对我来说非常令人惊讶。我没想到会像我在一些讨论中读到的那样提高30倍,但我当然期待改进。
在Spark程序中执行的操作类型大部分是连接和过滤器(其中有条件),如以下片段所示:
val sc = new SparkContext(conf)
val sqc = new HiveContext(sc)
sqc.sql("SET hive.exec.compress.output=true")
sqc.sql("SET parquet.compression=SNAPPY")
var vcliff = sqc.read.table(s"$swamp_db.DBU_vcliff")
var vtktdoc = sqc.read.table(s"$swamp_db.DBU_vtktdoc")
var vasccrmtkt = sqc.read.table(s"$swamp_db.DBU_vasccrmtkt")
val numPartitions = 7 * 16
// caching
vcliff.registerTempTable("vcliff")
vtktdoc.registerTempTable("vtktdoc")
vasccrmtkt.registerTempTable("vasccrmtkt")
ar ORI_TktVCRAgency = sqc.sql(
s"""
| SELECT tic.CODCLI,
| tic.CODARLPFX,
| tic.CODTKTNUM,
| tic.DATDOCISS,
| vloc.CODTHR,
| vloc.NAMCMPNAMTHR,
| vloc.CODAGNCTY,
| vloc.NAMCIT,
| vloc.NAMCOU,
| vloc.CODCOU,
| vloc.CODTYPTHR,
| vloc.CODZIP,
| vcom.CODCOMORGLEVDPC,
| vcom.DESCOMORGLEVDPC,
| vcom.CODCOMORGLEVRMX,
| vcom.DESCOMORGLEVRMX,
| vcom.CODCOMORGLEVSALUNT,
| vcom.CODPSECOMORGCTYLEVSALUNT,
| vcom.DESCOMORGLEVSALUNT,
| vcom.CODCOMORGLEVRPR,
| vcom.CODPSECOMORGCTYLEVRPR,
| vcom.DESCOMORGLEVRPR,
| vcom.CODCOMORGLEVCTYCNL,
| vcom.CODPSECOMORGCTYLEVCTYCNL,
| vcom.DESCOMORGLEVCTYCNL,
| vcom.CODCOMORGLEVUNT,
| vcom.CODPSECOMORGCTYLEVUNT,
| vcom.DESCOMORGLEVUNT,
| vcli.DESCNL
| FROM $swamp_db.DBU_vlocpos vloc
| LEFT JOIN $swamp_db.DBU_vcomorghiemktgeo vcom ON vloc.codtypthr = vcom.codtypthr
| AND vloc.codthr = vcom.codthr
| LEFT JOIN TicketDocCrm tic ON tic.codvdt7 = vloc.codthr
| LEFT JOIN vcliff vc ON vc.codcli = tic.codcli
| LEFT JOIN $swamp_db.DBU_vclieml vcli ON vc.codcli = vcli.codcli
""".stripMargin)
ORI_TktVCRAgency.registerTempTable("ORI_TktVCRAgency")
[...]
var TMP_workTemp = sqc.sql(
s"""
|SELECT *
|FROM TicketDocCrm
| WHERE CODPNRREF != ''
| AND (DESRTGSTS LIKE '%USED%'
| OR DESRTGSTS LIKE '%OK%'
| OR DESRTGSTS LIKE '%CTRL%'
| OR DESRTGSTS LIKE '%RFND%'
| OR DESRTGSTS LIKE '%RPRT%'
| OR DESRTGSTS LIKE '%LFTD%'
| OR DESRTGSTS LIKE '%CKIN%')
""".stripMargin)
TMP_workTemp.registerTempTable("TMP_workTemp")
var TMP_workTemp1 = sqc.sql(
s"""
|SELECT *
|FROM TMP_workTemp w
|INNER JOIN
| (SELECT CODTKTNUM as CODTKTNUM_T
| FROM (
| SELECT CODCLI, CODTKTNUM, COUNT(*) as n
| FROM TMP_workTemp
| GROUP BY CODCLI, CODTKTNUM
| HAVING n > 1)
| a) b
|ON w.CODTKTNUM = b.CODTKTNUM_T
""".stripMargin).drop("CODTKTNUM_T")
[...]
集群由2个master和7个worker组成。每个节点都有:
火花在YARN。
有人知道为什么在Spark中处理数据之前,从原始文本切换到Parquet格式没有得到任何性能改进吗?
简短的回答。
对于所有类型的查询,parquet都不会优于原始文本数据,这是不正确的。
TLDR;
Parquet是列存储(为了描述什么是列存储,请将表中的每一列存储在单独的文件中,而不是存储行的文件中),这种模式(列存储)提高了分析工作负载(OLAP)的性能。
我可以举一个例子来说明为什么以列式方式存储数据(如parquet)可能会显着提高查询性能。假设您有一个包含300列的表并且您想运行以下查询。
SELECT avg(amount)
FROM my_big_table
在上面的查询中,您只关心列量的平均值。
如果Spark必须首先在原始文本上执行此操作,它将使用您提供的模式来分割行,然后解析金额列,这需要相当长的计算时间来解析my_big_table中300多列中的金额列。
如果Spark必须从parquet存储中获取平均数量,它必须只读取ument-rain-data的parket块(记住表的每一列都单独存储在parquet中)。Parquet可以通过存储大量元数据和使用列级压缩来进一步提高性能。
你应该读这篇文章。
现在回到你的问题,你的大多数查询都在运行SELECT*,这意味着你正在将所有数据读取到火花中,然后连接或过滤一些值。在第二个查询中,你的查询不会有太大的性能提升,因为你正在读取所有的列,而parque将是一个更昂贵的选择,因为你可能最终会读取更多的文件,而你可能会在原始文本中完成。
过滤是更快的镶木地板在少数情况下,但并不总是,取决于您的数据。
总而言之,您应该根据将要运行的查询类型和您拥有的数据类型选择数据存储。
观察到的几点:
要检查分区:
vcliff.rdd.getNumPartitions
>
按最常用的数据帧列对数据帧进行分区,这样Spark在执行连接等聚合时就可以避免混洗。如果最常用的列有更多不同的值,而不是分区,您可以在该列上使用Buckting,这样Spark就可以将数据均匀地分布在分区之间,而不是倾斜成一两个分区。
vcliff. re分区([numDeftions],"codcli")
TicketDocCrm. re分区([numDeftions],"DESRTGSTS")