sparksql从原始文本到Parquet:没有性能提升

wd2eg0qa  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(531)

场景如下:
我有一个sparksql程序,它在几个配置单元表上执行etl过程。这些表是在原始文本中使用sqoop和snappy压缩从teradata数据库导入的(不幸的是avro格式不适用于teradata连接器)。spark sql进程完成所需的时间约为1小时15分钟。
为了提高性能,我想在执行sparksql进程之前,将表转换为更有效的格式,比如parquet。根据文档和在线讨论,这应该会大大提高使用原始文本的效率(甚至使用snappy压缩,snappy在原始文本上是不可拆分的)。因此,我转换了所有的Hive表Parquet格式与快速压缩。我使用相同的设置(num executors、driver memory、executor memory)在这些表上启动了sparksql进程。整个过程在1小时20分钟内结束。这让我很惊讶。我没想到会像我在一些讨论中看到的那样提高30倍,但我当然期待着进步。
spark程序中执行的操作类型主要用于连接和过滤器(where conditions),如以下代码段所示:

val sc = new SparkContext(conf)
val sqc = new HiveContext(sc)
sqc.sql("SET hive.exec.compress.output=true")
sqc.sql("SET parquet.compression=SNAPPY")

var vcliff = sqc.read.table(s"$swamp_db.DBU_vcliff")
var vtktdoc = sqc.read.table(s"$swamp_db.DBU_vtktdoc")
var vasccrmtkt = sqc.read.table(s"$swamp_db.DBU_vasccrmtkt")

val numPartitions = 7 * 16

// caching
vcliff.registerTempTable("vcliff")
vtktdoc.registerTempTable("vtktdoc")
vasccrmtkt.registerTempTable("vasccrmtkt")

ar ORI_TktVCRAgency = sqc.sql(
    s"""
       |            SELECT tic.CODCLI,
       |            tic.CODARLPFX,
       |            tic.CODTKTNUM,
       |            tic.DATDOCISS,
       |            vloc.CODTHR,
       |            vloc.NAMCMPNAMTHR,
       |            vloc.CODAGNCTY,
       |            vloc.NAMCIT,
       |            vloc.NAMCOU,
       |            vloc.CODCOU,
       |            vloc.CODTYPTHR,
       |            vloc.CODZIP,
       |            vcom.CODCOMORGLEVDPC,
       |            vcom.DESCOMORGLEVDPC,
       |            vcom.CODCOMORGLEVRMX,
       |            vcom.DESCOMORGLEVRMX,
       |            vcom.CODCOMORGLEVSALUNT,
       |            vcom.CODPSECOMORGCTYLEVSALUNT,
       |            vcom.DESCOMORGLEVSALUNT,
       |            vcom.CODCOMORGLEVRPR,
       |            vcom.CODPSECOMORGCTYLEVRPR,
       |            vcom.DESCOMORGLEVRPR,
       |            vcom.CODCOMORGLEVCTYCNL,
       |            vcom.CODPSECOMORGCTYLEVCTYCNL,
       |            vcom.DESCOMORGLEVCTYCNL,
       |            vcom.CODCOMORGLEVUNT,
       |            vcom.CODPSECOMORGCTYLEVUNT,
       |            vcom.DESCOMORGLEVUNT,
       |            vcli.DESCNL
       |            FROM $swamp_db.DBU_vlocpos vloc
       |                LEFT JOIN $swamp_db.DBU_vcomorghiemktgeo vcom ON vloc.codtypthr = vcom.codtypthr
       |            AND vloc.codthr = vcom.codthr
       |            LEFT JOIN TicketDocCrm tic ON tic.codvdt7 = vloc.codthr
       |            LEFT JOIN vcliff vc ON vc.codcli = tic.codcli
       |            LEFT JOIN $swamp_db.DBU_vclieml vcli ON vc.codcli = vcli.codcli
     """.stripMargin)

ORI_TktVCRAgency.registerTempTable("ORI_TktVCRAgency")

[...]

