我刚刚开发了一个sparksql应用程序,在一些算法分析过程中,我意识到执行计划需要大量的时间来处理。如何优化spark sql执行计划的性能?
我看了我们社区关于这个问题的几个问题/答案,但对我来说,似乎没有什么是严格到这一点的实施。所以,我想要一些社区支持来克服我的障碍点,也许可以给进一步的开发人员留下一个路线图。
下面是关于所做努力的一些细节。
我开发了一个spark应用程序,它周期性地接收来自kafka的事件,对其进行处理,并将输出再次发送回kafka。简单地说,spark算法过滤/丰富信息,并为每个事件执行繁重而复杂的窗口延迟函数。
spark算法在一个循环中运行,因此每个算法都基于它必须处理的事件数(kafka保留时间) 30m
). 目前,大约需要 ~90s
对于每个执行周期时间,它以批处理模式运行循环,如下所述:
从kafka输入主题获取事件
处理周围 70
SparkSQL
将输出发送回Kafka输出主题
由于每个周期大约需要90秒,这意味着Kafka事件可以从 90s
到180秒进行处理。我必须将此处理时间减少到 60s
.
imho,我可以扩展spark硬件,在批处理模式下寻找更好的sql性能,但是由于我发现算法处理的主要部分只是创建一个执行计划,我想知道执行计划可以做些什么来显著减少处理时间。
它当前正在上运行 Spark 3.0.1
在的独立配置中 20
vcore服务器和 32GB
猛撞。下面是一个支持这个问题的代码示例。希望这能解释这种情况。
代码示例
1. 得到 Kafka主题
streamdata = spark.read.format("kafka").option("kafka.bootstrap.servers", kafka_servers).\
option("subscribe", readtopic).option("failOnDataLoss", "false").\
option("startingOffsets", "earliest").\
option("endingOffsets", "latest").load()
# Besides columns identified bellow, we will bring from kafka : offset (auto number) and timestamp (insert datetime)
streamdata = streamdata.withColumn('value', streamdata['value'].cast('string')).drop('key','topic','partition','timestampType')
streamdata = streamdata.withColumn("LAST_UPDATE", split(col("value"), ",").getItem(0).cast(IntegerType()))\
.withColumn("DESCRIPTION", split(col("value"), ",").getItem(1).cast(StringType()))
.withColumn("MYTAG", split(col("value"), ",").getItem(6).cast(StringType()))
2. 窗口函数处理(像这样或更复杂的查询)
SUM_COUNT = spark.sql("""
SELECT FIELD_A, FIELD_B, sum(FIELD_C) OVER (PARTITION BY FIELD_D ORDER BY CAST(LAST_UPDATE as timestamp) RANGE BETWEEN INTERVAL 12 HOURS PRECEDING AND CURRENT ROW) as FIELD_COUNT_12h
FROM streamdata
""")
3. 将数据发送回Kafka
# create value as a concat json of all columns
SendKafka = query_03.withColumn("value", to_json(struct([query_03[x] for x in query_03.columns])))
# Send back to kafka
SendKafka.write.format("kafka").option("kafka.bootstrap.servers", kafka_servers).option("topic", writetopic).save()
1条答案
按热度按时间s5a0g9ez1#
如果不研究实际的执行计划,只使用代码,很难说。从调谐开始
spark.sql.shuffle.partitions
-将其设置为spark作业可用的核心数-如果您有20个核心,并且您使用的默认值等于200,那么这意味着在第一次洗牌之后,每个核心将执行代码10次(200/20),在前一次之后执行一次。另外,考虑到spark根据主题中的分区数从kafka读取数据,因此如果分区数少于核心数,那么在读取时核心将处于空闲状态-请检查minPartitions
Kafka连接器选件(见文件)另外,请查看spark sql tuning guide和spark tuning guide中的建议。