提问者:小点点

从原始文本到Parquet的火花SQL:没有性能提升


场景如下:

我有一个SparkSQL程序,它对几个Hive表执行ETL过程。这些表是使用RAW TEXT中的Sqoop和Snappy压缩从Teradata数据库导入的(不幸的是Avro格式不适用于Teradata连接器)。SparkSQL过程完成所需的时间约为1小时15分钟。

为了提高性能,我想在执行SparkSQL过程之前,将表转换为更有效的格式,比如Parquet。根据留档和在线讨论,这应该会给使用原始文本带来显著的提升(即使使用snappy压缩,原始文本也不能拆分)。因此,我使用Snappy压缩将所有Hive表转换为Parquet格式。我已经在这些表上启动了相同设置的SparkSQL过程(num-执行器、驱动程序-内存、执行器-内存)。这个过程在1小时20分钟内结束。这对我来说非常令人惊讶。我没想到会像我在一些讨论中读到的那样提高30倍,但我当然期待改进。

在Spark程序中执行的操作类型大部分是连接和过滤器(其中有条件),如以下片段所示:

val sc = new SparkContext(conf)
val sqc = new HiveContext(sc)
sqc.sql("SET hive.exec.compress.output=true")
sqc.sql("SET parquet.compression=SNAPPY")


var vcliff = sqc.read.table(s"$swamp_db.DBU_vcliff")
var vtktdoc = sqc.read.table(s"$swamp_db.DBU_vtktdoc")
var vasccrmtkt = sqc.read.table(s"$swamp_db.DBU_vasccrmtkt")

val numPartitions = 7 * 16

// caching
vcliff.registerTempTable("vcliff")
vtktdoc.registerTempTable("vtktdoc")
vasccrmtkt.registerTempTable("vasccrmtkt")


ar ORI_TktVCRAgency = sqc.sql(
    s"""
       |            SELECT tic.CODCLI,
       |            tic.CODARLPFX,
       |            tic.CODTKTNUM,
       |            tic.DATDOCISS,
       |            vloc.CODTHR,
       |            vloc.NAMCMPNAMTHR,
       |            vloc.CODAGNCTY,
       |            vloc.NAMCIT,
       |            vloc.NAMCOU,
       |            vloc.CODCOU,
       |            vloc.CODTYPTHR,
       |            vloc.CODZIP,
       |            vcom.CODCOMORGLEVDPC,
       |            vcom.DESCOMORGLEVDPC,
       |            vcom.CODCOMORGLEVRMX,
       |            vcom.DESCOMORGLEVRMX,
       |            vcom.CODCOMORGLEVSALUNT,
       |            vcom.CODPSECOMORGCTYLEVSALUNT,
       |            vcom.DESCOMORGLEVSALUNT,
       |            vcom.CODCOMORGLEVRPR,
       |            vcom.CODPSECOMORGCTYLEVRPR,
       |            vcom.DESCOMORGLEVRPR,
       |            vcom.CODCOMORGLEVCTYCNL,
       |            vcom.CODPSECOMORGCTYLEVCTYCNL,
       |            vcom.DESCOMORGLEVCTYCNL,
       |            vcom.CODCOMORGLEVUNT,
       |            vcom.CODPSECOMORGCTYLEVUNT,
       |            vcom.DESCOMORGLEVUNT,
       |            vcli.DESCNL
       |            FROM $swamp_db.DBU_vlocpos vloc
       |                LEFT JOIN $swamp_db.DBU_vcomorghiemktgeo vcom ON vloc.codtypthr = vcom.codtypthr
       |            AND vloc.codthr = vcom.codthr
       |            LEFT JOIN TicketDocCrm tic ON tic.codvdt7 = vloc.codthr
       |            LEFT JOIN vcliff vc ON vc.codcli = tic.codcli
       |            LEFT JOIN $swamp_db.DBU_vclieml vcli ON vc.codcli = vcli.codcli
     """.stripMargin)

ORI_TktVCRAgency.registerTempTable("ORI_TktVCRAgency")