var TMP_workTemp = sqc.sql(
    s"""
       |SELECT *
       |FROM TicketDocCrm
       |            WHERE CODPNRREF != ''
       |            AND (DESRTGSTS LIKE '%USED%'
       |            OR DESRTGSTS LIKE '%OK%'
       |            OR DESRTGSTS LIKE '%CTRL%'
       |            OR DESRTGSTS LIKE '%RFND%'
       |            OR DESRTGSTS LIKE '%RPRT%'
       |            OR DESRTGSTS LIKE '%LFTD%'
       |            OR DESRTGSTS LIKE '%CKIN%')
     """.stripMargin)

TMP_workTemp.registerTempTable("TMP_workTemp")

var TMP_workTemp1 = sqc.sql(
    s"""
       |SELECT *
       |FROM TMP_workTemp w
       |INNER JOIN
       |    (SELECT CODTKTNUM as CODTKTNUM_T
       |    FROM (
       |        SELECT CODCLI, CODTKTNUM, COUNT(*) as n
       |        FROM TMP_workTemp
       |        GROUP BY CODCLI, CODTKTNUM
       |        HAVING n > 1)
       |    a) b
       |ON w.CODTKTNUM = b.CODTKTNUM_T
     """.stripMargin).drop("CODTKTNUM_T")

[...]

集群由2个主节点和7个工作节点组成。每个节点都有:
16核cpu
110 gb内存
Spark在Yarn上流动。
有人知道为什么在spark中处理数据之前,我没有从原始文本转换到Parquet格式来提高性能吗?

sh7euo9m

sh7euo9m1#

简短的回答。
对于所有类型的查询,parquet的性能都不会超过原始文本数据。
tldr公司;
parquet是一个列式存储(为了描述什么是列式存储,请考虑表中的每一列都存储在一个单独的文件中,而不是存储行的文件),这种模式(列式存储)提高了分析工作负载(olap)的性能。
我可以举一个例子来说明为什么以柱状方式(如Parquet)存储数据可以显著提高查询性能。假设您有一个包含300列的表,并且希望运行以下查询。

SELECT avg(amount)
FROM my_big_table

在上面的查询中,您只关心列amount的平均值。
如果spark必须首先在原始文本上执行此操作,它将使用您提供的模式分割行,然后解析amount列,这需要相当多的计算时间来解析myu big\u表中300多列的amount列。
如果spark必须从一个parquet存储中获取平均数量,它必须只读取parquet块的amount列数据(记住表中的每一列都单独存储在parquet中)。通过存储大量元数据和使用列级压缩,parquet可能会进一步提高性能。
你应该读这篇文章。
现在回到您的问题,大多数查询都运行select*,这意味着您正在将所有数据读入spark,然后连接或过滤一些值。在第二个查询中,由于您正在读取所有列,因此使用parquet表查询的性能不会有太大提高,而parquet在这里是一个成本更高的选择,因为您可能会读取更多可能在原始文本中完成的文件。
过滤是更快的Parquet在少数情况下,但并不总是,取决于您的数据。
总而言之,您应该根据要运行的查询类型和数据类型选择数据存储。

wgeznvg7

wgeznvg72#

观察到两点:
hive.exec.compress.output=true——这将确保配置单元查询的最终输出将被压缩。但是在这种情况下,您正在使用spark从配置单元读取数据,因此这不会对性能产生任何影响。
检查Dataframe的分区,确保有足够的Dataframe分区,以便执行者并行处理数据。
要检查分区:

vcliff.rdd.getNumPartitions

按Dataframe中最常用的列对Dataframe进行分区,这样spark在执行诸如连接之类的聚合时就可以避免混乱。如果最常用的列有更多不同的值,而不是分区,则可以对该列使用bucketing,这样spark就可以将数据均匀地分布在分区上,而不是只倾斜成一个或两个分区。
vcliff.repartition([numpartitions],“codcli”)
ticketdoccrm.repartition([numpartitions],“desrtgsts”)

相关问题