我正在编写一个本质上是迭代的算法的PySpark实现。该算法的一部分涉及迭代策略,直到不能进行更多的改进(即,已经贪婪地达到局部最大值)。
函数optimize
返回一个三列 Dataframe ,如下所示:
| id|电流值|最佳值|
| - -----|- -----|- -----|
| 0| 1| 1|
| 1| 0| 1|
该函数在while循环中使用,直到current_value
和best_value
相同(这意味着不能再进行优化)。
# Init while loop
iterate = True
# Start iterating until optimization yields same result as before
while iterate:
# Create (or overwrite) `df`
df = optimizeAll(df2) # Uses `df2` as input
df.persist().count()
# Check stopping condition
iterate = df.where('current_value != best_value').count() > 0
# Update `df2` with latest results
if iterate:
df2 = df2.join(other=df, on='id', how='left') # <- Should I persist this?
当我手动将输入传递给它时,这个函数运行得非常快。然而,我注意到函数运行所需的时间随着迭代次数的增加而呈指数级增长。也就是说,第一次迭代以毫秒为单位运行,第二次迭代以秒为单位运行,最终每次运行需要10分钟。
This question建议,如果df
没有缓存,则while循环将在每次迭代时从头开始运行。这是真的吗?
如果是这样,我应该持久化哪些对象?我知道在定义iterate
时,持久化df
将由count
触发。但是,df2
没有动作,所以即使我坚持它,它会不会使while循环每次都从头开始?同样,我是否应该在循环中的某个点取消持久化任一表?
1条答案
按热度按时间afdcj2ne1#
如果您的资源足够,
df
和df2
都应该在您的情况下被持久化。在上面的例子中,由于
df2
不是持久化的,当你每次调用df.persist().count()
时,由于df2
不是持久化的,只剩下沿袭信息,它将在第一次迭代中从第一个df2
开始加入,这显然不是一个有效的方法。即使您只能持久化df
或df2
,您也应该在示例中首先持久化df2
。它应该给予你一个更一致的运行时间。另外,不确定你使用的是哪个Spark版本,如果你使用的是Spark >= 3.0.0,你应该启用
spark.sql.adaptive.enabled
和spark.sql.adaptive.coalescePartitions.enabled
,因为AQE会优化查询计划(https:spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution和https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html)。如果你使用的是Spark < 3.0.0,那么当你想要优化连接和分组策略时,你应该检查日志中的连接策略和数据倾斜情况。