[...]

var TMP_workTemp = sqc.sql(
    s"""
       |SELECT *
       |FROM TicketDocCrm
       |            WHERE CODPNRREF != ''
       |            AND (DESRTGSTS LIKE '%USED%'
       |            OR DESRTGSTS LIKE '%OK%'
       |            OR DESRTGSTS LIKE '%CTRL%'
       |            OR DESRTGSTS LIKE '%RFND%'
       |            OR DESRTGSTS LIKE '%RPRT%'
       |            OR DESRTGSTS LIKE '%LFTD%'
       |            OR DESRTGSTS LIKE '%CKIN%')
     """.stripMargin)

TMP_workTemp.registerTempTable("TMP_workTemp")

var TMP_workTemp1 = sqc.sql(
    s"""
       |SELECT *
       |FROM TMP_workTemp w
       |INNER JOIN
       |    (SELECT CODTKTNUM as CODTKTNUM_T
       |    FROM (
       |        SELECT CODCLI, CODTKTNUM, COUNT(*) as n
       |        FROM TMP_workTemp
       |        GROUP BY CODCLI, CODTKTNUM
       |        HAVING n > 1)
       |    a) b
       |ON w.CODTKTNUM = b.CODTKTNUM_T
     """.stripMargin).drop("CODTKTNUM_T")

[...]

集群由2个master和7个worker组成。每个节点都有:

  • 16核cpu
  • 110 GB内存

火花在YARN。

有人知道为什么在Spark中处理数据之前,从原始文本切换到Parquet格式没有得到任何性能改进吗?


共2个答案

匿名用户

简短的回答。

对于所有类型的查询,parquet都不会优于原始文本数据,这是不正确的。

TLDR;

Parquet是列存储(为了描述什么是列存储,请将表中的每一列存储在单独的文件中,而不是存储行的文件中),这种模式(列存储)提高了分析工作负载(OLAP)的性能。

我可以举一个例子来说明为什么以列式方式存储数据(如parquet)可能会显着提高查询性能。假设您有一个包含300列的表并且您想运行以下查询。

SELECT avg(amount)
FROM my_big_table 

在上面的查询中,您只关心列量的平均值。

如果Spark必须首先在原始文本上执行此操作,它将使用您提供的模式来分割行,然后解析金额列,这需要相当长的计算时间来解析my_big_table中300多列中的金额列。

如果Spark必须从parquet存储中获取平均数量,它必须只读取ument-rain-data的parket块(记住表的每一列都单独存储在parquet中)。Parquet可以通过存储大量元数据和使用列级压缩来进一步提高性能。

你应该读这篇文章。

现在回到你的问题,你的大多数查询都在运行SELECT*,这意味着你正在将所有数据读取到火花中,然后连接或过滤一些值。在第二个查询中,你的查询不会有太大的性能提升,因为你正在读取所有的列,而parque将是一个更昂贵的选择,因为你可能最终会读取更多的文件,而你可能会在原始文本中完成。

过滤是更快的镶木地板在少数情况下,但并不总是,取决于您的数据。

总而言之,您应该根据将要运行的查询类型和您拥有的数据类型选择数据存储。

匿名用户

观察到的几点:

  • hive. exec.comps.输出=true--这将确保Hive查询的最终输出被压缩。但在这种情况下,您使用Spark从Hive读取数据,因此这不会对性能产生任何影响。
  • 检查数据帧的分区,确保有足够的数据帧分区,以便执行器并行处理数据。

要检查分区:

vcliff.rdd.getNumPartitions

>

  • 按最常用的数据帧列对数据帧进行分区,这样Spark在执行连接等聚合时就可以避免混洗。如果最常用的列有更多不同的值,而不是分区,您可以在该列上使用Buckting,这样Spark就可以将数据均匀地分布在分区之间,而不是倾斜成一两个分区。

    vcliff. re分区([numDeftions],"codcli")

    TicketDocCrm. re分区([numDeftions],"DESRTGSTS")