场景如下:
我有一个sparksql程序,它在几个配置单元表上执行etl过程。这些表是在原始文本中使用sqoop和snappy压缩从teradata数据库导入的(不幸的是avro格式不适用于teradata连接器)。spark sql进程完成所需的时间约为1小时15分钟。
为了提高性能,我想在执行sparksql进程之前,将表转换为更有效的格式,比如parquet。根据文档和在线讨论,这应该会大大提高使用原始文本的效率(甚至使用snappy压缩,snappy在原始文本上是不可拆分的)。因此,我转换了所有的Hive表Parquet格式与快速压缩。我使用相同的设置(num executors、driver memory、executor memory)在这些表上启动了sparksql进程。整个过程在1小时20分钟内结束。这让我很惊讶。我没想到会像我在一些讨论中看到的那样提高30倍,但我当然期待着进步。
spark程序中执行的操作类型主要用于连接和过滤器(where conditions),如以下代码段所示:
val sc = new SparkContext(conf)
val sqc = new HiveContext(sc)
sqc.sql("SET hive.exec.compress.output=true")
sqc.sql("SET parquet.compression=SNAPPY")
var vcliff = sqc.read.table(s"$swamp_db.DBU_vcliff")
var vtktdoc = sqc.read.table(s"$swamp_db.DBU_vtktdoc")
var vasccrmtkt = sqc.read.table(s"$swamp_db.DBU_vasccrmtkt")
val numPartitions = 7 * 16
// caching
vcliff.registerTempTable("vcliff")
vtktdoc.registerTempTable("vtktdoc")
vasccrmtkt.registerTempTable("vasccrmtkt")
ar ORI_TktVCRAgency = sqc.sql(
s"""
| SELECT tic.CODCLI,
| tic.CODARLPFX,
| tic.CODTKTNUM,
| tic.DATDOCISS,
| vloc.CODTHR,
| vloc.NAMCMPNAMTHR,
| vloc.CODAGNCTY,
| vloc.NAMCIT,
| vloc.NAMCOU,
| vloc.CODCOU,
| vloc.CODTYPTHR,
| vloc.CODZIP,
| vcom.CODCOMORGLEVDPC,
| vcom.DESCOMORGLEVDPC,
| vcom.CODCOMORGLEVRMX,
| vcom.DESCOMORGLEVRMX,
| vcom.CODCOMORGLEVSALUNT,
| vcom.CODPSECOMORGCTYLEVSALUNT,
| vcom.DESCOMORGLEVSALUNT,
| vcom.CODCOMORGLEVRPR,
| vcom.CODPSECOMORGCTYLEVRPR,
| vcom.DESCOMORGLEVRPR,
| vcom.CODCOMORGLEVCTYCNL,
| vcom.CODPSECOMORGCTYLEVCTYCNL,
| vcom.DESCOMORGLEVCTYCNL,
| vcom.CODCOMORGLEVUNT,
| vcom.CODPSECOMORGCTYLEVUNT,
| vcom.DESCOMORGLEVUNT,
| vcli.DESCNL
| FROM $swamp_db.DBU_vlocpos vloc
| LEFT JOIN $swamp_db.DBU_vcomorghiemktgeo vcom ON vloc.codtypthr = vcom.codtypthr
| AND vloc.codthr = vcom.codthr
| LEFT JOIN TicketDocCrm tic ON tic.codvdt7 = vloc.codthr
| LEFT JOIN vcliff vc ON vc.codcli = tic.codcli
| LEFT JOIN $swamp_db.DBU_vclieml vcli ON vc.codcli = vcli.codcli
""".stripMargin)
ORI_TktVCRAgency.registerTempTable("ORI_TktVCRAgency")
[...]
var TMP_workTemp = sqc.sql(
s"""
|SELECT *
|FROM TicketDocCrm
| WHERE CODPNRREF != ''
| AND (DESRTGSTS LIKE '%USED%'
| OR DESRTGSTS LIKE '%OK%'
| OR DESRTGSTS LIKE '%CTRL%'
| OR DESRTGSTS LIKE '%RFND%'
| OR DESRTGSTS LIKE '%RPRT%'
| OR DESRTGSTS LIKE '%LFTD%'
| OR DESRTGSTS LIKE '%CKIN%')
""".stripMargin)
TMP_workTemp.registerTempTable("TMP_workTemp")
var TMP_workTemp1 = sqc.sql(
s"""
|SELECT *
|FROM TMP_workTemp w
|INNER JOIN
| (SELECT CODTKTNUM as CODTKTNUM_T
| FROM (
| SELECT CODCLI, CODTKTNUM, COUNT(*) as n
| FROM TMP_workTemp
| GROUP BY CODCLI, CODTKTNUM
| HAVING n > 1)
| a) b
|ON w.CODTKTNUM = b.CODTKTNUM_T
""".stripMargin).drop("CODTKTNUM_T")
[...]
集群由2个主节点和7个工作节点组成。每个节点都有:
16核cpu
110 gb内存
Spark在Yarn上流动。
有人知道为什么在spark中处理数据之前,我没有从原始文本转换到Parquet格式来提高性能吗?
2条答案
按热度按时间sh7euo9m1#
简短的回答。
对于所有类型的查询,parquet的性能都不会超过原始文本数据。
tldr公司;
parquet是一个列式存储(为了描述什么是列式存储,请考虑表中的每一列都存储在一个单独的文件中,而不是存储行的文件),这种模式(列式存储)提高了分析工作负载(olap)的性能。
我可以举一个例子来说明为什么以柱状方式(如Parquet)存储数据可以显著提高查询性能。假设您有一个包含300列的表,并且希望运行以下查询。
在上面的查询中,您只关心列amount的平均值。
如果spark必须首先在原始文本上执行此操作,它将使用您提供的模式分割行,然后解析amount列,这需要相当多的计算时间来解析myu big\u表中300多列的amount列。
如果spark必须从一个parquet存储中获取平均数量,它必须只读取parquet块的amount列数据(记住表中的每一列都单独存储在parquet中)。通过存储大量元数据和使用列级压缩,parquet可能会进一步提高性能。
你应该读这篇文章。
现在回到您的问题,大多数查询都运行select*,这意味着您正在将所有数据读入spark,然后连接或过滤一些值。在第二个查询中,由于您正在读取所有列,因此使用parquet表查询的性能不会有太大提高,而parquet在这里是一个成本更高的选择,因为您可能会读取更多可能在原始文本中完成的文件。
过滤是更快的Parquet在少数情况下,但并不总是,取决于您的数据。
总而言之,您应该根据要运行的查询类型和数据类型选择数据存储。
wgeznvg72#
观察到两点:
hive.exec.compress.output=true——这将确保配置单元查询的最终输出将被压缩。但是在这种情况下,您正在使用spark从配置单元读取数据,因此这不会对性能产生任何影响。
检查Dataframe的分区,确保有足够的Dataframe分区,以便执行者并行处理数据。
要检查分区:
按Dataframe中最常用的列对Dataframe进行分区,这样spark在执行诸如连接之类的聚合时就可以避免混乱。如果最常用的列有更多不同的值,而不是分区,则可以对该列使用bucketing,这样spark就可以将数据均匀地分布在分区上,而不是只倾斜成一个或两个分区。
vcliff.repartition([numpartitions],“codcli”)
ticketdoccrm.repartition([numpartitions],“desrtgsts”)