如何减少sparksql执行计划的总计时处理

cnjp1d6j  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(416)

我刚刚开发了一个sparksql应用程序,在一些算法分析过程中,我意识到执行计划需要大量的时间来处理。如何优化spark sql执行计划的性能?
我看了我们社区关于这个问题的几个问题/答案,但对我来说,似乎没有什么是严格到这一点的实施。所以,我想要一些社区支持来克服我的障碍点,也许可以给进一步的开发人员留下一个路线图。
下面是关于所做努力的一些细节。
我开发了一个spark应用程序,它周期性地接收来自kafka的事件,对其进行处理,并将输出再次发送回kafka。简单地说,spark算法过滤/丰富信息,并为每个事件执行繁重而复杂的窗口延迟函数。
spark算法在一个循环中运行,因此每个算法都基于它必须处理的事件数(kafka保留时间) 30m ). 目前,大约需要 ~90s 对于每个执行周期时间,它以批处理模式运行循环,如下所述:
从kafka输入主题获取事件
处理周围 70 SparkSQL
将输出发送回Kafka输出主题
由于每个周期大约需要90秒,这意味着Kafka事件可以从 90s 到180秒进行处理。我必须将此处理时间减少到 60s .
imho,我可以扩展spark硬件,在批处理模式下寻找更好的sql性能,但是由于我发现算法处理的主要部分只是创建一个执行计划,我想知道执行计划可以做些什么来显著减少处理时间。
它当前正在上运行 Spark 3.0.1 在的独立配置中 20 vcore服务器和 32GB 猛撞。下面是一个支持这个问题的代码示例。希望这能解释这种情况。

代码示例

1. 得到 Kafka主题

streamdata = spark.read.format("kafka").option("kafka.bootstrap.servers", kafka_servers).\ 
                                                option("subscribe", readtopic).option("failOnDataLoss", "false").\ 
                                                option("startingOffsets", "earliest").\ 
                                                option("endingOffsets", "latest").load() 

# Besides columns identified bellow, we will bring from kafka : offset (auto number) and timestamp (insert datetime)

streamdata = streamdata.withColumn('value', streamdata['value'].cast('string')).drop('key','topic','partition','timestampType') 

streamdata = streamdata.withColumn("LAST_UPDATE", split(col("value"), ",").getItem(0).cast(IntegerType()))\ 
                .withColumn("DESCRIPTION", split(col("value"), ",").getItem(1).cast(StringType())) 
.withColumn("MYTAG", split(col("value"), ",").getItem(6).cast(StringType()))

2. 窗口函数处理(像这样或更复杂的查询)

SUM_COUNT = spark.sql(""" 
    SELECT FIELD_A, FIELD_B, sum(FIELD_C) OVER (PARTITION BY FIELD_D ORDER BY CAST(LAST_UPDATE as timestamp) RANGE BETWEEN INTERVAL 12 HOURS PRECEDING AND CURRENT ROW) as FIELD_COUNT_12h  
    FROM streamdata 
""")

3. 将数据发送回Kafka


# create value as a concat json of all columns

SendKafka = query_03.withColumn("value", to_json(struct([query_03[x] for x in query_03.columns]))) 

# Send back to kafka

SendKafka.write.format("kafka").option("kafka.bootstrap.servers", kafka_servers).option("topic", writetopic).save()
s5a0g9ez

s5a0g9ez1#

如果不研究实际的执行计划,只使用代码,很难说。从调谐开始 spark.sql.shuffle.partitions -将其设置为spark作业可用的核心数-如果您有20个核心,并且您使用的默认值等于200,那么这意味着在第一次洗牌之后,每个核心将执行代码10次(200/20),在前一次之后执行一次。另外,考虑到spark根据主题中的分区数从kafka读取数据,因此如果分区数少于核心数,那么在读取时核心将处于空闲状态-请检查 minPartitions Kafka连接器选件(见文件)
另外,请查看spark sql tuning guide和spark tuning guide中的建议。

相关问题